(转)工程师应该如何高效学习

此文转自曹春晖blog,也用于自己学习的一些参考 博客地址 阅读书籍 对于工程师来说,从书籍得来的知识是必不可少的。现在很多年轻的程序员会从网络博客来学习技术,但博客内容大多缺乏体系(主要说总结性质的博客内容),不系统。很多博主为了掩饰自己的未知,遇到不知道的关键点就一笔带过,进而导致缺失。即使原作者非常努力,内容上没有缺失,你能从中获取的也只是别人总结好的知识,没有自己的主动思考,这中间便缺少过程式的沉淀,一味地满足于背诵别人总结好的知识,最后也只不过沦为他人的复读机而已。 对于工程师来说,书籍依然是最重要的知识获取媒介。即使只是通过目录概览,也能获取某个领域的大致蓝图。 目前大部分优秀的技术书籍依然以英文为主,能够读懂英文技术书籍是工程师的硬实力。英语阅读能力怎么训练呢?如果不是为了应试,可以尝试逼迫自己去翻译一些英文文档/文章来进行专门训练。举个例子,为了学习 Go,笔者曾经和社区的小伙伴合作翻译过《The Go Programming Language》,后来为了深入学习 es,参与了社区的 《es 权威指南》 的翻译和校对工作。如果某篇技术文档你从生理上很厌恶它,但是又觉得内容不得不学习的话,那你就逼迫自己去翻译它。千字以内的文档,周末抽一个下午就可以搞定。这里已经说是逼迫了,自然可以想见这个过程并不像打游戏那么轻松愉快,但只要熬过了这个阶段,阅读技术相关的英文文档可以显著提升速度。 具备一定的英语能力之后,接下来就是从什么渠道去获取内容。如果对电子书不排斥的话,目前比较经济的选择是申请一个国内的 ACM 会员,并且用该会员去注册oreily 在线书店。一年大概 20 美元,可以及时地阅读到大部分出版社的技术出版物。因为现在出版社的网站大多还支持 early preview,所以你甚至可以在书籍还没有上市之前就预先学习内容,第一时间获取整个业界的一线情报,能够帮助你站在时代的潮头。等两年后同事拿到翻译生涩的中文版的时候,你已经可以从容地 diss 他在信息源上落后了自己整整两年。(当然,如果你有兴趣,国内的出版社一般在英文新书出版的时候会招募翻译志愿者,如果你对一个领域特别感兴趣,关注出版社的这些招募消息也可以去参与。这里要指出,不要对做翻译这件事情抱太高期望,重在提升自己的能力。想赚钱的话就算了。) 如果喜欢一边阅读一边在页边写写画画,电子书还是稍微困难点,当然这个问题你也可以用 ipad pro + apple pencil 来解决。但有些人比较喜欢实体书捧在手上的实感,国外有些书甚至是个人出版物,例如笔者之前购入的《timeless law of software development》,这些书在互联网上正盗均无,只能考虑通过海淘渠道入手。前些年淘宝的海淘服务也可以用来淘书,书籍大多比较重,运费不菲。有些老书运费可能会到书费的一半让人格外肉疼。又因为国内对出版物管制比较严格,所以为了避险,这两年这些海淘服务商基本上都不帮忙代购出版物了。对于个人用户来说,也就只剩下了直邮和转运两种选择。直邮虽然比较便宜,但是万国联盟的 EMS 一走到中国可能就上了牛车,速度慢到突破极限。基本脑子稍微正常的都会选择转运。除了速度之外,有些出版社的书籍是只在美帝本土销售的,不支持 global delivery。 走转运的话,需要办一张 visa 或者 mastercard 的多币种信用卡,这样才能在海外网站进行支付 。 上面这些问题都解决了以后,书籍的获取就不再是问题。 信息源 虽然文章开头对于传统的书籍大赞特赞,但书籍的缺点也是显而易见的。在技术领域,这个缺点就尤为明显:时效问题。如果我们所学习的是相对比较老的领域知识,那只要读书基本上就够了。 但技术的发展日新月异,如果你想要成为知晓领域内所有新技术的那个人。你还是需要关注一些书籍以外的新闻源,下面是一些例子: Github Trending Github Trending 代表的是一种风向,一般一个项目能上 trending 的话,可能是作者自己去 hacker news、reddit 做了宣传,也可能是被某个业界大佬带了流量。因为现在 Github 上的国人越来越多,很多国人学习技术比较显著的诉求其实只是面试(汗,一旦什么 xx interview/xx road to 架构师之类的仓库开了就会吸引一大批国人点星,近两年 trending 的质量有下跌趋势。 不过最近 Github Trending 增加了按照 Spoken Language 筛选功能,所以你可以过滤特定的语言,相对比之前还是好多了。 ...

February 1, 2021 · 1 min · 194 words · Me

A Million WebSocket and Go

这篇文章是我研究高负载网络服务器架构看到的的一个有趣的story,添加了我自身学习websocket的感受和记录,希望我能在飞机落地前写完:-) Preface 我们先描述一个问题作为讨论的中心:用户邮件的存储方法。 对于这种主题,有很多种方式在系统内对邮件状态进行持续的追踪,比如系统事件是一个方式,另一种方式可以通过定期的系统轮询有关状态变化。 这两种方式各有利弊,不过当我们讨论到邮件的时候,用户希望收到新邮件的速度越快越好。邮件轮询每秒约有50000个HTTP请求,其中60%返回304状态,也就是邮箱内没有任何修改。 因此,为了减少服务器的负载并加快向用户传递邮件的速度,我们决定通过编写publisher-subscriber服务器(即bus, message broker, event channel)来重新发明轮子。一方面接受有关状态变更的通知,另外一个方面接受此类通知的订阅。 改进前: +--------------+ (2) +-------------+ (1) +-----------+ | | <--------+ | | <--------+ | | | Storage | | API | HTTP | Browser | | | +--------> | | +--------> | | +--------------+ (3) +-------------+ (4) +-----------+ 改进后: +--------------+ +-------------+ WebSocket +-----------+ | Storage | | API | +----------> | Browser | +--------------+ +-------------+ (3) +-----------+ + ^ | (1) | (2) v + +-----------------------------------------+ | Bus | +-----------------------------------------+ 改进前的方案也就是browser定期去查询api并访问存储更改 ...

January 16, 2021 · 2 min · 361 words · Me

fasthttp对性能的优化压榨

最近在看网络模型和go net的源码,以及各web框架例如fasthttp, weaver, gnet(更轻量)源码。fasthttp在github上已经写上了一个go开发的best practices examples,这里我也记录一些在源码中看到的一些技巧 []byte buffer的tricks 下面的一些tricks在fasthttp中被使用,自己的代码也可以用 标准Go函数能够处理nil buffer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 var ( // both buffers are uninitialized dst []byte src []byte ) dst = append(dst, src...) // is legal if dst is nil and/or src is nil copy(dst, src) // is legal if dst is nil and/or src is nil (string(src) == "") // is true if src is nil (len(src) == 0) // is true if src is nil src = src[:0] // works like a charm with nil src // this for loop doesn't panic if src is nil for i, ch := range src { doSomething(i, ch) } 所以可以去掉一些对[]bytebuffer的nil校验: ...

January 10, 2021 · 6 min · 1136 words · Me

[源码分析]sync pool

- 当多个goroutine都需要创建同一个对象,如果gorountine数过多,导致对象的创建数目剧增,进而导致GC压力增大,形成“并发大-占用内存大-GC缓慢-并发处理能力弱-并发更大”这样的恶性循环 - 在这个时候,需要一个对象池,每个goroutine不再自己单独创建对象,而是从对象池中取出一个对象(如果池中已有)

January 1, 2021 · 1 min · 4 words · Me

[自建轮]高性能Goroutine Pool

高性能Goroutine Pool go调度器没有限制对goroutine的数量,在goroutine瞬时大规模爆发的场景下来不及复用goroutine从而导致大量goroutine被创建,会导致大量的系统资源占用,尝试池化。 go调度器本身不应该对goroutine数量有限制,因为语言层面无法界定需要限制多少,毕竟程序跑在不同性能的环境,在并发规模不太大的场景做限制甚至会降低性能,原生支持限制goroutine数量无疑是得不偿失的。如果只是中等规模和比较小规模的并发场景其实pool的性能并没有优势 目前设计上还需要加上周期性对空闲队列的prune,等写完再加看看benchmark会提升多少。目前来说对大规模goroutine异步并发的场景(1M, 10M)内存优化(10倍往上)和吞吐量优化效果(2-6倍)非常好。 需求场景与目标 限制并发goroutine的数量 复用goroutine,减轻runtime调度压力,提升程序性能 规避过多的goroutine创建侵占系统资源,cpu&内存 关键技术 锁同步: golang有CAS机制,用spin-lock替代mutex 原理, 讨论 LIFO/FIFO队列: LIFO队列能直接有时间排序功能,方便对需要关联入队时间的操作进行处理 Pool容量限制和弹性伸缩 代码实现 pool.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 package go_pool import ( "errors" "sync" "sync/atomic" "time" ) const( OPEN = iota CLOSED ) var ( ErrPoolClosed = errors.New("this pool has been closed") ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set") ErrInvalidExpiryTime = errors.New("invalid expiration time") ErrInvalidPoolCapacity = errors.New("invalid pool capacity") DefaultScanInterval = time.Second ) type Pool struct { capacity int32 running int32 lock sync.Locker scanDuration time.Duration blockingTasksNum int maxBlockingTasks int state int32 cond *sync.Cond workers WorkerQueue // LIFO queue workerCache sync.Pool } func (p *Pool) Submit(task func()) error{ if atomic.LoadInt32(&p.state) == CLOSED{ return ErrPoolClosed } // retrieve worker to do the task // return error if no workers available var w *Worker if w = p.retrieveWorker(); w == nil{ return ErrPoolOverload } w.task <- task return nil } func (p *Pool) Shutdown() { atomic.StoreInt32(&p.state, CLOSED) p.lock.Lock() // reset worker queue p.workers.reset() p.lock.Unlock() } func (p *Pool) isClosed() bool{ return atomic.LoadInt32(&p.state) == CLOSED } // change the capacity of the pool func (p *Pool) Resize(size int){ if p.Cap() == size{ return } atomic.StoreInt32(&p.capacity, int32(size)) // need to stop certain workers if #running_workers > #new_capacity diff := p.Running() - size if diff > 0{ for i := 0; i< diff; i++{ p.retrieveWorker().task <- nil } } } func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPEN){ // initialize the purging go routine go p.scavengerRoutine() } } func (p *Pool) Running() int{ return int(atomic.LoadInt32(&p.running)) } func (p *Pool) Cap() int{ return int(atomic.LoadInt32(&p.capacity)) } func (p *Pool) Free() int{ return p.Cap() - p.Running() } func (p *Pool) incRunning(){ atomic.AddInt32(&p.running, 1) } func (p *Pool) decRunning(){ atomic.AddInt32(&p.running, -1) } // put the worker back into the pool for recycling func (p *Pool) recycleWorker(worker *Worker) bool{ capacity := p.Cap() if p.isClosed() || (capacity >= 0 && p.Running() > capacity){ return false } worker.recycleTime = time.Now() p.lock.Lock() // need to double check if state is CLOSED if p.isClosed(){ p.lock.Unlock() return false } err := p.workers.add(worker) if err != nil{ p.lock.Unlock() return false } // notify any request stuck in retrieveWorker that there is an available worker in pool p.cond.Signal() p.lock.Unlock() return true } func (p *Pool) spawnWorker() *Worker{ worker := p.workerCache.Get().(*Worker) worker.Run() return worker } func (p *Pool) retrieveWorker() (worker *Worker){ p.lock.Lock() worker = p.workers.detach() // get worker from queue successfully if worker != nil{ p.lock.Unlock() }else if capacity := p.Cap();capacity == -1{ p.lock.Unlock() // spawn worker return p.spawnWorker() }else if p.Running() < capacity{ // infinite pool p.lock.Unlock() // spawn worker return p.spawnWorker() }else{ // if the number of blocking tasks reaches the maximum blocking tasks threshold then returns nil // and throw the ErrPoolOverload error in Submit method if p.maxBlockingTasks != 0 && p.maxBlockingTasks <= p.blockingTasksNum{ p.lock.Unlock() return } // the pool is full need to wait until worker is available for task handling Retry: // handle the number of blocking task handling requests // wait until condition being notified p.blockingTasksNum++ p.cond.Wait() p.blockingTasksNum-- // ensure there is a worker available because you don't know if the recycled worker being closed then if p.Running() == 0{ p.lock.Unlock() // spawn worker return p.spawnWorker() } worker = p.workers.detach() if worker == nil{ goto Retry } p.lock.Unlock() } return } func (p *Pool) scavengerRoutine(){ heartbeat := time.NewTicker(p.scanDuration) defer heartbeat.Stop() for range heartbeat.C{ if p.isClosed(){ break } // all workers get cleaned up and some invokers still get stuck on cond.Wait() // we need to wake up all invokers in that situation. if p.Running() == 0{ p.cond.Broadcast() } } } func NewPool(capacity int)(*Pool, error){ if capacity <= 0{ capacity = -1 } pool := &Pool{ capacity: int32(capacity), lock: NewSpinLock(), } pool.workerCache.New = func() interface{}{ return &Worker{ pool: pool, task: make(chan func(), 1), } } pool.scanDuration = DefaultScanInterval // initialize the worker queue if capacity == -1{ return nil, ErrInvalidPoolCapacity } pool.workers = NewWorkerQueue(0) pool.cond = sync.NewCond(pool.lock) // initialize the purging goroutine go pool.scavengerRoutine() return pool, nil } worker.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package go_pool import ( "time" ) type Worker struct{ pool *Pool task chan func() recycleTime time.Time } func (w *Worker) Run(){ w.pool.incRunning() go func(){ defer func(){ w.pool.decRunning() w.pool.workerCache.Put(w) // todo: panic recovery strategy }() for f := range w.task{ // receiving nil indicates that the worker should stop and quit go routine if f == nil{ return } f() // recycle worker back into the pool, if not success quit go routine if success := w.pool.recycleWorker(w); !success{ return } } }() } worker_queue.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package go_pool type WorkerQueue interface { len() int isEmpty() bool add(worker *Worker) error detach() *Worker reset() } func NewWorkerQueue(size int) WorkerQueue{ return NewSimpleWorkerQueue(size) } func NewSimpleWorkerQueue(size int) *simpleWorkerQueue{ return &simpleWorkerQueue{ size: size, workers: make([]*Worker, 0, size), } } type simpleWorkerQueue struct{ workers []*Worker size int } func(sq *simpleWorkerQueue) len() int{ return len(sq.workers) } func(sq *simpleWorkerQueue) isEmpty() bool{ return sq.len() == 0 } func (sq *simpleWorkerQueue) add(worker *Worker) error{ sq.workers = append(sq.workers, worker) return nil } func (sq *simpleWorkerQueue) detach() *Worker{ length := sq.len() if length == 0{ return nil } worker := sq.workers[length - 1] sq.workers[length - 1] = nil // slice operation should avoid memory leak sq.workers = sq.workers[:length-1] return worker } func (sq *simpleWorkerQueue) reset(){ for i := 0;i < sq.len(); i++{ sq.workers[i].task <- nil sq.workers[i] = nil } sq.workers = sq.workers[:0] } lock.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package go_pool import ( "runtime" "sync" "sync/atomic" ) type spinLock uint32 func (sl *spinLock) Lock() { for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) { runtime.Gosched() } } func (sl *spinLock) Unlock() { atomic.StoreUint32((*uint32)(sl), 0) } // NewSpinLock instantiates a spin-lock. func NewSpinLock() sync.Locker { return new(spinLock) } pool_test.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 package go_pool import ( "math" "runtime" "sync" "testing" "time" ) const( _ = 1 << (10 * iota) KiB //1024 MiB // 1048578 ) const ( InfinitePoolSize = math.MaxInt32 PoolSize = 10000 SleepTime = 100 OverSizeTaskNum = 10 * PoolSize UnderSizeTaskNum = 0.2 * PoolSize ) var currentMem uint64 func demoTaskFunc(args interface{}){ n := args.(int) time.Sleep(time.Duration(n) * time.Millisecond) } func TestPoolWaitToGetWorker(t *testing.T){ var wg sync.WaitGroup p, err := NewPool(PoolSize) defer p.Shutdown() if err != nil { t.Errorf("err: %s", err.Error()) } for i:=0; i< OverSizeTaskNum; i++{ wg.Add(1) _ = p.Submit(func(){ demoTaskFunc(SleepTime) wg.Done() }) } wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) currentMem = mem.TotalAlloc/KiB - currentMem t.Logf("memory usage: %d KB", currentMem) } func TestPoolGetWorkerFromCache(t *testing.T){ var currentMem uint64 var wg sync.WaitGroup p, err := NewPool(PoolSize) defer p.Shutdown() if err != nil { t.Errorf("err: %s", err.Error()) } for i:=0; i< UnderSizeTaskNum; i++{ wg.Add(1) _ = p.Submit(func(){ demoTaskFunc(SleepTime) wg.Done() }) } wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) currentMem = mem.TotalAlloc/KiB - currentMem t.Logf("memory usage: %d KB", currentMem) } func TestNoPool(t *testing.T){ var wg sync.WaitGroup for i:=0; i<UnderSizeTaskNum; i++{ wg.Add(1) go func(){ defer wg.Done() demoTaskFunc(SleepTime) }() } wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) currentMem = mem.TotalAlloc/KiB - currentMem t.Logf("memory usage: %d KB", currentMem) } func TestWithInfinitePool(t *testing.T){ var wg sync.WaitGroup p, err := NewPool(InfinitePoolSize) defer p.Shutdown() if err != nil { t.Errorf("err: %s", err.Error()) } for i:=0; i< UnderSizeTaskNum; i++{ wg.Add(1) _ = p.Submit(func(){ demoTaskFunc(SleepTime) wg.Done() }) } wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) currentMem = mem.TotalAlloc/KiB - currentMem t.Logf("memory usage: %d KB", currentMem) } pool_benchmark_test.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package go_pool import ( "testing" "time" ) const ( RunTimes = 5000000 BenchParam = 10 BenchPoolSize = 200000 ) func demoFunc() { time.Sleep(time.Duration(BenchParam) * time.Millisecond) } func BenchmarkPoolThroughput(b *testing.B) { p, _ := NewPool(BenchPoolSize) defer p.Shutdown() b.StartTimer() for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { _ = p.Submit(demoFunc) } } b.StopTimer() } func BenchmarkGoroutinesThroughput(b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { go demoFunc() } } }

December 30, 2020 · 9 min · 1751 words · Me