发布于

Python 分布式任务队列管理库 - Celery

一、Celery基本介绍

Celery 是一个强大的分布式任务队列管理系统,它允许你将工作分发给多个工作节点,这些节点可以是同一台机器上的进程,也可以是网络中的其他机器。Celery 主要用于处理后台任务,提高应用程序的性能和用户体验。

Celery的官方文档 http://www.celeryproject.org/ 学习文档(中文): http://docs.jinkan.org/docs/celery/getting-started/introduction.html

二、Celery的使用方法

安装 Celery 非常简单,你可以使用 easy_install 或 pip 来安装:

easy_install -U Celery
# 或者
pip install -U Celery

安装完成后,你可以通过以下命令来启动Celery worker:

nohup celery -A tasks worker --loglevel=info -E -B >> log/celeryd.log 2>&1 &

这条命令会启动一个Celery worker,并将日志输出到log/celeryd.log文件中。其中-A tasks指定了任务模块,--loglevel=info设置日志级别为info,-E启用事件,-B表示也要启动beat服务。

下面是一个简单的tasks.py示例内容:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

三、Celery的配置

Celery的配置可以在Python模块中指定,例如创建一个celeryconfig.py文件,并在Celery实例中加载它:

app.config_from_object('celeryconfig')

你可以在配置文件中定义broker的URL、结果后端、任务序列化格式等。

Python 数据聚合计算

在使用Celery进行数据聚合计算时,你可能需要执行以下步骤:

  1. 切换到项目目录:
cd /home/celery_mtm2db/

  1. 停止当前正在运行的计算任务:
sh stop_calculate.sh

  1. 启动一个新的Celery任务:
sh start_calculate.sh

  1. 提交生成宽表的任务:
python generate_wide_table.py -c SJwOPMF_Z

其中-c参数用于指定连锁appid,如果省略,则对全部数据重新计算。计算结果可以通过以下命令查看:

tail -n 20 log/celeryd_calculate.log

如果在日志中看到“OK”,则表示宽表计算已经结束。

  1. 在宽表计算完成后,执行以下命令来计算生成最终结果:
nohup sh generate_aggregate_statistics.sh &

如果第4步和第5步在执行过程中出现失败,再次执行时会清空数据库并重新计算。

定时任务设置

要设置定时任务,可以按照以下步骤操作:

  1. 切换到项目目录:
cd /home/celery_mtm2db/

  1. 停止所有计算任务:
sh stop_calculate.sh

  1. 启动一个新的Celery任务:
sh start_calculate.sh

  1. 启动定时计算任务:
sh start_autoindexcalculate_timer.sh