go同步方式 🔗
golang提供了比较便捷的协程并发编程方式。golang的并发单元是goroutine。
协程与线程:
线程是os创建和调度。协程是golang的runtime创建和调度。
协程切换与线程切换的区别:
- 切换需要切换2个地方:栈和寄存器。协程切换是用户态的普通函数调用。calling convention约定了只需要保存少量寄存器,但线程切换需要保存更多的寄存器到栈上。
- 线程切换需要系统调用到系统态进行。
实现多个goroutine同步的主要方式有: sync包, runtime包下的channel,context包。
sync源码包的结构 🔗
sync包:sync.Mutex,sync.RWMutex,sync.Cond,sync.WaitGroup,sync.Once,sync.Pool 涉及的内容很多(cache line padding,no copy,lock-free,victim cache,gc 细节)
sync包中的sync.WaitGroup 🔗
标准库sync里的Waitgroup,用来阻塞主协程去等待所有协程执行完。WaitGroup主要三个方法:Add方法添加等待的协程数量,Done方法等于Add(-1)减少等待的协程数量,Wait方法阻塞主协程。原理示意图如下:
WaitGroup使用 🔗
WaitGroup在工作中经常会使用到。例如:需要并行处理一些子任务,需要起多个协程对一些复杂结构体的各个字段进行初始化等。简单的使用说明如下:
func fun() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
//process
}()
wg.Wait()
}
sync.Mutex 🔗
锁的实现 🔗
cas是基于硬件lock and swap 实现,早期386会lock总线,实现swap时无其他竞争,后期lock总线细粒度到cpu的cache里,通过MESI一致性保证swap时的正确性。
type Mutex struct {
// 1<<(32-3)个Goroutine waiters, 最低三位分别表示mutexLocked、mutexWoken、mutexStarving
state int32
sema uint32
}
加锁: fast path, slow path
- sync.Mutex
- 实现了公平锁,即排队等待锁
- 2种模式,正常模式 和 饥饿模式
- 第一次cas失败后,进入lockslow方法(为了主方法Lock()能inline进去上层代码里),spin几次失败后,进去队列等待被唤醒
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
//CAS
for {
复制旧数据
基于旧数据构造新数据
if CompareAndSwap(内存地址,旧数据,新数据) {
break
}
}
- futex
- spin vs sleep
- cas spin几次,没拿到锁则进入futex syscall后sleep。
- sync.Mutex的实现方式, cas spin几次,没拿到锁则进入enqueue waitqueue后sleep。
sync Map 🔗
如何实现一个高并发的map,insert和get是O1。 get时key存在即返回,不存在阻塞等待insert后再返回,or timeout后返回error。
type ConcurrentMap struct{
sync.Mutex// 读写锁?
mp map[int]int
keyToChan map[int]chan struct{}
}
func NewConcurrentMap() *ConcurrentMap {
return &ConcurrentMap{
mp: make(map[int]int),
keyToChan: make(map[int]chan struct{}),
}
}
func (m *ConcurrentMap) Put(k, v int){
m.lock()
defer m.unlock()
m.mp[k] = v
ch, ok := m.keyToChan[k]
if !ok {
return
}
// ch <- struct{}{}
select {
case <- ch:
// has closed
return
default:
// only close once
close(ch)
}
}
func (m *ConcurrentMap) Get(k int, timeout time.Duration) (int, error) {
m.lock()
v, ok := m.mp[k]
if ok {
m.unlock()
return v, nil
}
ch, ok := m.keyToChan[k]
if !ok {
m.keyToChan[k] = make(chan struct{})
}
tctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
m.unlock()
select {
case <- m.keyToChan[k]:
m.lock()
v = m.mp[k]
m.unlock()
return v, nil
case <- tctx.Done():
return -1, tctx.Err()
}
}
channel 🔗
通道是golang中协程通信的一种方式,支持协程之间的通信。
通道与协程之间的交互流程图因是否存在缓冲区、缓冲区是否填满、是否存在发送等待队列等,会在过程上会存在不同。
下面给出的示意图是针对利用无缓冲通道进行通信的其中一种情况。主要是为了简化流程和突出主要功能点。图中hchan是通道的底层数据结构,其中主要包含: buf为数据存储区, recvq和sendq为在此通道上等待接受和发送的协程队列,lock为保护通道数据的锁等。P为运行时的协程调度器。示意图如下:
channel 使用说明 🔗
通过无缓冲的通道,阻塞一个协程,等待另一个协程唤醒。例如:main协程在通道的接受队列中等待, 等worker协程写入数据后,main协程才会唤醒并继续执行。通信的数据是直接写一个协程的stack。这样可以省去去操作通道的buf,减少内存拷贝。简单的使用说明如下:
func worker(done chan bool) {
//process
done <- true
}
func main(){
done := make(chan bool)
go worker(done)
<- done
}
channel读写超时的处理。Go 语言的 channel 本身是不支持 timeout 的,一般实现 channel 的读写超时采用 select + time的方式。实例说明如下:
func main() {
var ch chan int
go func(){
//process
ch<-1
}()
select{
case <-time.After(time.Second):
return
case <-ch
//process
}
}
channel 错误示例 🔗
// 死锁
func main() {
ch := make(chan int)
<- ch //block
go func() {
ch <- 1
}
}
// 死锁
func main() {
ch := make(chan int)
ch <- 1 // block
go func() {
<-ch
}
}
// 死锁
func main() {
var ch chan int //nil channel
go func(c chan int) {
for v := range c { // <-ch channel read
fmt.Println(v)
}
}(ch)
ch <- 1 // channel write
}
// goroutine泄漏
func main() {
ch := make(chan int)
go func() {
// process more than 100ms
ch <- 1
}
select {
case <-ch:
// process done
return
case <- time.After(100*time.Millisecond)
// process timeout
return
}
}
错误操作Panic 🔗
// send on close channel 往已关闭的 channel 写数据会 panic
// closing on close channel 写入端无法知道 channel 是否已经关闭
// send or read on nil channel channel初始化使用错误
1. 不要尝试在读取端关闭 channel
2. 一个写入端,在这个写入端可以放心关闭 channel
3. 多个写入端时,不要在写入端关闭 channel
context 🔗
| 名称 | 类型 | 是否可导出 | 解释 |
|---|---|---|---|
| Context | interface | yes | 声明四个方法的签名 |
| canceler | interface | no | 声明两个方法的签名 |
| CancelFunc | function | yes | 取消的函数签名 |
| Background, TODO | function | yes | 返回emptyCtx |
| emptyCtx | structure | no | 实现了Context接口 |
| cancelCtx | structure | no | 被取消 |
| valueCtx | structure | no | store kv |
| timerCtx | structure | no | 可被取消,被超时取消 |
| WithXXX | function | yes | 创建Context |
| propagateCancel | function | no | 传递cancel的关系 |
| parentCancelCtx | function | no | 找到最先出现的cancel context |
context是在context包里定义的一种接口。在context包中可以通过WithCancel、WithDeadline、WithTimeout、WithValue这四个方法生成新 Context。WithValue方法生成的context可以做到了跨API的传值功能。WithTimeout方法是对WithDeadline的封装。WithDeadline方法生成的context实现了定时cancel的功能。
下面以WithCancel为入口,以源码(go1.12.7)为内容作介绍。当 parent 的 Done() 关闭的时候,孩子 ctx 的 Done() 也会关闭, 这种情况是如何实现的?具体过程分两种情况: 1 当传入的context是库里支持的cancelCtx、timeCtx时,child加入parent的衍生map中; 2 当传入的context为自定义实现类时,标准库是新起goroutine监听信号并执行cancel子context的过程。
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// context包 主要的实现是关于cancelCtx的
type cancelCtx struct{
// embedding Context interface
Context // cancelCtx 重写了Value(), Done(), Err()。
mu sync.Mutex
done chan struct{} // 没有close时select时 case <-done会阻塞 close后 <- done 返回struct{}
children map[canceler]struct{} //树形结构
err error // 在done被关闭时,返回error, 主动取消Canceled or DeadlineExceeded
}
// child 挂到 parent的children map中。
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
//parent不会cancel,则无需挂靠
return
}
if p, ok := parentCancelCtx(parent); ok {
// 找到最近可挂靠的父节点
p.mu.Lock()
if p.err != nil {
// 再次判断父节点被cancel
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// 挂靠到p的children map
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 没有向上找到可挂靠的父节点
// 无法通过挂靠来取消child context,所以只能新起goroutine完成parent取消时,child也取消。
go func() {
select {
case <-parent.Done(): // parent关闭时,关闭child
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx
func parentCancelCtx(parent context)(*cancelCtx, bool){
}
cancel方法的逻辑是关闭cancelCtx的Done通道,并递归地关闭context的所有子context。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil { // 外层传errors.New("context canceled") 这里不会进入
panic("context: internal error: missing cancel error")
}
c.mu.Lock() //保护
if c.err != nil { // 对已经cancel的context进行cancel 则返回
c.mu.Unlock()
return // already canceled
}
c.err = err //context的err为 context cancel error
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children { //遍历子context对其执行cancel操作
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
context使用说明 🔗
etcd中有一些select , context, channel 的例子
参考 🔗
[1] Go Concurrency Patterns: Context
[2] Go Concurrency Patterns: Pipelines and cancellation
[3] Standard library context
[4] Using Context in Golang - Cancellation, Timeouts and Values (With Examples)
[5] Go by Example: Context