Go语言Context模块源码解析

Posted on Sun 30 September 2018 in 源码解析,Go

Go的标准库中的Context在日常的工作编码中的使用率很高。这个库是解决Goroutine之间裙带关系的利器,是channel之外的另一个较好的选择。这篇文章将通过源码来了解其实现。 本篇文章是基于1.10.3版本的Go源码, 源码位于src/context

context库有两个interface, Context和canceler:

type Context interface {
  Deadline()(deadline time.Time, ok bool)
  Done() <-chan struct{}
  Err() error
  Value(key interface{})interface{}
}

type canceler interface {
  cancel(removeFromParent bool, err error)
  Done() <-chan struct{}
}

并且为了方便使用,context库提供了如下几个方法来获取Context实例:

func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

Context接口的方法集决定了Context实例的行为,canceler是个Unexposed接口,扩展了Context接口的行为,可以看看cancelCtx和timerCtx的源码:

type cancelCtx struct {
    Context
    mu sync.Mutex
    done chan struct{}
    children map[canceler]struct{}
    err error
}

type timeCtx struct {
    cancelCtx
    timer *time.Timer
    deadline time.Timer
}

cancelCtx内嵌了Context接口, 关于interface Embedding,具体可以看effective_go, 看看WithCancel的实现:

func newCancelCtx(parent Context) cancelCtx {
  return cancelCtx{Context: parent}
}

func WithCancel(parent Context)(ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func(){c.cancel(true, Canceled)}
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
  if parent.Done() == nil {
    return // parent is never canceled
  }
  if p, ok := parentCancelCtx(parent); ok {
    p.mu.Lock()
    if p.err != nil {
      // parent has already been canceled 这里需要对传入的可cancel的child做cancel调用处理,但是不会吧child从parent中剔除
      child.cancel(false, p.err)
    } else {
      if p.children == nil {
        p.children = make(map[canceler]struct{})
      }
      p.children[child] = struct{}{}
    }
    p.mu.Unlock()
  } else {
    //这里的情况是可能是用户自己定义的一个context,才会走到这一步, 即当前的parent即不是cancelCtx,又不是timerCtx,又不是valueCtx
    go func() {
      select {
      case <-parent.Done():
        child.cancel(false, parent.Err())
      case <-child.Done():
      }
    }()
  }
}

propagateCancel就是把parent context和迭代出的context做关联,这个关联使用map数据结构来实现,Background()返回的*emptyCtx是最顶层的Context,然后调用 上面提供的WithXxx族函数迭代一层一层的child cotext, 类似数据结构中的树。当执行cancel的时候,当前context迭代出的child context都会被调用cancel。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  if err == nil {
    panic("context: internal error: missing cancel error")
  }
  c.mu.Lock()
  //错误不为空,说明已经cancel了
  if c.err != nil {
    c.mu.Unlock()
    return // already canceled
  }
  c.err = err
  if c.done == nil {
    c.done = closedchan
  } else {
    close(c.done)
  }
  for child := range c.children {
    // NOTE: acquiring the child's lock while holding parent's lock.
    // 根据调用栈,最底层的child context最先被执行cancel
    child.cancel(false, err)
  }
  c.children = nil
  c.mu.Unlock()

  if removeFromParent {
    removeChild(c.Context, c)
  }
}

timeCtx内嵌了cancelCtx,扩展了cancelCtx, WithDealine的实现逻辑和WithCancel的实现逻辑类似,不过就是通过time.AfterFunc主动调用成员函数cancel(当超过了deadline time)。

使用示例

服务平滑退出

运行中的服务平滑退出是一个基本的需求,强行kill掉服务进程可能会丢失数据,下面给出了一个通过监听信号的方式来让服务平滑退出。sync.WaitGroup的主要作用使所有的Goroutine都退出后,main Goroutine才退出,至此整个服务结束退出。

var c chan os.Signal

func main() {
  c = make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGUSR1, syscall.SIGUSR2)

  var (
    ctx    context.Context
    cancel context.CancelFunc
    wait   *sync.WaitGroup
    n      int
  )
  ctx, cancel = context.WithCancel(context.Background())
  wait = &sync.WaitGroup{}
  // to do init
  wait.Add(n)
  go SomeInputFunc(ctx, wait /*, args1, ...*/)
  go SomeProcessFunc(ctx, wait /*, args1, ...*/)
  go SomeOutputFunc(ctx, wait /*, args1, ...*/)
  // ...

  // wait signal
  select {
  case <-c:
    cancel()
  }
  wait.Wait()
}

Go