异步编程 🔗
future, async: 定义一个可并发执行的任务; await 触发这个任务并发执行。
std的Future + runtime Tokio
lock free structure不用上锁,利用原子操作实现的并发安全的数据结构。
single producer, multi-consumer (broadcast, spmc) 🔗
仓库地址:bus, broadcast channel
依赖了crossbeam-channel
--> consumer
producer broadcast ==> --> consumer
--> consumer
创建数据结构Bus::new()
添加consumerBus::add_rx() -> BusReader
发送广播try_broadcast()
接受消息try_recv()
Bus(Arc<BusInner>...)
BusInner(Vec<Seat>...)
Seat(read_num, data)
每一份数据`Seat(read_num, data)`需要额外记录被consumer读取的次数。
写时无锁,vec为缓行队列,当tail+1的位置Seat中read_num已到达consumer num时则覆盖写,否则等待
读时无锁,修改vec中seat中的read_num++。
multi producer, single consumer (mpsc) 🔗
标准库中std::sync::mpsc有实现
multi producer, multi consumer(mpmc) 🔗
crossbeam channel: unbounded channel(list), bounded channel(array)
《rust atomics and locks》 🔗
读书 和example 代码笔记
并发的基本概念 🔗
线程 std::thread::spawn, std::thread::Builder::new(),scoped thread
static X: [i32; 3] = [1,2,3];
thread::spawn(||dbg!(&X));
let x: &'static [i32; 3] = Box::leak(Box::new([1,2,3]));
thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));
共享所有权,引用计数
use std::rc::Rc;
let a = Rc::new([1,2,3]);
let b = a.clone();
assert_eq!(a.as_ptr(), b.as_ptr());
thread::spawn(move || dbg!(b)); // Error `Rc` cannot be sent between threads safely
use std::sync::Arc;
let a = Arc::new([1,2,3]);
let b = a.clone();
thread::spawn(move || dbg!(a));
thread::spawn(move || dbg!(b));
内部可变性interior mutability
Rc, Arc 涉及了内部可变性。只能在single thread中使用,通过不变引用可以修改对象内部数据
use std::cell::Cell;
fn f(v: &Cell<Vec<i32>>) {
let mut v2 = v.take();
v2.push(2);
v.set(v2);
}
use std::cell::RefCell;
// 记录引用, may panic
fn f1(v: &RefCell<Vec<i32>>) {
v.borrow_mut().push(2);
}
UnsafeCell
get方法返回raw pointer。 UnsafeCell不直接使用, 在Rc,Cell, RefCell中作为内部可变性的封装结构使用。
互斥Mutex and RwLock
RwLock也记录引用, no panic, 是RefCell的并发版本
std::sync::Mutex<T>只允许互斥的借用
mutex并没有unlock方法,为了保证unlock只能由lock的线程释放。lock() 返回MutexGuard结构,在dropMutexGuard时执行unlock。
lock poisoning: 当持有lock的线程panic了,mutex被标记为poisoned。
原子Atomics 是Cell的并发版本, copy value,不允许借用。
线程安全 Send and Sync
Send: Arc<i32> , !Send: Rc<i32>
!Sync: Cell<i32> 内部可变性的类型不能sync。(Cell<i32> is Send)
*const T, *mut T 不是send和sync
thread park, std::sync::Condvar
原子操作atomics 🔗
Atomic Load, Store
Fetch and Modify
AtomicI32::new(100).fetch_add(23, Relaxed)
Compare and Exchange
memory ordering 🔗
Reordering and Optimizations
Memory Model
Happens-Before
Relaxed Ordering
Release and Acquire Ordering
Consume Ordering
Sequentially Consistent Ordering
Fences: std::sync::atomic::fence
build own spin lock, channels, Arc, 🔗
todo
processor 🔗
os primitives 🔗
build own locks 🔗
关于rust的异步编程 🔗
async, .await, Future 🔗
async/.await是了为了更简单实现Future的语法糖
async -> 生成实现了Future trait的结构体, await -> 触发Future的执行
async fn foo1() -> usize{ 0 }
// foo1 == foo2
fn foo2() -> impl std::future::Future<Output = usize> {
async{0}
}
async语法糖把代码块或者函数转换为 实现了trait Future的enum结构的状态机
use futures::executor::block_on;
use std::time;
async fn do_something() {
std::thread::sleep(time::Duration::from_secs(2));
println!("do something... so print hello, world!");
}
fn main() {
println!("starting main");
let future = do_something();
block_on(future);
println!("ending main");
}
在async函数内,使用.await等待依赖的future的结束。
use std::thread;
use std::time;
use futures::executor::block_on;
#[derive(Debug)]
struct Song(String);
async fn learn_song(song_name: String) -> Song {
Song(song_name)
}
async fn sing_song(song: Song) {
thread::sleep(time::Duration::from_secs(3));
println!("sing_song: {:?}", song);
}
async fn learn_and_sing(song_name: String) {
let song = learn_song(song_name).await;
sing_song(song).await;
}
async fn dance() {
println!("dancing");
}
async fn async_main() {
let f1 = learn_and_sing("茉莉花".to_string());
let f2 = dance();
futures::join!(f2, f1);
}
fn main() {
block_on(async_main());
}
Future的结构
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use futures::executor::block_on;
executor -> future的调度器。 常见的 executor 有:
- futures 库自带的很简单的 executor,上面的代码就使用了它的 block_on 函数;
- tokio 提供的 executor,当使用 #[tokio::main] 时,就隐含引入了 tokio 的 executor;
为什么需要Pin 🔗
生成的实现了Future的结构体不包含引用自身数据的引用
let fut_one = /* ... */;
let fut_two = /* ... */;
async move {
fut_one.await;
fut_two.await;
}
struct AsyncFuture {
fut_one: FutOne,
fut_two: FutTwo,
state: State,
}
enum State {
AwaitingFutOne,
AwaitingFutTwo,
Done,
}
impl Future for AsyncFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::AwaitingFutOne => match self.fut_one.poll(..) {
Poll::Ready(()) => self.state = State::AwaitingFutTwo,
Poll::Pending => return Poll::Pending,
}
State::AwaitingFutTwo => match self.fut_two.poll(..) {
Poll::Ready(()) => self.state = State::Done,
Poll::Pending => return Poll::Pending,
}
State::Done => return Poll::Ready(()),
}
}
}
}
生成的结构体包含了引用自身的数据的引用,如果AsyncFuture移动了, x也移动了地址,但是read_info_buf_fut指向的还是老x的引用,有问题。
async {
let mut x = [0; 128];
let read_into_buf_fut = read_into_buf(&mut x);
read_into_buf_fut.await;
println!("{:?}", x);
}
struct ReadIntoBuf<'a> {
buf: &'a mut [u8], // points to `x` below
}
struct AsyncFuture {
x: [u8; 128],
read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
需要增加Pin<&mut T>, Pin<&T>, Pin<Box
generators 生成实现了Future类型的结构体 🔗
tasks, executors, reactors 🔗
futures-rs 🔗
tokio-rs 🔗
Send, Sync 🔗
Send Sync的引入是由于std::thread::spawn()调用时的类型约束。 谈到时必须牵涉到线程。
Send 表示跨线程 move(ownership), Sync 表示跨线程 share data(borrow)。
Send 🔗
Send是在规定一个type产生出来的值可以被move到其他线程中,跨线程的move。 那么这种type的值需要拥有所有权。
几乎所有的 Rust 类型都是Send 的, 例如Arc
但是也有例外: 实现了!Send的类型有:
Rc
MutexGuar是不能send的
引用不是Send的
所有的裸指针不是Send的, 也不是Sync
还有一些
// right, but Cell, RefCell are `Send`. because Cell and RefCell 拥有所有权。
fn main() {
let a_cell = Cell::new(5);
}
// error, but cell
Sync 🔗
A type is Sync if it is safe to share between threads
Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用
(T is Sync if and only if &T is Send).
UnsafeCell isn't Sync (and therefore Cell and RefCell aren't).
🔗
Cell, RefCell, UnsafeCell is not sync but is Send(因为拥有了所有权,可以move到thread中)。 so can move it into thread but not share between threads.
RefCell
fn spawn<F, T>(f: F) -> JoinHandle<T>
where F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
1 需要修改这个值, 2 需要把值赋予给多个所有者 。 Rc<RefCell<T>>。 多线程下Arc<Mutex<T>>