Golang被证明非常适合并发编程,goroutine比异步编程更易读、优雅、高效。本文提出一个适合由Golang实现的Pipeline执行模型,适合批量处理大量数据(ETL)的情景。

想象这样的应用情景: (推荐学习:go)

从数据库A(Cassandra)加载用户评论(量巨大,例如10亿条);根据每条评论的用户ID、从数据库B(MySQL)关联用户资料;调用NLP服务(自然语言处理),处理每条评论;将处理结果写入数据库C(ElasticSearch)。

由于应用中遇到的各种问题,归纳出这些需求:
需求一:应分批处理数据,例如规定每批100条。出现问题时(例如任意一个数据库故障)则中断,下次程序启动时使用checkpoint从中断处恢复。
需求二:每个流程设置合理的并发数、让数据库和NLP服务有合理的负载(不影响其它业务的基础上,尽可能占用更多资源以提高ETL性能)。例如,步骤(1)-(4)分别设置并发数1、4、8、2。

这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。

你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。

重用的Pipeline模块

为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。

使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:

package main import "log" func main() {    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。    checkpoint := loadCheckpoint()        //工序(1)在pipeline外执行,最后一个工序是保存checkpoint    pipeline := NewPipeline(4, 8, 2, 1)     for {        //(1)        //加载100条数据,并修改变量checkpoint        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。        data, err := extractReviewsFromA(&checkpoint, 100)         if err != nil {            log.Print(err)            break        }                //这里有个Golang著名的坑。        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。        curCheckpoint := checkpoint                ok := pipeline.Async(func() error {            //(2)            return joinUserFromB(data)        }, func() error {            //(3)            return nlp(data)        }, func() error {            //(4)            return loadDataToC(data)        }, func() error {            //(5)保存checkpoint            log.Print("done:", curCheckpoint)            return saveCheckpoint(curCheckpoint)        })        if !ok { break }                if len(data) < 100 { break } //处理完毕    }    err := pipeline.Wait()    if err != nil { log.Print(err) }}

更多相关文章

  1. Shell脚本高效检测主机存活
  2. 高效编写Dockerfile的几条准则
  3. 牛逼了!Python代码补全利器,提高效率告别996!
  4. 高效开发!借助 Mac + Windows 实现八屏办公!
  5. 高效方法 | Jupyter Notebook 比你想象中的还要强大
  6. 如何高效地远程部署?自动化运维利器 Fabric 教程
  7. 更高效的利用Jupyter+pandas进行数据分析,6种常用数据格式效率对
  8. 原创的20个Python自动化案例,一口一个,高效办公!
  9. 如何成为高效的学习高手-摘要

随机推荐

  1. SingleTask模式的Activity接受Intent注意
  2. android拍照获取图片
  3. Android(安卓)判断当前线程是否是主线程
  4. Android窗口机制(五)最终章:WindowManager.L
  5. android library project使用
  6. Android:Activity的4种启动模式
  7. android 滚动条 相关属性
  8. Android之布局参数
  9. Android重量级开发之--提高android启动速
  10. Android之常见问题集锦