celery 安装及使用

1. centos7 下安装celery

1.1 安装pip

1
2
yum -y install epel-release
yum install python-pip

1.2 更新pip

1
2
pip install --upgrade pip
pip install --upgrade setuptools

1.3 安装celery

1
pip install -U Celery

1.4 中间人安装,选择使用redis

1.4.1 安装redis

1
2
3
4
yum -y install redis
vim /etc/redis.conf
#bind 127.0.0.1 // 注释掉,使redis允许远程访问
requirepass 密码 // 修改这行,redis登录密码

1.4.2 安装额外依赖

1
pip install -U celery[redis]

1.4.3 redis在celery下的使用配置

1)使用配置,只需要设置redis的位置:

1
2
例如:BROKER_URL = 'redis://localhost:6379/0'
格式解释:redis://:password@hostname:port/db_number

2)可见性超时

可见性超时定义了等待职程在消息分派到其他程序之前确认收到任务的秒数
通过以下方式配置:

1
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  // 默认为1小时

这个时间会导致一种情况:任务循环重新执行;可以通过延长可见性时间来解决这个状况,例如

1
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}

3)存储任务的状态和返回值

1
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' 			//配置这个参数

4)广播信息默认对所有虚拟主机可见,配置成只能被活跃的虚机接受

1
BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}

2. celery 架构理解

2.1 模块各功能

1)任务生产者(task producer)

负责生产计算任务,交给消息队列进行处理。只要是调用了celery的api,我们都可以称为是任务生产者。

2)任务调度器(celery beat)

任务调度器是以独立的进程形式存在,它会读取配置文件的内容,周期性的将任务的请求发给任务队列。celery beat是celery自带的任务生产者,系统管理员可以选择关闭或者开启它,同事一个celery系统只能有一个celery beat调度器。

3)任务代理(broker)

任务代理方负责接收任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方。因为任务处理是基于message的,所以一般选择使用rabbitmq或者redis等消息队列或者数据库作为celery的message boker

4)任务消费方(celery worker)

celery worker就是执行任务的一方,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。可多节点部署celery worker。

5)结果保存

celery支持任务处理完之后将状态信息和结果的保存,以供查询。

2.2 功能尝试

2.1.1 任务生产者入口

1)创建celery对象

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery
import time

app = Celery('notify_friends', backend='redis://:123123@localhost:6379/0',broker = 'redis://:123123@localhost:6379/0')

@app.task
def notify_friends(userId,newsId):
print 'Start at task at {0}, userId: {1}, newsId: {2} .'.format(time.ctime(),userId,newsId)
time.sleep(2)
print 'Task notify_friends success at {0}'.format(time.ctime())
return True

2)启动celery worker

1
celery -A notify_friends worker --loglevel=info

3)调用celery api

1
2
3
4
5
6
7
8
9
10
11
from notify_friends import notify_friends
import time

def notify(userId,newsId):
result = notify_friends.delay(userId,newsId)
while not result.ready():
time.sleep(1)
print result.get(timeout=10)
if __name__ == '__main__':
for i in range(10):
notify('root',str(i))

2.1.2 任务调度器入口(定时任务调用celery api)
1)创建配置文件,定义schedule

1
2
3
4
5
6
7
8
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
'select_popular_book':{
'task': 'favourite_book.select_popular_book',
'schedule': timedelta(seconds=10),
},
}

2)创建celery对象,并且加载配置文件

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery
import time

app = Celery('select_popular_book',backend='redis://:123123@localhost:6379/0',broker = 'redis://:123123@localhost:6379/0',)
app.config_from_object('config')

@app.task
def select_popular_book():
print 'Start at {0}'.format(time.ctime())
time.sleep(2)
print 'Task ... success at {0}'.format(time.ctime())
return True

3)启动celery worker

1
celery -A notify_friends worker --loglevel=info

4)启动celery beat

1
celery -A favorite_book beat

欢迎打赏!您的鼓励将支持我继续前行!