go同步编码方式

· 3301 words · 7 minute read

go同步方式 🔗

  golang提供了比较便捷的协程并发编程方式。golang的并发单元是goroutine。
协程与线程: 线程是os创建和调度。协程是golang的runtime创建和调度。
  协程切换与线程切换的区别:

  1. 切换需要切换2个地方:栈和寄存器。协程切换是用户态的普通函数调用。calling convention约定了只需要保存少量寄存器,但线程切换需要保存更多的寄存器到栈上。
  2. 线程切换需要系统调用到系统态进行。

  实现多个goroutine同步的主要方式有: sync包, runtime包下的channel,context包。

sync源码包的结构 🔗

  sync包:sync.Mutex,sync.RWMutex,sync.Cond,sync.WaitGroup,sync.Once,sync.Pool 涉及的内容很多(cache line padding,no copy,lock-free,victim cache,gc 细节)

sync包中的sync.WaitGroup 🔗

  标准库sync里的Waitgroup,用来阻塞主协程去等待所有协程执行完。WaitGroup主要三个方法:Add方法添加等待的协程数量,Done方法等于Add(-1)减少等待的协程数量,Wait方法阻塞主协程。原理示意图如下:

image

WaitGroup使用 🔗

  WaitGroup在工作中经常会使用到。例如:需要并行处理一些子任务,需要起多个协程对一些复杂结构体的各个字段进行初始化等。简单的使用说明如下:

func fun() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done() 
        //process
    }()
    wg.Wait()
}

sync.Mutex 🔗

锁的实现 🔗

  cas是基于硬件lock and swap 实现,早期386会lock总线,实现swap时无其他竞争,后期lock总线细粒度到cpu的cache里,通过MESI一致性保证swap时的正确性。

type Mutex struct {
    // 1<<(32-3)个Goroutine waiters,  最低三位分别表示mutexLocked、mutexWoken、mutexStarving
    state int32
    sema  uint32
}

  加锁: fast path, slow path

  • sync.Mutex
    • 实现了公平锁,即排队等待锁
    • 2种模式,正常模式 和 饥饿模式
    • 第一次cas失败后,进入lockslow方法(为了主方法Lock()能inline进去上层代码里),spin几次失败后,进去队列等待被唤醒
func (m *Mutex) Lock() { 
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    } 
    m.lockSlow()
}
//CAS
for {
	复制旧数据
	基于旧数据构造新数据
	if CompareAndSwap(内存地址旧数据新数据) {
		break
	}
}
  • futex
  • spin vs sleep
    • cas spin几次,没拿到锁则进入futex syscall后sleep。
    • sync.Mutex的实现方式, cas spin几次,没拿到锁则进入enqueue waitqueue后sleep。

sync Map 🔗

如何实现一个高并发的map,insert和get是O1。 get时key存在即返回,不存在阻塞等待insert后再返回,or timeout后返回error。

type ConcurrentMap struct{
    sync.Mutex// 读写锁?
    mp map[int]int
    keyToChan map[int]chan struct{}
}
func NewConcurrentMap() *ConcurrentMap {
    return &ConcurrentMap{
        mp: make(map[int]int),
        keyToChan: make(map[int]chan struct{}),
    }
}

func (m *ConcurrentMap) Put(k, v int){
    m.lock()
    defer m.unlock()
    m.mp[k] = v
    ch, ok := m.keyToChan[k]
    if !ok {
        return 
    }
    // ch <- struct{}{}
    select {
    case <- ch:
        // has closed
        return
    default:
        // only close once
        close(ch)
    }
}
func (m *ConcurrentMap) Get(k int, timeout time.Duration) (int, error) {
    m.lock()
    v, ok := m.mp[k]
    if ok {
        m.unlock()
        return v, nil
    }
    ch, ok := m.keyToChan[k]
    if !ok {
        m.keyToChan[k] = make(chan struct{})
    }
    tctx, cancel := context.WithTimeout(context.Background(),  timeout)
    defer cancel()
    m.unlock()
    select {
    case <- m.keyToChan[k]:
        m.lock()
        v  = m.mp[k]
        m.unlock()
        return v, nil
    case <- tctx.Done():
        return -1, tctx.Err()
    }
    
}

channel 🔗

  通道是golang中协程通信的一种方式,支持协程之间的通信。
  通道与协程之间的交互流程图因是否存在缓冲区、缓冲区是否填满、是否存在发送等待队列等,会在过程上会存在不同。
  下面给出的示意图是针对利用无缓冲通道进行通信的其中一种情况。主要是为了简化流程和突出主要功能点。图中hchan是通道的底层数据结构,其中主要包含: buf为数据存储区, recvq和sendq为在此通道上等待接受和发送的协程队列,lock为保护通道数据的锁等。P为运行时的协程调度器。示意图如下:

image

channel 使用说明 🔗

通过无缓冲的通道,阻塞一个协程,等待另一个协程唤醒。例如:main协程在通道的接受队列中等待, 等worker协程写入数据后,main协程才会唤醒并继续执行。通信的数据是直接写一个协程的stack。这样可以省去去操作通道的buf,减少内存拷贝。简单的使用说明如下:

func worker(done chan bool) {
    //process
    done <- true
}
func main(){
    done := make(chan bool)
    go worker(done)
    <- done
}

channel读写超时的处理。Go 语言的 channel 本身是不支持 timeout 的,一般实现 channel 的读写超时采用 select + time的方式。实例说明如下:

func main() {
    var ch chan int
    go func(){
        //process   
        ch<-1
    }()
    select{
    case <-time.After(time.Second):
        return
    case <-ch 
        //process
    }
}

channel 错误示例 🔗

// 死锁
func main() {
    ch := make(chan int)
    <- ch //block
    go func() {
        ch <- 1
    }
}

// 死锁
func main() {
    ch := make(chan int)
    ch <- 1 // block
    go func() {
        <-ch
    }
}

// 死锁
func main() {
        var ch chan int  //nil channel

        go func(c chan int) {
                for v := range c { // <-ch  channel read
                        fmt.Println(v)
                }
        }(ch)

        ch <- 1  // channel write
}
// goroutine泄漏
func main() {
    ch := make(chan int)
    go func() {
        // process more than 100ms
        ch <- 1
    }
    select {
    case <-ch:
        // process done
        return    
    case <- time.After(100*time.Millisecond) 
        // process timeout
        return
    }
}

错误操作Panic 🔗

// send on close channel 往已关闭的 channel 写数据会 panic
// closing on close channel  写入端无法知道 channel 是否已经关闭
// send or read on nil channel channel初始化使用错误

1. 不要尝试在读取端关闭 channel 
2. 一个写入端,在这个写入端可以放心关闭 channel
3. 多个写入端时,不要在写入端关闭 channel 

context 🔗

image


名称 类型 是否可导出 解释
Context interface yes 声明四个方法的签名
canceler interface no 声明两个方法的签名
CancelFunc function yes 取消的函数签名
Background, TODO function yes 返回emptyCtx
emptyCtx structure no 实现了Context接口
cancelCtx structure no 被取消
valueCtx structure no store kv
timerCtx structure no 可被取消,被超时取消
WithXXX function yes 创建Context
propagateCancel function no 传递cancel的关系
parentCancelCtx function no 找到最先出现的cancel context

context是在context包里定义的一种接口。在context包中可以通过WithCancel、WithDeadline、WithTimeout、WithValue这四个方法生成新 Context。WithValue方法生成的context可以做到了跨API的传值功能。WithTimeout方法是对WithDeadline的封装。WithDeadline方法生成的context实现了定时cancel的功能。

下面以WithCancel为入口,以源码(go1.12.7)为内容作介绍。当 parent 的 Done() 关闭的时候,孩子 ctx 的 Done() 也会关闭, 这种情况是如何实现的?具体过程分两种情况: 1 当传入的context是库里支持的cancelCtx、timeCtx时,child加入parent的衍生map中; 2 当传入的context为自定义实现类时,标准库是新起goroutine监听信号并执行cancel子context的过程。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}
// context包 主要的实现是关于cancelCtx的
type cancelCtx struct{
    // embedding Context interface
    Context  // cancelCtx 重写了Value(), Done(), Err()。
    mu sync.Mutex 
    done chan struct{} // 没有close时select时 case <-done会阻塞 close后 <- done 返回struct{}
    children map[canceler]struct{} //树形结构
    err error // 在done被关闭时,返回error, 主动取消Canceled or DeadlineExceeded
}
// child 挂到 parent的children map中。
func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
         //parent不会cancel,则无需挂靠
        return
    }
    if p, ok := parentCancelCtx(parent); ok { 
        // 找到最近可挂靠的父节点
        p.mu.Lock()
        if p.err != nil {
            // 再次判断父节点被cancel
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            // 挂靠到p的children map
            p.children[child] = struct{}{} 
        }
        p.mu.Unlock()
    } else { 
        // 没有向上找到可挂靠的父节点
        // 无法通过挂靠来取消child context,所以只能新起goroutine完成parent取消时,child也取消。
        go func() {
            select {
            case <-parent.Done(): // parent关闭时,关闭child
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}
// parentCancelCtx 
func parentCancelCtx(parent context)(*cancelCtx, bool){

}

cancel方法的逻辑是关闭cancelCtx的Done通道,并递归地关闭context的所有子context。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil { // 外层传errors.New("context canceled") 这里不会进入
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock() //保护
    if c.err != nil { // 对已经cancel的context进行cancel 则返回
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err  //context的err为 context cancel error
    if c.done == nil { 
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children { //遍历子context对其执行cancel操作
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err) 
    }
    c.children = nil 
    c.mu.Unlock()
    if removeFromParent {
        removeChild(c.Context, c)
    }
}

context使用说明 🔗

etcd中有一些select , context, channel 的例子

参考 🔗

[1] Go Concurrency Patterns: Context
[2] Go Concurrency Patterns: Pipelines and cancellation
[3] Standard library context
[4] Using Context in Golang - Cancellation, Timeouts and Values (With Examples)
[5] Go by Example: Context