Arc Smart Pointer
Arc<T> (Atomically Reference Counted) is a thread-safe reference-counting smart pointer that lets multiple threads safely share the same data.
What Is Arc?
Arc<T> is a smart pointer that:
- Provides thread-safe reference counting
- Allows multiple owners to share data across threads
- Manages the reference count with atomic operations
- Keeps its data immutable, unless combined with Mutex/RwLock (see the sketch after this list)
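A minimal sketch of the last point, using only the standard Arc API (Arc::get_mut, Arc::strong_count): an Arc hands out shared references, so in-place mutation is only possible while the reference count is 1, or through interior mutability as shown in the Mutex/RwLock sections below.
use std::sync::Arc;

fn main() {
    let mut data = Arc::new(vec![1, 2, 3]);
    // While this is the only Arc, get_mut grants mutable access to the Vec.
    if let Some(v) = Arc::get_mut(&mut data) {
        v.push(4);
    }
    let shared = Arc::clone(&data);
    // Once the Arc is shared, get_mut returns None; mutation now needs Mutex/RwLock.
    assert!(Arc::get_mut(&mut data).is_none());
    println!("data: {:?}, count: {}", shared, Arc::strong_count(&shared));
}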
Basic Usage
Creating and Cloning an Arc
use std::sync::Arc;
use std::thread;
fn main() {
let data = Arc::new(vec![1, 2, 3, 4, 5]);
println!("主线程引用计数: {}", Arc::strong_count(&data));
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
println!("子线程数据: {:?}", data_clone);
println!("子线程引用计数: {}", Arc::strong_count(&data_clone));
});
handle.join().unwrap();
println!("主线程最终引用计数: {}", Arc::strong_count(&data));
}
Sharing Data Across Multiple Threads
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn main() {
let shared_data = Arc::new(String::from("shared data"));
let mut handles = vec![];
for i in 0..5 {
let data = Arc::clone(&shared_data);
let handle = thread::spawn(move || {
println!("线程 {} 访问: {}", i, data);
thread::sleep(Duration::from_millis(100));
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("所有线程完成,引用计数: {}", Arc::strong_count(&shared_data));
}
Combining Arc with Mutex
Shared Mutable State
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数: {}", *counter.lock().unwrap());
}
More Complex Shared State
use std::sync::{Arc, Mutex};
use std::thread;
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct UserData {
name: String,
score: i32,
}
fn main() {
let shared_map = Arc::new(Mutex::new(HashMap::<u32, UserData>::new()));
let mut handles = vec![];
// Writer threads
for i in 0..5 {
let map = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
let user = UserData {
name: format!("User{}", i),
score: (i * 10) as i32,
};
map.lock().unwrap().insert(i, user);
println!("线程 {} 插入了用户数据", i);
});
handles.push(handle);
}
// Reader threads
for i in 0..3 {
let map = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(50));
let map_guard = map.lock().unwrap();
println!("读取线程 {} 看到 {} 个用户", i, map_guard.len());
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终用户数据: {:?}", shared_map.lock().unwrap());
}
Combining Arc with RwLock
Optimizing with a Read-Write Lock
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn main() {
let shared_data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
let mut handles = vec![];
// Multiple reader threads
for i in 0..5 {
let data = Arc::clone(&shared_data);
let handle = thread::spawn(move || {
let read_guard = data.read().unwrap();
println!("读取线程 {} 看到数据: {:?}", i, *read_guard);
thread::sleep(Duration::from_millis(100));
});
handles.push(handle);
}
// A single writer thread
let data = Arc::clone(&shared_data);
let write_handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
let mut write_guard = data.write().unwrap();
write_guard.push(6);
println!("写入线程添加了元素");
});
handles.push(write_handle);
for handle in handles {
handle.join().unwrap();
}
println!("最终数据: {:?}", shared_data.read().unwrap());
}
Thread Pool Example
A Simple Thread Pool
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
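// The MutexGuard below is a temporary dropped at the end of the statement,
// so the lock is released before job() runs.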
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} 执行任务", id);
job();
});
Worker { id, thread }
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("任务 {} 正在执行", i);
thread::sleep(std::time::Duration::from_millis(500));
println!("任务 {} 完成", i);
});
}
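// This simple pool has no shutdown logic (no Drop impl that joins the workers),
// so main just sleeps long enough for the queued tasks to finish.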
thread::sleep(std::time::Duration::from_secs(3));
}
Producer-Consumer Pattern
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
use std::time::Duration;
struct Buffer<T> {
queue: Mutex<VecDeque<T>>,
not_empty: Condvar,
not_full: Condvar,
capacity: usize,
}
impl<T> Buffer<T> {
fn new(capacity: usize) -> Self {
Buffer {
queue: Mutex::new(VecDeque::new()),
not_empty: Condvar::new(),
not_full: Condvar::new(),
capacity,
}
}
fn put(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
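// Wait in a loop: Condvar::wait can wake up spuriously, so the condition is re-checked.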
while queue.len() == self.capacity {
queue = self.not_full.wait(queue).unwrap();
}
queue.push_back(item);
self.not_empty.notify_one();
}
fn take(&self) -> T {
let mut queue = self.queue.lock().unwrap();
while queue.is_empty() {
queue = self.not_empty.wait(queue).unwrap();
}
let item = queue.pop_front().unwrap();
self.not_full.notify_one();
item
}
}
fn main() {
let buffer = Arc::new(Buffer::new(5));
let mut handles = vec![];
// Producer threads
for i in 0..3 {
let buffer = Arc::clone(&buffer);
let handle = thread::spawn(move || {
for j in 0..5 {
let item = format!("Producer{}-Item{}", i, j);
println!("生产: {}", item);
buffer.put(item);
thread::sleep(Duration::from_millis(100));
}
});
handles.push(handle);
}
// Consumer threads
for i in 0..2 {
let buffer = Arc::clone(&buffer);
let handle = thread::spawn(move || {
for _ in 0..7 {
let item = buffer.take();
println!("消费者{} 消费: {}", i, item);
thread::sleep(Duration::from_millis(150));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
Cache System
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
struct CacheEntry {
value: String,
created_at: Instant,
ttl: Duration,
}
impl CacheEntry {
fn new(value: String, ttl: Duration) -> Self {
CacheEntry {
value,
created_at: Instant::now(),
ttl,
}
}
fn is_expired(&self) -> bool {
self.created_at.elapsed() > self.ttl
}
}
struct Cache {
data: Arc<RwLock<HashMap<String, CacheEntry>>>,
}
impl Cache {
fn new() -> Self {
Cache {
data: Arc::new(RwLock::new(HashMap::new())),
}
}
fn get(&self, key: &str) -> Option<String> {
let read_guard = self.data.read().unwrap();
if let Some(entry) = read_guard.get(key) {
if !entry.is_expired() {
return Some(entry.value.clone());
}
}
None
}
fn set(&self, key: String, value: String, ttl: Duration) {
let mut write_guard = self.data.write().unwrap();
write_guard.insert(key, CacheEntry::new(value, ttl));
}
fn cleanup_expired(&self) {
let mut write_guard = self.data.write().unwrap();
write_guard.retain(|_, entry| !entry.is_expired());
}
fn size(&self) -> usize {
self.data.read().unwrap().len()
}
fn clone_handle(&self) -> Cache {
Cache {
data: Arc::clone(&self.data),
}
}
}
fn main() {
let cache = Cache::new();
let mut handles = vec![];
// Writer threads
for i in 0..3 {
let cache = cache.clone_handle();
let handle = thread::spawn(move || {
for j in 0..5 {
let key = format!("key{}_{}", i, j);
let value = format!("value{}_{}", i, j);
cache.set(key.clone(), value, Duration::from_secs(2));
println!("设置缓存: {}", key);
thread::sleep(Duration::from_millis(100));
}
});
handles.push(handle);
}
// Reader threads
for i in 0..2 {
let cache = cache.clone_handle();
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
for j in 0..10 {
let key = format!("key{}_{}", j % 3, j % 5);
if let Some(value) = cache.get(&key) {
println!("读取线程{} 获取到: {} = {}", i, key, value);
} else {
println!("读取线程{} 未找到: {}", i, key);
}
thread::sleep(Duration::from_millis(300));
}
});
handles.push(handle);
}
// Cleanup thread: loops forever and is intentionally never joined; it ends when the process exits
let cache_cleanup = cache.clone_handle();
let _cleanup_handle = thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(1));
let old_size = cache_cleanup.size();
cache_cleanup.cleanup_expired();
let new_size = cache_cleanup.size();
if old_size != new_size {
println!("清理过期缓存: {} -> {}", old_size, new_size);
}
}
});
for handle in handles {
handle.join().unwrap();
}
println!("最终缓存大小: {}", cache.size());
}
Performance Considerations
Arc vs. Rc Performance Comparison
use std::sync::Arc;
use std::rc::Rc;
use std::time::Instant;
fn main() {
let n = 1_000_000;
// Rc benchmark
let rc_data = Rc::new(vec![1, 2, 3, 4, 5]);
let start = Instant::now();
let mut rc_refs = Vec::new();
for _ in 0..n {
rc_refs.push(Rc::clone(&rc_data));
}
println!("Rc 克隆时间: {:?}", start.elapsed());
// Arc 性能测试
let arc_data = Arc::new(vec![1, 2, 3, 4, 5]);
let start = Instant::now();
let mut arc_refs = Vec::new();
for _ in 0..n {
arc_refs.push(Arc::clone(&arc_data));
}
println!("Arc 克隆时间: {:?}", start.elapsed());
println!("Arc 由于原子操作会比 Rc 慢一些");
}
Memory Overhead
use std::sync::Arc;
use std::rc::Rc;
use std::mem;
fn main() {
let data = vec![1, 2, 3, 4, 5];
let rc_data = Rc::new(data.clone());
let arc_data = Arc::new(data);
println!("Rc 大小: {} bytes", mem::size_of_val(&rc_data));
println!("Arc 大小: {} bytes", mem::size_of_val(&arc_data));
// Arc 和 Rc 的大小相同,但 Arc 使用原子操作
println!("两者大小相同,但 Arc 有原子操作开销");
}
Best Practices
1. When to Use Arc
// Good use cases for Arc:
// - Sharing data across threads
// - Handing shared ownership to other threads
// - Thread pools and cache systems
// - Concurrent data structures
// Less suitable scenarios:
// - Single-threaded code (use Rc instead)
// - Data that is mutated frequently (mind the locking/atomic overhead)
// - Simple one-way data passing (consider a channel; see the sketch below)
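A minimal sketch of the channel alternative mentioned above, assuming nothing beyond std::sync::mpsc: each value's ownership is handed to the receiving thread, so no Arc and no lock are needed.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // The receiver owns each message; nothing is shared between threads.
        for msg in rx {
            println!("received: {}", msg);
        }
    });
    for i in 0..3 {
        tx.send(format!("message {}", i)).unwrap();
    }
    drop(tx); // closing the channel ends the receiver's loop
    handle.join().unwrap();
}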
2. Combining Arc with Synchronization Primitives
use std::sync::{Arc, Mutex, RwLock};
// Choose the right synchronization primitive:
// Arc<Mutex<T>>  - exclusive access is required
// Arc<RwLock<T>> - many readers, few writers
// Arc<AtomicUsize>, Arc<AtomicBool>, ... - simple atomic values (see the sketch below)
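As a sketch of the atomic option (standard std::sync::atomic types, not used elsewhere in this chapter), the Mutex-based counter from earlier could be written without a lock:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // fetch_add increments the counter atomically; no Mutex required.
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final count: {}", counter.load(Ordering::SeqCst));
}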
3. Avoiding Deadlocks
use std::sync::{Arc, Mutex};
fn avoid_deadlock_example() {
let data1 = Arc::new(Mutex::new(1));
let data2 = Arc::new(Mutex::new(2));
// Good practice: always acquire locks in the same order
let _guard1 = data1.lock().unwrap();
let _guard2 = data2.lock().unwrap();
// Avoid acquiring locks in different orders across threads, which can deadlock (see the two-thread sketch below)
}
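A short two-thread sketch of that ordering rule (illustrative only): both threads lock data1 before data2, so neither can hold one lock while waiting for the other.
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = Arc::new(Mutex::new(2));

    let (a1, a2) = (Arc::clone(&data1), Arc::clone(&data2));
    let t1 = thread::spawn(move || {
        let g1 = a1.lock().unwrap(); // always data1 first...
        let g2 = a2.lock().unwrap(); // ...then data2
        println!("thread 1 sees {} and {}", *g1, *g2);
    });

    let (b1, b2) = (Arc::clone(&data1), Arc::clone(&data2));
    let t2 = thread::spawn(move || {
        let g1 = b1.lock().unwrap(); // same order as thread 1
        let g2 = b2.lock().unwrap();
        println!("thread 2 sees {} and {}", *g1, *g2);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}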
Arc is the go-to smart pointer for sharing data across threads, but keep its performance overhead in mind and pair it with the right synchronization strategy. In the next section we will look at the RefCell smart pointer.