https://segmentfault.com/a/
fasthttp中的协程池实现
协程池可以控制并行度,复用协程。fasthttp 比 net/http 效率高很多倍的重要原因,就是利用了协程池。实现并不复杂,我们可以参考他的设计,写出高性能的应用。
入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| func (s *Server) Serve(ln net.Listener) error { var lastOverflowErrorTime time.Time var lastPerIPErrorTime time.Time var c net.Conn var err error maxWorkersCount := s.getConcurrency() s.concurrencyCh = make(chan struct{}, maxWorkersCount) wp := &workerPool{ WorkerFunc: s.serveConn, MaxWorkersCount: maxWorkersCount, LogAllErrors: s.LogAllErrors, Logger: s.logger(), } wp.Start() for { if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { wp.Stop() if err == io.EOF { return nil } return err } if !wp.Serve(c) { s.writeFastError(c, StatusServiceUnavailable, "The connection cannot be served because Server.Concurrency limit exceeded") c.Close() if time.Since(lastOverflowErrorTime) > time.Minute { s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+ "Try increasing Server.Concurrency", maxWorkersCount) lastOverflowErrorTime = CoarseTimeNow() } time.Sleep(100 * time.Millisecond) } c = nil } } type workerPool struct { WorkerFunc func(c net.Conn) error MaxWorkersCount int LogAllErrors bool MaxIdleWorkerDuration time.Duration Logger Logger lock sync.Mutex workersCount int mustStop bool ready []*workerChan stopCh chan struct{} workerChanPool sync.Pool }
|
goroutine status:
- main0: wp.Start()
break-00
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func (wp *workerPool) Start() { if wp.stopCh != nil { panic("BUG: workerPool already started") } wp.stopCh = make(chan struct{}) stopCh := wp.stopCh go func() { var scratch []*workerChan for { wp.clean(&scratch) select { case <-stopCh: return default: time.Sleep(wp.getMaxIdleWorkerDuration()) } } }() }
|
goroutine status:
main0: wp.Start()
g1: for loop to clean idle workerChan
break-01
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| func (wp *workerPool) clean(scratch *[]*workerChan) { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() currentTime := time.Now() wp.lock.Lock() ready := wp.ready n := len(ready) i := 0 for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { i++ } *scratch = append((*scratch)[:0], ready[:i]...) if i > 0 { m := copy(ready, ready[i:]) for i = m; i < n; i++ { ready[i] = nil } wp.ready = ready[:m] } wp.lock.Unlock() tmp := *scratch for i, ch := range tmp { ch.ch <- nil tmp[i] = nil } }
|
break-02
acceptConn(s, ln, &lastPerIPErrorTime)
主要处理 ln.Accept(),判断err是否是 Temporary 的,最终返回一个 net.Conn
break-03
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (wp *workerPool) Serve(c net.Conn) bool { ch := wp.getCh() if ch == nil { return false } ch.ch <- c return true } type workerChan struct { lastUseTime time.Time ch chan net.Conn }
|
wp.getCh() 返回一个 *workerChan, 可以看到, workerChan 有一个 ch 属性,参数传入的 net.Conn 直接往里面塞。
break-04
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| func (wp *workerPool) getCh() *workerChan { var ch *workerChan createWorker := false wp.lock.Lock() ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workersCount < wp.MaxWorkersCount { createWorker = true wp.workersCount++ } } else { ch = ready[n] ready[n] = nil wp.ready = ready[:n] } wp.lock.Unlock() if ch == nil { if !createWorker { return nil } vch := wp.workerChanPool.Get() if vch == nil { vch = &workerChan{ ch: make(chan net.Conn, workerChanCap), } } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) wp.workerChanPool.Put(vch) }() } return ch }
|
这里我们只看创建的流程。如果ready为空,说明ready被耗尽,并且小于 MaxWorkersCount,那么需要创建新的 workerChan。
创建时,先从 Pool 中取出复用,如果为nil,再创建新的。
可以预测到,这里 wp.workerFunc(ch) 必定包含一个 for 循环,处理 workerChan 中的 net.Conn。
goroutine status:
main0: wp.Start()
g1: for loop to clean idle workerChan
g2: wp.workerFunc(ch) blocks for handling connection
break-05
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error for c = range ch.ch { if c == nil { break } if err = wp.WorkerFunc(c); err != nil && err != errHijacked { errStr := err.Error() if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "reset by peer") || strings.Contains(errStr, "i/o timeout")) { wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } } if err != errHijacked { c.Close() } c = nil if !wp.release(ch) { break } } wp.lock.Lock() wp.workersCount-- wp.lock.Unlock() }
|
for range 不断从 chan net.Conn 中获取连接。大家是否还记得 在 func (wp *workerPool) Serve(c net.Conn) bool
函数中,一个重要操作就是把 accept 到的connection,放入 channel.
最后,需要把当前的 workerChan 释放回 workerPool 的 ready 中。
break-06
1 2 3 4 5 6 7 8 9 10 11
| func (wp *workerPool) release(ch *workerChan) bool { ch.lastUseTime = CoarseTimeNow() wp.lock.Lock() if wp.mustStop { wp.lock.Unlock() return false } wp.ready = append(wp.ready, ch) wp.lock.Unlock() return true }
|
释放操作中,注意到 修改了 ch.lastUseTime , 还记得 clean 操作吗?在 g1 协程中运行着呢。
所以最后的运行状态是:
goroutine status:
main0: wp.Start()
g1: for loop to clean idle workerChan
g2: wp.workerFunc(ch) blocks for handling connection
g3: ….
g4: ….
按需增长 goroutine 数量,但是也有一个最大值, 所以并行度是可控的。当请求密集时,一个 worker goroutine 可能会串行处理多个 connection。
wokerChan 在 Pool 中被复用,对GC的压力会减小很多。
而对比原生的 net/http 包,并行度不可控(可能不确定,runtime 会有控制? ),goroutine 不可被复用,体现在一个请求一个goroutine, 用完就销毁了,对机器压力更大。