rust concurrency

· 2435 words · 5 minute read

异步编程 🔗

  future, async: 定义一个可并发执行的任务; await 触发这个任务并发执行。
  std的Future + runtime Tokio
  lock free structure不用上锁,利用原子操作实现的并发安全的数据结构。

single producer, multi-consumer (broadcast, spmc) 🔗

仓库地址:bus, broadcast channel
依赖了crossbeam-channel

                            --> consumer
producer broadcast ==>      --> consumer
                            --> consumer

创建数据结构Bus::new()

添加consumerBus::add_rx() -> BusReader

发送广播try_broadcast()

接受消息try_recv()

Bus(Arc<BusInner>...)
BusInner(Vec<Seat>...)
Seat(read_num, data)

每一份数据`Seat(read_num, data)`需要额外记录被consumer读取的次数。

写时无锁,vec为缓行队列,当tail+1的位置Seat中read_num已到达consumer num时则覆盖写,否则等待
读时无锁,修改vec中seat中的read_num++。

multi producer, single consumer (mpsc) 🔗

标准库中std::sync::mpsc有实现

multi producer, multi consumer(mpmc) 🔗

crossbeam channel: unbounded channel(list), bounded channel(array)

《rust atomics and locks》 🔗

读书 和example 代码笔记

并发的基本概念 🔗

 线程 std::thread::spawn, std::thread::Builder::new(),scoped thread

static X: [i32; 3] = [1,2,3];
thread::spawn(||dbg!(&X));

let x: &'static [i32; 3] = Box::leak(Box::new([1,2,3]));
thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));

 共享所有权,引用计数

use std::rc::Rc;
let a = Rc::new([1,2,3]);
let b = a.clone();
assert_eq!(a.as_ptr(), b.as_ptr()); 

thread::spawn(move || dbg!(b)); // Error `Rc` cannot be sent between threads safely

use std::sync::Arc;
let a = Arc::new([1,2,3]);
let b = a.clone();
thread::spawn(move || dbg!(a));
thread::spawn(move || dbg!(b));

 内部可变性interior mutability Rc, Arc 涉及了内部可变性。只能在single thread中使用,通过不变引用可以修改对象内部数据

use std::cell::Cell;
fn f(v: &Cell<Vec<i32>>) {
    let mut v2 = v.take();
    v2.push(2);
    v.set(v2);
}

use std::cell::RefCell;
// 记录引用, may panic
fn f1(v: &RefCell<Vec<i32>>) {
    v.borrow_mut().push(2);
}

  UnsafeCell
get方法返回raw pointer。 UnsafeCell不直接使用, 在Rc,Cell, RefCell中作为内部可变性的封装结构使用。

 互斥Mutex and RwLock
RwLock也记录引用, no panic, 是RefCell的并发版本
std::sync::Mutex<T>只允许互斥的借用

mutex并没有unlock方法,为了保证unlock只能由lock的线程释放。lock() 返回MutexGuard结构,在dropMutexGuard时执行unlock。

lock poisoning: 当持有lock的线程panic了,mutex被标记为poisoned。

 原子Atomics 是Cell的并发版本, copy value,不允许借用。

 线程安全 Send and Sync

Send: Arc<i32> , !Send: Rc<i32>

!Sync: Cell<i32> 内部可变性的类型不能sync。(Cell<i32> is Send)

*const T, *mut T 不是send和sync

  thread park, std::sync::Condvar

原子操作atomics 🔗

Atomic Load, Store

Fetch and Modify

AtomicI32::new(100).fetch_add(23, Relaxed)

Compare and Exchange

memory ordering 🔗

Reordering and Optimizations

Memory Model

Happens-Before

Relaxed Ordering

Release and Acquire Ordering

Consume Ordering

Sequentially Consistent Ordering

Fences: std::sync::atomic::fence

build own spin lock, channels, Arc, 🔗

todo

processor 🔗

os primitives 🔗

build own locks 🔗

关于rust的异步编程 🔗

Async-await on stable Rust

async, .await, Future 🔗

async/.await是了为了更简单实现Future的语法糖
async -> 生成实现了Future trait的结构体, await -> 触发Future的执行

async fn foo1() -> usize{ 0 } 
// foo1 == foo2
fn foo2() -> impl std::future::Future<Output = usize> {
    async{0}
}

async语法糖把代码块或者函数转换为 实现了trait Future的enum结构的状态机

use futures::executor::block_on;
use std::time;
async fn do_something() {
    std::thread::sleep(time::Duration::from_secs(2));
    println!("do something... so print hello, world!");
}
fn main() {
    println!("starting main");
    let future = do_something();
    block_on(future);
    println!("ending main");
}

在async函数内,使用.await等待依赖的future的结束。

use std::thread;
use std::time;
use futures::executor::block_on;

#[derive(Debug)]
struct Song(String);
async fn learn_song(song_name: String) -> Song {
    Song(song_name)
}
async fn sing_song(song: Song) {
    thread::sleep(time::Duration::from_secs(3));
    println!("sing_song: {:?}", song);
}
async fn learn_and_sing(song_name: String) {
    let song = learn_song(song_name).await;
    sing_song(song).await;
}

async fn dance() {
    println!("dancing");
}

async fn async_main() {
    let f1 = learn_and_sing("茉莉花".to_string());
    let f2 = dance();

    futures::join!(f2, f1);
}
fn main() {
    block_on(async_main());
}

Future的结构

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

use futures::executor::block_on;

executor -> future的调度器。 常见的 executor 有:

  • futures 库自带的很简单的 executor,上面的代码就使用了它的 block_on 函数;
  • tokio 提供的 executor,当使用 #[tokio::main] 时,就隐含引入了 tokio 的 executor;

为什么需要Pin 🔗

生成的实现了Future的结构体不包含引用自身数据的引用

let fut_one = /* ... */;
let fut_two = /* ... */;
async move {
    fut_one.await;
    fut_two.await;
}

struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}
impl Future for AsyncFuture {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

生成的结构体包含了引用自身的数据的引用,如果AsyncFuture移动了, x也移动了地址,但是read_info_buf_fut指向的还是老x的引用,有问题。

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}
struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

需要增加Pin<&mut T>, Pin<&T>, Pin<Box>语义,保证T不能被移动。

generators 生成实现了Future类型的结构体 🔗

tasks, executors, reactors 🔗

futures-rs 🔗

tokio-rs 🔗


Send, Sync 🔗

Send Sync的引入是由于std::thread::spawn()调用时的类型约束。 谈到时必须牵涉到线程。
Send 表示跨线程 move(ownership), Sync 表示跨线程 share data(borrow)。

Send 🔗

Send是在规定一个type产生出来的值可以被move到其他线程中,跨线程的move。 那么这种type的值需要拥有所有权。

几乎所有的 Rust 类型都是Send 的, 例如Arc
但是也有例外: 实现了!Send的类型有:
Rc:这是不能 Send 的。克隆了 Rc 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。
MutexGuar是不能send的 引用不是Send的
所有的裸指针不是Send的, 也不是Sync
还有一些

细节看死灵书

// right, but Cell, RefCell are `Send`.  because Cell and RefCell 拥有所有权。
fn main() {
    let a_cell = Cell::new(5);
    
}

// error, but cell

Sync 🔗

A type is Sync if it is safe to share between threads
Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用
(T is Sync if and only if &T is Send).

UnsafeCell isn't Sync (and therefore Cell and RefCell aren't).
🔗

Cell, RefCell, UnsafeCell is not sync but is Send(因为拥有了所有权,可以move到thread中)。 so can move it into thread but not share between threads.
RefCell 和 Cell 系列类型不是 Sync 的。 RefCell 在运行时所进行的借用检查也不是线程安全的。

fn spawn<F, T>(f: F) -> JoinHandle<T>
    where F: FnOnce() -> T,
          F: Send + 'static,
          T: Send + 'static,

1 需要修改这个值, 2 需要把值赋予给多个所有者 。 Rc<RefCell<T>>。 多线程下Arc<Mutex<T>>