golang-goroutine pool源码解读

简介

https://github.com/panjf2000/ants

这是一个 goroutine pool 的包,
可以生成固定 goroutine 数量的 pool.
重复使用,提高效率

 示例

package main

import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/panjf2000/ants"
)

var sum int32

func myFunc(i interface{}) error {
n := i.(int)
atomic.AddInt32(&sum, int32(n))
fmt.Printf("run with %d\n", n)
return nil
}

func demoFunc() error {
// time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
panic("error vincent!")
return nil
}

func main() {
runTimes := 10

// use the common pool
var wg sync.WaitGroup
for i := 0; i < runTimes; i++ {
wg.Add(1)
ants.Submit(func() error {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in f", r)
wg.Done()
// return nil
}
}()
demoFunc()
wg.Done()
return nil
})
}
wg.Wait()

// ants.Release()
time.Sleep(10 * time.Millisecond)
fmt.Println(ants.Running())
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")

// use the pool with a function
// set 10 the size of goroutine pool
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error {
myFunc(i)
wg.Done()
return nil
})
// submit tasks
for i := 0; i < runTimes; i++ {
wg.Add(1)
p.Serve(i)
fmt.Println(p.Running())
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}

源码

从示例看使用场景分为两种情况:

  • 默认 pool 或者固定大小的 pool 运行 func
  •  带 func 的 pool  可以传入参数

新建 pool

  • 默认 pool 可以直接使用
  • 创建固定大小的 pool
func NewPool(size int) (*Pool, error)
  • 创建携带 func 的 pool
func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error)

注入要执行的 func

func (p *Pool) Submit(task f) error {
// 如果pool已经释放
if len(p.release) > 0 {
return ErrPoolClosed
}
// 获取worker
w := p.getWorker()
// 发送任务
w.sendTask(task)
return nil
}

获取可用 worker

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
var w *Worker
waiting := false
// 锁
p.lock.Lock()
// 获取可用worker的结合
workers := p.workers
n := len(workers) - 1
if n < 0 {
// 如果没有可用的
if p.running >= p.capacity {
// 达到最大容量, 等待
waiting = true
} else {
// 没有达到最大容量 -- 后续会生成
p.running++
}
} else {
// 收到可用的发送信号
<-p.freeSignal
取出来最后一个
w = workers[n]
workers[n] = nil
// 更新集合
p.workers = workers[:n]
}
// 更新结束解锁
p.lock.Unlock()

if waiting {
// 如果有人在等,并且有的goroutine可用了
<-p.freeSignal
p.lock.Lock()
// 取出一个worker并更新集合
workers = p.workers
l := len(workers) - 1
w = workers[l]
workers[l] = nil
p.workers = workers[:l]
// 解锁
p.lock.Unlock()
} else if w == nil {
// 如果没人等,并且没有可用的集合并且没有超过容量
// 新建一个
w = &Worker{
pool: p,
task: make(chan f),
}
w.run()
}
return w
}

启动任务

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
//atomic.AddInt32(&w.pool.running, 1)
go func() {
// 新建的goroutine 接到任务就会执行
for f := range w.task {
if f == nil {
atomic.AddInt32(&w.pool.running, -1)
return
}
f()
// 归还workers
w.pool.putWorker(w)
// fmt.Println(w.pool.Running())
}
}()
}

释放 worker

// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
p.lock.Lock()
// 放入可用集合
p.workers = append(p.workers, worker)
p.lock.Unlock()
// 发送有可用的worker的信号
p.freeSignal <- sig{}
}

核心的代码看懂这些就足够了.

有一些小知识点补充一下:

 锁和解锁
中间成为了同步操作,针对并发的时候,对任何东西的修改,添加都应该加锁,让操作成为原子操作

  • sync.Mutex.Lock()
  • sync.Mutex.Unlock()
"sync" | "sync/atomic"

这两个包里直接可以调用的方法,可以不用锁的方式,就可以保证是同步的原子操作