go编程时,goroutine是非常有用的特性。然而,实践中最好不要无限制的使用goroutine,例如一次性开一万个goroutine去读写文件是很危险的。为了控制goroutine的并行量,有很多框架或库实现了协程池,例如ants(很推荐)。 当小项目不想引入第三方库时,可以借助channel自己实现一个简易的协程池。
首先,创建一个package,不妨名为grpool
package grpool
import "sync"
type GoRoutinePool struct {
wg sync.WaitGroup
poolCh chan int8 // 用于控制并行数
resultCh chan interface{} // 子任务结果
}
// NewGoRoutinePool 创建一个协程池,池容量poolSize, 任务数taskCount
func NewGoRoutinePool(poolSize, taskCount int) *GoRoutinePool {
grPool := &GoRoutinePool{
wg: sync.WaitGroup{},
poolCh: make(chan int8, poolSize),
resultCh: make(chan interface{}, taskCount),
}
grPool.wg.Add(taskCount)
return grPool
}
// TaskStart 子任务开始时占用名额
func (g *GoRoutinePool) TaskStart() {
g.poolCh <- 1 // 占用一个名额
}
// TaskEnd 子任务结束时释放名额,建议在GoRoutine开头使用defer调用
func (g *GoRoutinePool) TaskEnd() {
<-g.poolCh // 释放一个名额
g.wg.Done()
}
// AddResult 收集一个任务结果
func (g *GoRoutinePool) AddResult(val interface{}) {
g.resultCh <- val
}
// Wait 阻塞等待所有子协程执行完成,并返回结果
func (g *GoRoutinePool) Wait() []interface{} {
g.wg.Wait()
close(g.resultCh)
close(g.poolCh)
results := make([]interface{}, 0, cap(g.resultCh))
for res := range g.resultCh {
results = append(results, res)
}
return results
}
使用实例:
package main
import (
"fmt"
"time"
"video-composer/common/grpool"
)
func main() {
grPool := grpool.NewGoRoutinePool(2, 7) // 创建一个协程池
for i := 0; i < 7; i++ {
go func(idx int) {
grPool.TaskStart() // 子任务开始时
defer grPool.TaskEnd() // 子任务结束时
// ================= 子任务 ======================
fmt.Println("Start ", idx)
time.Sleep(time.Second)
fmt.Println(" End ", idx)
// ==============================================
grPool.AddResult(idx) // 子任务结果
}(i)
}
results := grPool.Wait() // 等待所有子任务完成,并获取结果
for _, res := range results {
fmt.Println("result: ", res)
}
}