单发送者,单接受者

使用 std::sync::mpsc (multiple producer, single consumer)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::sync::mpsc;
use std::thread;

fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();

// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();

// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});

// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}

接受数据的时候,recv() 是阻塞的;try_recv() 是不阻塞的,当通道中没有消息时,它会立刻返回一个错误 Result<> (Ok 或者 Err())。

多发送者

mpsc 支持多发送者,我们需要将 sender 进行 clone() 后,给每一个线程一个拷贝即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});

thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});

for received in rx {
println!("Got: {}", received);
}
}

需要注意的是:

  1. 需要所有的发送者都被 drop 掉后,接收者 rx 才会收到错误,进而跳出 for 循环,最终结束主线程

通道与所有权的转移

  • 如果值的类型实现了 Copy,则直接复制该值,传输到 channel 里.
  • 如果没有实现 Copy,则其所有权会转移到 channel,随后给 receiver.