跳到主要内容

高级迭代器应用

本节将探讨迭代器的高级应用场景，包括自定义迭代器实现、性能优化、并行处理等。

自定义迭代器实现

实现 Iterator Trait

/// Iterator over the Fibonacci sequence `0, 1, 1, 2, 3, 5, ...`.
///
/// Yields every Fibonacci number that fits in a `u64` (F(0) through F(93))
/// and then returns `None` on every later call.
struct Fibonacci {
    /// Value the next call to `next` will return; `None` once exhausted.
    current: Option<u64>,
    /// Value that follows `current`; `None` when it would not fit in a `u64`.
    next: Option<u64>,
}

impl Fibonacci {
    /// Creates an iterator positioned at F(0) = 0.
    fn new() -> Self {
        Fibonacci {
            current: Some(0),
            next: Some(1),
        }
    }
}

impl Iterator for Fibonacci {
    type Item = u64;

    /// Returns the next Fibonacci number, or `None` once the sequence no
    /// longer fits in a `u64`.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.current?;

        // `checked_add` reports overflow instead of panicking (debug builds)
        // or silently wrapping (release builds). A missing successor simply
        // propagates until `current` itself becomes `None`.
        let following = self.next.and_then(|next| current.checked_add(next));
        self.current = self.next;
        self.next = following;

        Some(current)
    }
}

/// Prints the first ten Fibonacci numbers and the first one greater than 100.
fn main() {
    // Take the first ten Fibonacci numbers.
    let first_ten = Fibonacci::new().take(10).collect::<Vec<u64>>();
    println!("前10个斐波那契数: {:?}", first_ten);

    // Search a fresh sequence for the first value greater than 100.
    let mut sequence = Fibonacci::new();
    let first_over_100 = sequence.find(|&value| value > 100);
    println!("第一个大于100的: {:?}", first_over_100);
}

带状态的迭代器

/// Iterator yielding the arithmetic mean of every full sliding window over
/// `data`.
///
/// The window advances by one element per step. Nothing is yielded when the
/// window is empty (`window_size == 0`) or longer than the data.
struct MovingAverage {
    /// Samples being averaged.
    data: Vec<f64>,
    /// Number of consecutive samples in each window.
    window_size: usize,
    /// Index of the first sample of the next window.
    current_index: usize,
}

impl MovingAverage {
    /// Creates a moving-average iterator over `data` with windows of
    /// `window_size` samples.
    fn new(data: Vec<f64>, window_size: usize) -> Self {
        MovingAverage {
            data,
            window_size,
            current_index: 0,
        }
    }
}

impl Iterator for MovingAverage {
    type Item = f64;

    /// Returns the mean of the next window, or `None` when no full window
    /// remains.
    fn next(&mut self) -> Option<Self::Item> {
        // An empty window has no mean: without this guard the division below
        // would be 0.0 / 0.0 and the iterator would emit NaN values.
        if self.window_size == 0 {
            return None;
        }

        let end = self.current_index + self.window_size;
        // `get` yields `None` once the window would run past the data.
        let window = self.data.get(self.current_index..end)?;
        let average = window.iter().sum::<f64>() / self.window_size as f64;

        self.current_index += 1;
        Some(average)
    }
}

/// Prints the 3-sample moving averages of the values 1.0 through 10.0.
fn main() {
    let data = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];

    let averages = MovingAverage::new(data, 3).collect::<Vec<f64>>();
    println!("移动平均: {:?}", averages);
}

双向迭代器

/// Half-open integer range `[start, end)` that can be consumed from both ends.
struct Range {
    /// Next value handed out from the front.
    start: i32,
    /// One past the next value handed out from the back.
    end: i32,
}

impl Range {
    /// Creates the range `[start, end)`.
    fn new(start: i32, end: i32) -> Self {
        Self { start, end }
    }
}

impl Iterator for Range {
    type Item = i32;

    /// Takes the smallest remaining value, if any.
    fn next(&mut self) -> Option<i32> {
        if self.start >= self.end {
            return None;
        }
        let front = self.start;
        self.start = front + 1;
        Some(front)
    }
}

impl DoubleEndedIterator for Range {
    /// Takes the largest remaining value, if any.
    fn next_back(&mut self) -> Option<i32> {
        if self.start >= self.end {
            return None;
        }
        self.end -= 1;
        Some(self.end)
    }
}

/// Consumes a `Range` alternately from both ends and prints what is left.
fn main() {
    let mut range = Range::new(1, 6);

    let first_front = range.next();
    println!("从前面: {:?}", first_front); // Some(1)
    let first_back = range.next_back();
    println!("从后面: {:?}", first_back); // Some(5)
    let second_front = range.next();
    println!("从前面: {:?}", second_front); // Some(2)
    let second_back = range.next_back();
    println!("从后面: {:?}", second_back); // Some(4)

    // Gather whatever is left in the middle.
    let remaining = range.collect::<Vec<i32>>();
    println!("剩余: {:?}", remaining); // [3]
}

迭代器组合器

自定义组合器

/// Batching adapter: groups the items of an inner iterator into `Vec`s of up
/// to `size` elements. The final batch may be shorter.
struct Batched<I> {
    /// Source of the items being grouped.
    iter: I,
    /// Maximum number of items per batch.
    size: usize,
}

impl<I> Batched<I> {
    /// Wraps `iter` so that it yields batches of at most `size` items.
    fn new(iter: I, size: usize) -> Self {
        Self { iter, size }
    }
}

impl<I: Iterator> Iterator for Batched<I> {
    type Item = Vec<I::Item>;

    /// Pulls up to `size` items from the inner iterator. Returns `None` when
    /// no item could be pulled.
    fn next(&mut self) -> Option<Self::Item> {
        let mut batch = Vec::with_capacity(self.size);

        while batch.len() < self.size {
            if let Some(item) = self.iter.next() {
                batch.push(item);
            } else {
                break;
            }
        }

        Some(batch).filter(|items| !items.is_empty())
    }
}

/// Extension trait that adds the `batched` and `unique` adapters to every
/// iterator.
trait IteratorExt: Iterator {
    /// Groups the items into `Vec`s of at most `size` elements.
    fn batched(self, size: usize) -> Batched<Self>
    where
        Self: Sized,
    {
        Batched::new(self, size)
    }

    /// Keeps only the first occurrence of each item.
    fn unique(self) -> Unique<Self>
    where
        Self: Sized,
        Self::Item: std::hash::Hash + Eq + Clone,
    {
        Unique::new(self)
    }
}

// Blanket implementation: every iterator gets the adapters above.
impl<I> IteratorExt for I where I: Iterator {}

// 去重迭代器
use std::collections::HashSet;

/// De-duplicating adapter: yields each distinct item of the inner iterator
/// once, in order of first appearance.
///
/// The `I: Iterator` bound is required on the struct itself because the
/// `seen` field names the associated type `I::Item`.
struct Unique<I: Iterator> {
    /// Source of the items being filtered.
    iter: I,
    /// Items that have already been yielded.
    seen: HashSet<I::Item>,
}

impl<I: Iterator> Unique<I> {
    /// Wraps `iter` with an empty "already seen" set.
    fn new(iter: I) -> Self {
        Unique {
            iter,
            seen: HashSet::new(),
        }
    }
}

impl<I> Iterator for Unique<I>
where
    I: Iterator,
    I::Item: Clone + Eq + std::hash::Hash,
{
    type Item = I::Item;

    /// Returns the next item that has not been yielded before.
    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the set separately so the closure does not need all of `self`
        // while `self.iter` is mutably borrowed by `find`.
        let seen = &mut self.seen;
        // `insert` returns `true` only for values not yet in the set.
        self.iter.find(|item| seen.insert(item.clone()))
    }
}

/// Demonstrates the `batched` and `unique` adapters.
fn main() {
    // Batching example.
    let numbers = (1..=10).collect::<Vec<i32>>();
    let batched_numbers = numbers.into_iter().batched(3);
    let batches = batched_numbers.collect::<Vec<Vec<i32>>>();
    println!("批处理: {:?}", batches);

    // De-duplication example.
    let duplicates = vec![1, 2, 2, 3, 1, 4, 3, 5];
    let unique = duplicates.into_iter().unique().collect::<Vec<i32>>();
    println!("去重: {:?}", unique);
}

异步迭代器

Stream 概念

use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;

/// Simplified `Stream` trait for illustration (real code should use the
/// `futures` crate).
///
/// A polling-based counterpart of `Iterator`: each poll reports, through
/// `Poll`, either an item (`Some`) or the absence of one (`None`).
trait Stream {
/// Type of the values produced by the stream.
type Item;

/// Polls for the next value. `cx` is the task context supplied by the caller.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Stream over the half-open integer range `[start, end)`. Every poll
/// completes immediately.
struct AsyncRange {
    /// Next value to emit.
    current: i32,
    /// Exclusive upper bound.
    end: i32,
}

impl AsyncRange {
    /// Creates a stream that emits `start`, `start + 1`, ... up to `end - 1`.
    fn new(start: i32, end: i32) -> Self {
        Self { current: start, end }
    }
}

impl Stream for AsyncRange {
    type Item = i32;

    /// Emits the next value, or `Ready(None)` once the range is used up. The
    /// task context is never consulted.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `AsyncRange` holds only integers, so it is `Unpin` and can be
        // accessed through a plain mutable reference.
        let this = self.get_mut();

        let item = if this.current < this.end {
            let value = this.current;
            this.current += 1;
            Some(value)
        } else {
            None
        };
        Poll::Ready(item)
    }
}

// Note: this is only a concept demo. Real code needs the `futures` or `tokio`
// crate.
fn main() {
    let notices = [
        "异步迭代器概念演示",
        "实际使用需要 futures 或 tokio crate",
    ];
    for notice in notices.iter() {
        println!("{}", notice);
    }
}

性能优化

避免不必要的分配

/// Contrasts a pipeline that allocates intermediate vectors with an
/// equivalent single lazy iterator chain.
fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // Wasteful: every stage materialises an intermediate `Vec`.
    let step1: Vec<i32> = data.iter().map(|x| x * 2).collect();
    let step2: Vec<i32> = step1.iter().filter(|&&x| x > 10).cloned().collect();
    let result: i32 = step2.iter().sum();

    // Better: one lazy chain with no intermediate allocation.
    // `map` yields owned `i32` values, so `filter` receives `&i32` and the
    // closure pattern is `|&x|` (a `|&&x|` pattern does not type-check here).
    let result_optimized: i32 = data
        .iter()
        .map(|x| x * 2)
        .filter(|&x| x > 10)
        .sum();

    println!("多次分配结果: {}", result);
    println!("优化后结果: {}", result_optimized);
}

使用 size_hint 优化

/// Half-open `usize` range `[start, end)` that reports its exact remaining
/// length.
struct OptimizedRange {
    /// Next value to yield.
    start: usize,
    /// Exclusive upper bound.
    end: usize,
}

impl OptimizedRange {
    /// Creates the range `[start, end)`.
    fn new(start: usize, end: usize) -> Self {
        Self { start, end }
    }
}

impl Iterator for OptimizedRange {
    type Item = usize;

    /// Yields the next value, or `None` when the range is used up.
    fn next(&mut self) -> Option<usize> {
        if self.start >= self.end {
            return None;
        }
        let value = self.start;
        self.start = value + 1;
        Some(value)
    }

    /// Exact number of remaining items, so consumers such as `collect` can
    /// reserve memory up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = if self.end > self.start {
            self.end - self.start
        } else {
            0
        };
        (left, Some(left))
    }
}

// `size_hint` is exact, which is what `ExactSizeIterator` requires.
impl ExactSizeIterator for OptimizedRange {}

/// Collects an `OptimizedRange` and prints how many values it produced.
fn main() {
    // `collect` can reserve a correctly sized vector thanks to `size_hint`.
    let numbers = OptimizedRange::new(0, 1000).collect::<Vec<usize>>();
    println!("收集了 {} 个数字", numbers.len());
}

并行迭代器(概念)

Rayon 风格的并行处理

// 注意:这只是概念演示,实际使用 rayon crate

/// Sums `data` by splitting it into at most four chunks and adding the
/// per-chunk sums.
///
/// This only models how a parallel reduction divides the work; the chunks
/// are still processed sequentially. An empty slice sums to 0.
fn parallel_sum_concept(data: &[i32]) -> i32 {
    // Pretend there are four worker threads.
    const WORKERS: usize = 4;

    // Round up so there are never more chunks than workers, and keep the size
    // at least 1 because `chunks(0)` panics (inputs shorter than `WORKERS`).
    let chunk_size = ((data.len() + WORKERS - 1) / WORKERS).max(1);

    // Simulated parallel step: one partial sum per chunk, then combine.
    data.chunks(chunk_size)
        .map(|chunk| chunk.iter().sum::<i32>())
        .sum()
}

/// Compares a plain serial sum with the chunked "parallel" concept sum.
fn main() {
    let data = (1..=1000).collect::<Vec<i32>>();

    // Serial processing.
    let serial_sum = data.iter().sum::<i32>();

    // Conceptual parallel processing.
    let parallel_sum = parallel_sum_concept(data.as_slice());

    println!("串行求和: {}", serial_sum);
    println!("并行求和: {}", parallel_sum);

    // With the real rayon crate this would look like:
    // use rayon::prelude::*;
    // let parallel_sum: i32 = data.par_iter().sum();
}

内存效率的迭代器

生成器模式

/// Lazy generator of prime numbers in increasing order, bounded by an
/// inclusive upper limit.
///
/// Primes found so far are cached and reused as trial divisors. Once the
/// limit is passed the iterator returns `None` on every later call.
struct PrimeGenerator {
    /// Next candidate to test.
    current: u64,
    /// Primes yielded so far, in increasing order.
    primes: Vec<u64>,
    /// Largest candidate that will be tested (inclusive).
    limit: u64,
}

impl PrimeGenerator {
    /// Limit used by `new`; keeps an unbounded consumer from looping forever.
    const DEFAULT_LIMIT: u64 = 1_000_000;

    /// Creates a generator bounded by `DEFAULT_LIMIT`.
    fn new() -> Self {
        Self::with_limit(Self::DEFAULT_LIMIT)
    }

    /// Creates a generator that yields the primes `<= limit`.
    fn with_limit(limit: u64) -> Self {
        PrimeGenerator {
            current: 2,
            primes: Vec::new(),
            limit,
        }
    }

    /// Trial-divides `n` by the cached primes whose square does not exceed
    /// `n`.
    ///
    /// Only meaningful when the cache already holds every prime up to
    /// `sqrt(n)`, which is the case for candidates visited in order by `next`.
    fn is_prime(&self, n: u64) -> bool {
        self.primes
            .iter()
            // `p <= n / p` is `p * p <= n` without the multiplication.
            .take_while(|&&p| p <= n / p)
            .all(|&p| n % p != 0)
    }

    /// Moves to the next candidate: 2 -> 3, then odd numbers only.
    fn advance(&mut self) {
        self.current += if self.current == 2 { 1 } else { 2 };
    }
}

impl Iterator for PrimeGenerator {
    type Item = u64;

    /// Returns the next prime not exceeding the limit, or `None` when there
    /// is none left.
    fn next(&mut self) -> Option<Self::Item> {
        // The limit is checked before every candidate, so the generator can
        // never resume above the limit after it has returned `None`.
        while self.current <= self.limit {
            let candidate = self.current;
            self.advance();

            if self.is_prime(candidate) {
                self.primes.push(candidate);
                return Some(candidate);
            }
        }
        None
    }
}

/// Prints the first twenty prime numbers.
fn main() {
    let generator = PrimeGenerator::new();
    let primes = generator.take(20).collect::<Vec<u64>>();

    println!("前20个质数: {:?}", primes);
}

错误处理与迭代器

Result 迭代器

/// Shows three ways of handling `Result` items produced by an iterator.
fn main() {
    let strings = vec!["1", "2", "not_a_number", "4", "5"];

    // Keep every outcome, successes and failures alike.
    let results = strings
        .iter()
        .map(|text| text.parse::<i32>())
        .collect::<Vec<Result<i32, _>>>();

    println!("所有结果: {:?}", results);

    // Keep only the values that parsed successfully.
    let successes: Vec<i32> = strings
        .iter()
        .map(|text| text.parse::<i32>())
        .filter_map(Result::ok)
        .collect();

    println!("成功解析: {:?}", successes);

    // Stop at the first failure: collecting into `Result` short-circuits.
    let first_error = strings
        .iter()
        .map(|text| text.parse::<i32>())
        .collect::<Result<Vec<i32>, _>>();

    println!("遇到错误停止: {:?}", first_error);
}

自定义错误处理迭代器

/// Errors that can be recorded while processing input items.
#[derive(Debug)]
enum ProcessError {
/// An input could not be parsed; the payload is a `String` describing it.
ParseError(String),
/// A validation failure with a `String` payload.
/// NOTE(review): not constructed anywhere in the visible code; confirm intended use.
ValidationError(String),
}

/// Adapter over an iterator of `Result<i32, ProcessError>` that yields only
/// the `Ok` values and stores every `Err` for later inspection.
struct ErrorCollectingIterator<I> {
    /// Source of results.
    iter: I,
    /// Errors encountered so far, in encounter order.
    errors: Vec<ProcessError>,
}

impl<I> ErrorCollectingIterator<I> {
    /// Wraps `iter` with an empty error log.
    fn new(iter: I) -> Self {
        Self {
            iter,
            errors: Vec::new(),
        }
    }

    /// Errors recorded so far.
    fn errors(&self) -> &[ProcessError] {
        self.errors.as_slice()
    }
}

impl<I> Iterator for ErrorCollectingIterator<I>
where
    I: Iterator<Item = Result<i32, ProcessError>>,
{
    type Item = i32;

    /// Returns the next successful value, recording any errors skipped on the
    /// way. Returns `None` when the inner iterator is exhausted.
    fn next(&mut self) -> Option<i32> {
        loop {
            // `?` ends iteration as soon as the inner iterator does.
            match self.iter.next()? {
                Ok(value) => return Some(value),
                Err(error) => self.errors.push(error),
            }
        }
    }
}

/// Parses a list of strings, printing the valid numbers and the errors that
/// were collected along the way.
fn main() {
    let data = vec!["1", "2", "invalid", "4", "also_invalid"];

    let results = data.iter().map(|s| {
        s.parse::<i32>()
            .map_err(|_| ProcessError::ParseError(s.to_string()))
    });

    let mut error_collector = ErrorCollectingIterator::new(results);
    // `collect` takes its receiver by value. Going through `by_ref()` only
    // borrows the collector, so its recorded errors can still be read below.
    let valid_numbers: Vec<i32> = error_collector.by_ref().collect();

    println!("有效数字: {:?}", valid_numbers);
    println!("错误: {:?}", error_collector.errors());
}

实际应用案例

数据处理管道

use std::collections::HashMap;

/// One log record used by the data-processing pipeline example.
#[derive(Debug, Clone)]
struct LogEntry {
/// Time of the event. NOTE(review): unit/epoch is not shown in this snippet.
timestamp: u64,
/// Severity label; the sample data uses "INFO", "WARN" and "ERROR".
level: String,
/// Free-form description of the event.
message: String,
/// User associated with the event, if any.
user_id: Option<u32>,
}

fn main() {
let logs = vec![
LogEntry { timestamp: 1000, level: "INFO".to_string(), message: "User login".to_string(), user_id: Some(123) },
LogEntry { timestamp: 1001, level: "ERROR".to_string(), message: "Database error".to_string(), user_id: None },
LogEntry { timestamp: 1002, level: "INFO".to_string(), message: "User logout".to_string(), user_id: Some(123) },
LogEntry { timestamp: 1003, level: "WARN".to_string(), message: "High memory usage".to_string(), user_id: None },
LogEntry { timestamp: 1004, level: "ERROR".to_string(), message: "Connection timeout".to_string(), user_id: Some(456) },
];

// 分析错误日志
let error_analysis: HashMap<String, usize> = logs
.iter()
.filter(|log| log.level == "ERROR")
.fold(HashMap::new(), |mut acc, log| {
let key = if log.message.contains("Database") {
"Database".to_string()
} else if log.message.contains("Connection") {
"Network".to_string()
} else {
"Other".to_string()
};
*acc.entry(key).or_insert(0) += 1;
acc
});

println!("错误分析: {:?}", error_analysis);

// 用户活动统计
let user_activities: HashMap<u32, usize> = logs
.iter()
.filter_map(|log| log.user_id)
.fold(HashMap::new(), |mut acc, user_id| {
*acc.entry(user_id).or_insert(0) += 1;
acc
});

println!("用户活动: {:?}", user_activities);
}

文件处理流水线

use std::fs::File;
use std::io::{BufRead, BufReader, Result};

/// Streams `filename` line by line and prints statistics about its non-blank
/// lines: how many there are, their total word count, and the largest word
/// count found on a single line.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or if
/// reading any line fails (including lines that are not valid UTF-8).
fn process_large_file(filename: &str) -> Result<()> {
    let reader = BufReader::new(File::open(filename)?);

    let mut total_lines = 0usize;
    let mut total_words = 0usize;
    let mut max_words = 0usize;

    for line in reader.lines() {
        // Propagate read errors instead of discarding them: a reader that
        // fails on every call (e.g. a directory opened as a file on Unix)
        // would otherwise make this loop spin forever.
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }

        let word_count = line.split_whitespace().count();
        total_lines += 1;
        total_words += word_count;
        max_words = max_words.max(word_count);
    }

    println!("文件统计:");
    println!(" 总行数: {}", total_lines);
    println!(" 总单词数: {}", total_words);
    println!(" 最多单词的行: {} 个单词", max_words);

    Ok(())
}

/// Runs the line-statistics pipeline on in-memory sample lines instead of a
/// real file.
fn main() {
    // Stand-in for the contents of a file.
    let sample_lines = vec![
        "Hello world",
        "",
        "This is a longer line with more words",
        "Short",
        "Another line here",
    ];

    let mut total_lines = 0;
    let mut total_words = 0;
    let mut max_words = 0;

    for line in sample_lines.iter() {
        // Blank lines do not count towards any statistic.
        if line.trim().is_empty() {
            continue;
        }

        let word_count = line.split_whitespace().count();
        total_lines += 1;
        total_words += word_count;
        max_words = std::cmp::max(max_words, word_count);
    }

    println!("模拟文件统计:");
    println!(" 总行数: {}", total_lines);
    println!(" 总单词数: {}", total_words);
    println!(" 最多单词的行: {} 个单词", max_words);
}

最佳实践总结

1. 选择合适的迭代器类型

/// Shows when to use `iter()`, `iter_mut()` and `into_iter()`.
fn demonstrate_iterator_choices() {
    let data = vec![1, 2, 3, 4, 5];

    // Read-only access: borrow the elements with `iter()`.
    let sum = data.iter().sum::<i32>();

    // In-place modification: borrow mutably with `iter_mut()`.
    let mut data_mut = vec![1, 2, 3, 4, 5];
    for value in data_mut.iter_mut() {
        *value *= 2;
    }

    // Ownership of the elements is needed: consume with `into_iter()`.
    let owned_data = data
        .into_iter()
        .map(|number| number.to_string())
        .collect::<Vec<String>>();

    println!("求和: {}", sum);
    println!("修改后: {:?}", data_mut);
    println!("拥有所有权: {:?}", owned_data);
}

2. 合理使用惰性求值

/// Shows that a lazy iterator chain evaluates only as much as it needs.
fn demonstrate_lazy_evaluation() {
    let mut numbers = 1..1_000_000;

    // Lazy: the search stops at the first match.
    let first_even = numbers.find(|candidate| candidate % 2 == 0);

    println!("第一个偶数: {:?}", first_even);

    // Avoid: computing the complete result when only one item is needed.
    // let all_evens: Vec<i32> = numbers.filter(|x| x % 2 == 0).collect();
    // let first = all_evens.first();
}

3. 优化内存使用

/// Sums the squares of the first ten even numbers without building any
/// intermediate collection.
fn memory_efficient_processing() {
    // Good: streaming pipeline in which each item flows through every stage.
    let even_squares = (1..1_000_000)
        .filter(|value| value % 2 == 0)
        .map(|value| value * value);
    let result: i32 = even_squares.take(10).sum();

    println!("流式处理结果: {}", result);
}

迭代器是 Rust 中最强大的抽象之一，掌握其高级应用可以让你写出既优雅又高效的代码。