Celery Route 模块解读。
最近在线上观察时发现了有任务的队列堆积了,当时判断是同事写的 Celery crontab 任务堆积了,当时他给我看了线上的配置,他说没有指定定时任务的队列,会造成堆积,但是这个配置在线上已经运行了好几个月了,如果原因是这个,那很早之前就暴露问题了,不过本人对 Celery 还停留在会使用的阶段,一些内部细节不是很了解,所以在周末抽了时间来一探究竟。
在对源码进行 grep
和 rdb
(celery 的远程调试后),确定了能够解决的疑惑的位置在 Celery route
模块。
这里结合下面一段示例代码,对 Celery route 进行解读(使用 Celery 版本为 3.1.21)。
1 | # -*- coding: utf8 -*- |
PS: 这里示例代码虽然是演示 Periodic,但是对于普通 Task 也一样。
运行后截图:
- Celery beat
- Celery Periodic Task
可以看到控制台中,task 会每隔 30s 运行一次,并返回 hello world
。
那接下来,我们就开始讲解 Celery Route
吧。
在上面的代码中,有一段这样的配置代码:
1 | app.conf.CELERY_ROUTES = { |
上面的代码的意思是,在 celery_app
这个 Celery application 下,名为 hello
的任务,只消耗来自于队列 test
的任务。
Celery Beat
这里把 Celery Beat
的调用链抽象出来,大致的流程如下:
1 | tick -> send scheduler -> send to task |
这里主要关注 send to task
这步,而这步主要使用 Celery
对象中定义的 send_task
方法。
1 | def send_task(self, name, args=None, kwargs=None, countdown=None, |
上面和 route
相关的代码有:
- 获取 route: router = router or amqp.router
- 获取 route options: options = router.route(options, route_name or name, args, kwargs)
Router
一般情况下,这里的 router
就是 amqp.router
。而 amqp.router
,则是 celery.app.routes.Router
的实例化对象。接下来我们看创建该实例化对象都做了什么,代码如下:
1 | def Router(self, queues=None, create_missing=None): |
这里有四个初始化参数,主要着重以下 3 个:
- self.routes
其是一个描述符,里面存储的是路由表信息,这些路由表的信息使用以下代码获取:
1 | def flush_routes(self): |
其中 _routes
是 celery.app.routes
模块,而路由表的信息,则有 CELERY_ROUTES
提供,其中 prepare
方法定义如下:
1 | def prepare(routes): |
这里主要是预先生成好,route 表的类型,由 MapRoute
(包括了 Dict,且是默认路由类型),其它都是自定义的路由类型,上面的例子所使用的便是 MapRoute
类型的路由表。
- queues or self.queues
这里 queues
没有特别指明,都是默认的 celery.amqp.Queues
的实例化对象,其中 Queues
继承于 dict
:
1 | @cached_property |
在没有指定 CELERY_QUEUES
时,默认会创建一个名为 celery
默认队列:
1 | if not queues and conf.CELERY_DEFAULT_QUEUE: |
- create_missing
这个属性很重要,特别是在生产环境中,当 Celery
发现 task 需要的队列不存在会自行创建相应的队列。
1 | def __missing__(self, name): |
由于 Queues
是继承于 dict
,这里 override
了 magic method __missing__
,这样就可以在 self.create_missing
开启时,自行创建不存在的队列。
Router Options
1 | options = router.route(options, route_name or name, args, kwargs) |
options
是通过 router.route
进行获取的,其详细代码如下:
1 | def route(self, options, task, args=(), kwargs={}): |
这里先讲解以上代码的含义,对 lookup_route
和 expand_destination
放在后面。上面的含义是首先从 expand_destination
中获取扩展的选项。接着检查是否由路由信息,如果有,则查找该路由信息是否含有 task
对应的路由信息,如果有,则将对得到的路由信息 route
扩展之后,与 options
合并之后返回;如果 routes
不存在,并且 options
没有队列的信息,则使用默认的队列 celery
,并与原 options
合并之后返回。
其中使用到的 lookup_route
和 expand_destination
代码如下:
- lookup_route
1 | def lookup_route(self, task, args=None, kwargs=None): |
用于查询路由表信息,这里的 _first_route 方法定义如下:
1 | _first_route = firstmethod('route_for_task') |
而 firstmethod 的定义如下:
1 | def firstmethod(method): |
这两段代码的含义时,_first_route
使用 route
定义的方法 route_for_task
在 self.routes
路由表中是否存在 task
对应的路由信息,如果有则返回,否则返回 None
。
- expand_destination
1 | def expand_destination(self, route): |
该方法的主要作用是检查 self.queues
是否存在 queue
对应的队列:
1 | self.queues[queue] |
如果 create_missing = True
这里会创建 missing
的队列,返回的 Q
是 kombu.Queue
的实例化对象。
如果 task
指定了队列 queue
,这里会修改路由表里面的 queue
会 Q
后返回。
以上便是 Celery
中 Route
的解读。通过对源码的解读,验证了当时自己的判断是正确的堆积的并不是因为 Periodic task
的配置没有写 queue
信息。
目前,我司线上的 Route
的配置就像我上面使用 dict configuration
方式进行的,而 celery
官方并不推荐,相反推荐的是这样的方式:
1 | # -*- coding: utf8 -*- |
这样做是利用到了 CELERY_CREATE_MISSING_QUEUES
(默认为 True
),自动创建不存在队列的,这样可以避免写太多冗余的配置的情况。