简述
Luigi基于Python,可以快速搭建数据处理的任务管线。
在一定程度上支持分布式处理。
开始一个任务
使用luigi启动一个任务,有两种方法:
-
通过luigi命令行执行
类似luigi --module main MyTaskAll --id 1 --local-scheduler
,这里main
是我的.py文件名(为了可以找到整个main.py,执行前需要把main.py所在路径加到PythonPath里边),MyTaskAll
是main.py中我定义的Task类名,--id 1
是传给MyTaskAll
任务的具体参数。--local-scheduler
是配置调度器,这个后边会说。 -
直接在.py中执行
可以直接在main.py
中执行luigi.build([MyTaskAll(id=1)], local_scheduler=True)
,和上边方法的效果一样。
核心元素
Task
Luigi中的基本单位是Task
,一个Task又可以依赖其他的Task。
每个Task有一个id,截至撰文时的版本3.5.1
,这个id仅由任务类名和传入的具体参数组成。
每个Task,需要定义它的依赖、任务本身、输出。即,requires、run、output。
luigi框架中,有个任务的基础类luigi.Task
,当我们想定义一个任务时,需要继承这个类,然后分别重写上述的三个方法。
requires
requires()
方法应该返回一个Task对象,或者返回一个包含Task对象的容器。用于luigi中的调度器来分析当前任务需要哪些前置任务。
run
run()
方法内部是这个Task要做的具体事情,无需返回任何内容。需要注意的是,run方法必须是幂等的。
output
output()
方法应该返回一个Target对象,或者一个包含Target对象的容器。Target对象可以是具体的文件,也可以是数据库中的某个具体条目,核心要求是Target对象的exist方法,因为,luigi通过output().exist()判断任务是否需要再次执行。通常,可以用一些luigi内置的Target类型,比如:一般的文件类型luigi.LocalTarget
workers
当我们启动一个任务的时候,这个任务及其依赖的任务,都由workers负责具体执行,这个workers是一个集合,内部包含了至少1个worker(其实这里不应该叫worker,因为文章下面有一个worker的概念,这里暂且这么叫吧先)。启动任务的时候,我们可以指定worker的数量。
比如:
luigi --module main MyTaskAll --id 1 --local-scheduler --workers=10
luigi.build([MyTaskAll(id=1)], local_scheduler=True, workers=10)
数量有什么用?
想象一下,如果一个任务MyTaskAll,它依赖10个SubTask,同时,这10个SubTask之间没有相互依赖,并且10个SubTask各自都没有依赖其他任务。
理想情况下,10个SubTask同时执行最好,那么workers的数量应该设置为10,或者更多。
如果workers的数量为1,那么,10个SubTask就会被这一个worker顺序执行,累死了。
scheduler
中文翻译成:调度器
luigi中通过scheduler来调度任务,在开发时,可以使用--local-scheduler
参数来完成,比较方便。在部署业务时,luigi强烈建议使用“Central Scheduler”,即“中央调度器”。
luigi中的中央调度器最核心的作用是“防止同一个任务重复执行”,此外,还提供了web来查看任务的执行情况。
根据我的理解,整个调度过程如下:
- 本地启动任务时,附加启动了workers
- workers连接到scheduler,定期查询自己是否需要执行某些任务。
- scheduler根据任务的执行情况,决定是否要给workers分配任务、分配哪个任务。
- workers执行具体任务的时候,要不断的向scheduler汇报自己的进展(开始、失败、成功,也可以有其他自定义的信息)
worker
注意,这里是worker
,不是workers
。
这里的worker不是workers集合中的成员,而是在scheduler服务器里web上看到的。个人理解,如果一个任务,比如MyTaskAll(id=1),在一个地方被启动,那这就有产生了一个worker。如果同时在两个地方启动这个任务,并且他们使用共同的中央调度器,在这个调度器的眼里,此时这个任务就有两个worker。
对于中央调度器而言,如果一个任务有2个worker(多个)在工作,那么他们的工作结果是共享的。比如worker1里的workers成功执行了SubTask(id=1),那么worker2里的workers就不会再执行SubTask(id=1)了,因为,在中央调度器眼里,两个worker为同一个目标努力,那么,他们的结果应该是一样的,SubTask(id=1)不需要再被重复执行。