High-Performance Goroutine Pool
The Go scheduler puts no limit on the number of goroutines. When goroutines burst at large scale in an instant, there is no chance to reuse them, so huge numbers of goroutines get created and tie up a lot of system resources. This project tries to address that by pooling.
The scheduler itself should not cap the number of goroutines: at the language level there is no way to decide what the limit should be, since programs run on machines with very different capabilities, and imposing a limit would even hurt performance when concurrency is moderate. Native support for capping goroutine counts would clearly not be worth the cost. Likewise, for small to medium concurrency a pool offers no real performance advantage.
The design still needs a periodic prune of the idle worker queue (a possible shape is sketched after worker_queue.go below); once that is written I will re-run the benchmarks to see how much it helps. For now, in large-scale asynchronous concurrency scenarios (1M, 10M goroutines), the results are already very good: memory usage drops by 10x or more and throughput improves by 2-6x.
Requirements and Goals
- Limit the number of concurrent goroutines
- Reuse goroutines to reduce scheduling pressure on the runtime and improve program performance
- Avoid excessive goroutine creation from eating up system resources (CPU & memory)
Key Techniques
- Lock synchronization: Go provides CAS primitives (sync/atomic), so a spin-lock can stand in for a mutex around the pool's short critical sections
- LIFO/FIFO queue: a LIFO queue naturally keeps workers ordered by the time they were put back, which makes operations tied to enqueue time (such as expiring idle workers) easy to handle
- Pool capacity limits and elastic resizing
Implementation
pool.go
```go
package go_pool

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"
)

const (
	OPEN = iota
	CLOSED
)

var (
	ErrPoolClosed          = errors.New("this pool has been closed")
	ErrPoolOverload        = errors.New("too many goroutines blocked on submit or Nonblocking is set")
	ErrInvalidExpiryTime   = errors.New("invalid expiration time")
	ErrInvalidPoolCapacity = errors.New("invalid pool capacity")

	DefaultScanInterval = time.Second
)

type Pool struct {
	capacity         int32
	running          int32
	lock             sync.Locker
	scanDuration     time.Duration
	blockingTasksNum int
	maxBlockingTasks int
	state            int32
	cond             *sync.Cond
	workers          WorkerQueue // LIFO queue
	workerCache      sync.Pool
}

func (p *Pool) Submit(task func()) error {
	if atomic.LoadInt32(&p.state) == CLOSED {
		return ErrPoolClosed
	}
	// retrieve a worker to run the task;
	// return an error if no worker is available
	var w *Worker
	if w = p.retrieveWorker(); w == nil {
		return ErrPoolOverload
	}
	w.task <- task
	return nil
}

func (p *Pool) Shutdown() {
	atomic.StoreInt32(&p.state, CLOSED)
	p.lock.Lock()
	// reset the worker queue
	p.workers.reset()
	p.lock.Unlock()
}

func (p *Pool) isClosed() bool {
	return atomic.LoadInt32(&p.state) == CLOSED
}

// Resize changes the capacity of the pool.
func (p *Pool) Resize(size int) {
	if p.Cap() == size {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
	// stop surplus workers if #running_workers > #new_capacity
	diff := p.Running() - size
	for i := 0; i < diff; i++ {
		if w := p.retrieveWorker(); w != nil {
			w.task <- nil
		}
	}
}

func (p *Pool) Reboot() {
	if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPEN) {
		// restart the purging goroutine
		go p.scavengerRoutine()
	}
}

func (p *Pool) Running() int {
	return int(atomic.LoadInt32(&p.running))
}

func (p *Pool) Cap() int {
	return int(atomic.LoadInt32(&p.capacity))
}

func (p *Pool) Free() int {
	return p.Cap() - p.Running()
}

func (p *Pool) incRunning() {
	atomic.AddInt32(&p.running, 1)
}

func (p *Pool) decRunning() {
	atomic.AddInt32(&p.running, -1)
}

// put the worker back into the pool for recycling
func (p *Pool) recycleWorker(worker *Worker) bool {
	capacity := p.Cap()
	if p.isClosed() || (capacity >= 0 && p.Running() > capacity) {
		return false
	}
	worker.recycleTime = time.Now()
	p.lock.Lock()
	// double-check the state under the lock in case the pool was closed meanwhile
	if p.isClosed() {
		p.lock.Unlock()
		return false
	}
	if err := p.workers.add(worker); err != nil {
		p.lock.Unlock()
		return false
	}
	// notify any request stuck in retrieveWorker that a worker is available
	p.cond.Signal()
	p.lock.Unlock()
	return true
}

func (p *Pool) spawnWorker() *Worker {
	worker := p.workerCache.Get().(*Worker)
	worker.Run()
	return worker
}

func (p *Pool) retrieveWorker() (worker *Worker) {
	p.lock.Lock()
	worker = p.workers.detach()
	if worker != nil {
		// got a worker from the queue
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 {
		// infinite pool: spawn a new worker
		p.lock.Unlock()
		return p.spawnWorker()
	} else if p.Running() < capacity {
		// the pool is not full yet: spawn a new worker
		p.lock.Unlock()
		return p.spawnWorker()
	} else {
		// if the number of blocked submissions reaches the threshold, return nil
		// and let Submit report ErrPoolOverload
		if p.maxBlockingTasks != 0 && p.maxBlockingTasks <= p.blockingTasksNum {
			p.lock.Unlock()
			return
		}
		// the pool is full: wait until a worker becomes available
	Retry:
		// track the number of blocked submissions and wait to be signaled
		p.blockingTasksNum++
		p.cond.Wait()
		p.blockingTasksNum--
		// the recycled worker may have exited in the meantime,
		// so make sure a worker is still available
		if p.Running() == 0 {
			p.lock.Unlock()
			return p.spawnWorker()
		}
		worker = p.workers.detach()
		if worker == nil {
			goto Retry
		}
		p.lock.Unlock()
	}
	return
}

func (p *Pool) scavengerRoutine() {
	heartbeat := time.NewTicker(p.scanDuration)
	defer heartbeat.Stop()
	for range heartbeat.C {
		if p.isClosed() {
			break
		}
		// if all workers were cleaned up while some invokers are still
		// stuck on cond.Wait(), wake them all up
		if p.Running() == 0 {
			p.cond.Broadcast()
		}
	}
}

func NewPool(capacity int) (*Pool, error) {
	if capacity <= 0 {
		return nil, ErrInvalidPoolCapacity
	}
	pool := &Pool{
		capacity: int32(capacity),
		lock:     NewSpinLock(),
	}
	pool.workerCache.New = func() interface{} {
		return &Worker{
			pool: pool,
			task: make(chan func(), 1),
		}
	}
	pool.scanDuration = DefaultScanInterval
	// initialize the worker queue
	pool.workers = NewWorkerQueue(0)
	pool.cond = sync.NewCond(pool.lock)
	// start the purging goroutine
	go pool.scavengerRoutine()
	return pool, nil
}
```
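A minimal usage sketch of the API above (the test name and the task body are illustrative only, not part of the repository): cap the pool, submit a batch of tasks, wait for them, then shut the pool down.

```go
package go_pool

import (
	"sync"
	"testing"
)

// TestExampleUsage is a hypothetical example showing the intended call pattern.
func TestExampleUsage(t *testing.T) {
	p, err := NewPool(1000) // at most 1000 workers run concurrently
	if err != nil {
		t.Fatal(err)
	}
	defer p.Shutdown()

	var wg sync.WaitGroup
	for i := 0; i < 100000; i++ {
		wg.Add(1)
		if err := p.Submit(func() {
			defer wg.Done()
			// do the actual work here
		}); err != nil {
			wg.Done() // submission rejected: pool closed or overloaded
		}
	}
	wg.Wait()
	t.Logf("running workers after wait: %d", p.Running())
}
```

With maxBlockingTasks left at zero, Submit blocks on the condition variable until a worker is recycled, so the loop above naturally throttles itself to the pool capacity.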
worker.go
```go
package go_pool

import (
	"time"
)

type Worker struct {
	pool        *Pool
	task        chan func()
	recycleTime time.Time
}

func (w *Worker) Run() {
	w.pool.incRunning()
	go func() {
		defer func() {
			w.pool.decRunning()
			w.pool.workerCache.Put(w)
			// todo: panic recovery strategy
		}()
		for f := range w.task {
			// receiving nil tells the worker to stop and exit its goroutine
			if f == nil {
				return
			}
			f()
			// recycle the worker back into the pool; if that fails, exit the goroutine
			if success := w.pool.recycleWorker(w); !success {
				return
			}
		}
	}()
}
```
worker_queue.go
```go
package go_pool

type WorkerQueue interface {
	len() int
	isEmpty() bool
	add(worker *Worker) error
	detach() *Worker
	reset()
}

func NewWorkerQueue(size int) WorkerQueue {
	return NewSimpleWorkerQueue(size)
}

func NewSimpleWorkerQueue(size int) *simpleWorkerQueue {
	return &simpleWorkerQueue{
		size:    size,
		workers: make([]*Worker, 0, size),
	}
}

type simpleWorkerQueue struct {
	workers []*Worker
	size    int
}

func (sq *simpleWorkerQueue) len() int {
	return len(sq.workers)
}

func (sq *simpleWorkerQueue) isEmpty() bool {
	return sq.len() == 0
}

func (sq *simpleWorkerQueue) add(worker *Worker) error {
	sq.workers = append(sq.workers, worker)
	return nil
}

func (sq *simpleWorkerQueue) detach() *Worker {
	length := sq.len()
	if length == 0 {
		return nil
	}
	worker := sq.workers[length-1]
	sq.workers[length-1] = nil // clear the reference to avoid a memory leak
	sq.workers = sq.workers[:length-1]
	return worker
}

func (sq *simpleWorkerQueue) reset() {
	for i := 0; i < sq.len(); i++ {
		sq.workers[i].task <- nil
		sq.workers[i] = nil
	}
	sq.workers = sq.workers[:0]
}
```
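The periodic prune mentioned at the top is not implemented yet. One possible shape, shown purely as a sketch (retrieveExpired does not exist in the code above), exploits the LIFO usage of the queue: workers are appended in recycleTime order, so every worker idle past a deadline sits at the front of the slice and can be split off in one pass.

```go
package go_pool

import "time"

// retrieveExpired is a sketch of a prune helper: it removes and returns all
// workers whose recycleTime is older than now-expiry, relying on the slice
// being ordered by recycleTime (oldest first).
func (sq *simpleWorkerQueue) retrieveExpired(expiry time.Duration) []*Worker {
	deadline := time.Now().Add(-expiry)
	n := 0
	for n < sq.len() && sq.workers[n].recycleTime.Before(deadline) {
		n++
	}
	if n == 0 {
		return nil
	}
	expired := append([]*Worker(nil), sq.workers[:n]...)
	remaining := copy(sq.workers, sq.workers[n:])
	for i := remaining; i < sq.len(); i++ {
		sq.workers[i] = nil // clear references to avoid a memory leak
	}
	sq.workers = sq.workers[:remaining]
	return expired
}
```

scavengerRoutine could then, on each tick and while holding the pool lock, call retrieveExpired and send nil to each returned worker so it exits; whether that actually moves the benchmarks is exactly what remains to be measured.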
lock.go
```go
package go_pool

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type spinLock uint32

func (sl *spinLock) Lock() {
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		runtime.Gosched()
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
	return new(spinLock)
}
```
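To back up the spin-lock vs. mutex point from the Key Techniques list, a micro-benchmark along these lines could sit next to lock.go (this file is not in the repository, and results will depend on the machine and GOMAXPROCS):

```go
package go_pool

import (
	"sync"
	"testing"
)

// benchLocker hammers a tiny critical section from parallel goroutines,
// which approximates how the pool guards its queue operations.
func benchLocker(b *testing.B, l sync.Locker) {
	var counter int
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			l.Lock()
			counter++
			l.Unlock()
		}
	})
	_ = counter
}

func BenchmarkSpinLock(b *testing.B)  { benchLocker(b, NewSpinLock()) }
func BenchmarkSyncMutex(b *testing.B) { benchLocker(b, new(sync.Mutex)) }
```

The critical section is kept deliberately tiny, mirroring how the pool only touches a few slice operations under the lock, which is the regime where spinning plus runtime.Gosched() tends to do better than a full mutex.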
pool_test.go
```go
package go_pool

import (
	"math"
	"runtime"
	"sync"
	"testing"
	"time"
)

const (
	_   = 1 << (10 * iota)
	KiB // 1024
	MiB // 1048576
)

const (
	InfinitePoolSize = math.MaxInt32
	PoolSize         = 10000
	SleepTime        = 100
	OverSizeTaskNum  = 10 * PoolSize
	UnderSizeTaskNum = 0.2 * PoolSize
)

var currentMem uint64

func demoTaskFunc(args interface{}) {
	n := args.(int)
	time.Sleep(time.Duration(n) * time.Millisecond)
}

func TestPoolWaitToGetWorker(t *testing.T) {
	var wg sync.WaitGroup
	p, err := NewPool(PoolSize)
	if err != nil {
		t.Fatalf("err: %s", err.Error())
	}
	defer p.Shutdown()
	for i := 0; i < OverSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestPoolGetWorkerFromCache(t *testing.T) {
	var currentMem uint64
	var wg sync.WaitGroup
	p, err := NewPool(PoolSize)
	if err != nil {
		t.Fatalf("err: %s", err.Error())
	}
	defer p.Shutdown()
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestNoPool(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			demoTaskFunc(SleepTime)
		}()
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}

func TestWithInfinitePool(t *testing.T) {
	var wg sync.WaitGroup
	p, err := NewPool(InfinitePoolSize)
	if err != nil {
		t.Fatalf("err: %s", err.Error())
	}
	defer p.Shutdown()
	for i := 0; i < UnderSizeTaskNum; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoTaskFunc(SleepTime)
			wg.Done()
		})
	}
	wg.Wait()
	mem := runtime.MemStats{}
	runtime.ReadMemStats(&mem)
	currentMem = mem.TotalAlloc/KiB - currentMem
	t.Logf("memory usage: %d KB", currentMem)
}
```
pool_benchmark_test.go
```go
package go_pool

import (
	"testing"
	"time"
)

const (
	RunTimes      = 5000000
	BenchParam    = 10
	BenchPoolSize = 200000
)

func demoFunc() {
	time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}

func BenchmarkPoolThroughput(b *testing.B) {
	p, _ := NewPool(BenchPoolSize)
	defer p.Shutdown()
	b.StartTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < RunTimes; j++ {
			_ = p.Submit(demoFunc)
		}
	}
	b.StopTimer()
}

func BenchmarkGoroutinesThroughput(b *testing.B) {
	for i := 0; i < b.N; i++ {
		for j := 0; j < RunTimes; j++ {
			go demoFunc()
		}
	}
}
```
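Note that both benchmarks only measure submission throughput: neither waits for the sleeping tasks to finish, and BenchmarkGoroutinesThroughput measures the raw cost of spawning goroutines with go. They can be run with go test -bench=Throughput -benchmem to also compare allocations per operation.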