tRPC-Go中的tRPC.Go()方法使用了ants协程池,做个简单剖析
panjf2000/ants协程池
在tRPC.Go方法(异步启动goroutine)中看到里面使用了ants协程池去实现(具体位置:g.pool.Invoke(p)
)
前置知识:
我们想异步完成一个任务,首先创建一个任务,然后需要从协程池(PoolWithFunc)中获取worker(goWorkerWithFunc),假设目前队列为空,这时一个worker和一个goroutine会一起创建出来,可以认为他俩就是绑一起的,然后处理完这个任务后,处于当前goroutine中的worker会放入全局的队列中,等待被其他协程去获取这个worker。
于是深入探索了下pool.Invoke等方法:
// github.com/panjf2000/ants/v2@v2.4.6/pool_func.go:162
// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {// 这个func的caller:p就是一个协程池对象
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorkerWithFunc
if w = p.retrieveWorker(); w == nil {// <----
return ErrPoolOverload
}
w.args <- args // 这里展现了channel的异步通信能力:告知,具体可往下看**callback**
return nil
}
首先执行p.retrieveWorker()获取一个worker,这个方法里面会根据当前worker队列的数量去做不同的逻辑:
- 若队列为空,表示无可用的worker,则需要新建(其中包含使用原生go去启动新协程)
// 会执行下面这个函数
spawnWorker := func() {
w = p.workerCache.Get().(*goWorkerWithFunc)// workerCache是个sync.pool,从中获取goWorkerWithFunc对象
w.run()// 启动一个 Goroutine 来重复执行传入的函数
}
// worker 的结构如下:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and performs function calls.
type goWorkerWithFunc struct {
// pool who owns this worker.
pool *PoolWithFunc // 该 worker 所在协程池对象的指针
// args is a job should be done.
args chan interface{} // 待执行的任务
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
}
为什么说这个Goroutine可以重复执行传入的函数?
答案:使用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止。
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorkerWithFunc) run() {
w.pool.incRunning()
go func() {
defer func() {
// ...
}()
for args := range w.args {// w.args类型是chan any,用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止
if args == nil {
return
}
w.pool.poolFunc(args)// args对象就是每个传入的任务,poolFunc是这个协程池对象的执行任务的接口func
if ok := w.pool.revertWorker(w); !ok {// 将该worker放入队列中,供其他人后续获取这个worker并利用当前这个协程去执行任务
return
}
}
}()
}
所以说:新开的这个协程会一直处于for循环中不断等待并执行新的任务。
callback:g.pool.Invoke(p)
方法中的w.args <- args
就是用来通知某个任务协程中的for args := range w.args
去执行新的任务。
这里的p和args对象就是一个任务,结构如下:
p := &goerParam{
ctx: newCtx,// context.Context
cancel: cancel,// context.CancelFunc
handler: handler,// func(context.Context) 调用方传入的闭包函数
}
sync.pool
上面可以看到 p.workerCache.Get().(*goWorkerWithFunc)
中,任务p包含了一个workerCache属性,它是sync.pool类型,一个并发安全的对象池。说明worker都保存在一个对象池中,目的是减少内存分配和垃圾回收的开销。
有待深入…