单发送者,单接受者

使用 std::sync::mpsc::channel() (multiple producer, single consumer)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::sync::mpsc;
use std::thread;

fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();

// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();

// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});

// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}

接受数据的时候,recv() 是阻塞的;try_recv() 是不阻塞的,当通道中没有消息时,它会立刻返回一个错误 Result<> (Ok 或者 Err())。

多发送者

mpsc 支持多发送者,我们需要将 sender 进行 clone() 后,给每一个线程一个拷贝即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});

thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});

for received in rx {
println!("Got: {}", received);
}
}

需要注意的是:

  1. 需要所有的发送者都被 drop 掉后,接收者 rx 才会收到错误,进而跳出 for 循环,最终结束主线程

通道与所有权的转移

  • 如果值的类型实现了 Copy,则直接复制该值,传输到 channel 里.
  • 如果没有实现 Copy,则其所有权会转移到 channel,随后给 receiver.

同步通道、异步通道

  • 异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞
  • 同步通道:发送消息是阻塞的,只有在消息被接收后才解除阻塞 mpsc::sync_channel(N)

这里可以通过设置 N,使得发送者可以无阻塞地发送 N 条消息。当消息缓冲队列满了后,新发送的消息将被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx)= mpsc::sync_channel(0);
// 设置 0 条消息可以无阻塞发送

let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap();
println!("发送之后");
});

println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");

println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}

关闭通道

所有发送者被 drop 或者所有接收者被 drop 后,通道会自动关闭。