celery的简介
celery简介
# 任务调度器 Celery
celery是Python开发的一个分布式任务调度模块
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。
celery的优点
1. 简单: celery的使用相对简单 并且在后期维护等操作都是比较方便的
2. 快速: 一个单进程的celery 每分钟可处理上百万个任务 在一些常用且等待时长比较大的时候 将会是最好的选择
3. 灵活: celery的组件 几乎都是可以被自定义以及扩展使用的
4. 高可用: 当任务执行过程中链接中断 那么celery会自动尝试重新执行任务
celery的Broker
当使用数据持久化的时候 选择RabbitMQ
abbitmq提供的队列和消息持久化机制更加稳定
当不在乎任务是否丢失的情况下 使用Redis
redis在服务崩溃的时候 可能导致数据丢失的情况
项目使用celery的操作记录版本
语言/模块 | 版本 | 安装方式 |
---|---|---|
Python | 3.6 | DownloadPython |
celery | 3.1.22 | pip install celery==3.1.22 -i https://pypi.douban.com/simple/ |
django | 1.11.11 | pip install django==1.11.11 -i https://pypi.douban.com/simple/ |
django-celery | 3.1.17 | |
redis | 2.10.6 | pip install redis==2.10.6 -i https://pypi.douban.com/simple/ |
pymongo | 3.11.0 | pip install mongo -i https://pypi.douban.com/simple/ |
celery里面 结合mongodb redis
新建celery 任务文件
import datetime
import pymongo
from celery import Celery, platforms
import redis
# 定义当前模块的名称 以及消息队列的存储路径
redistrend = Celery(
'redistrend', # 当前模块的名字
broker='redis://:密码@localhost:6379/12', # 消息队列的url
)
# 任务过期时间
redistrend.conf.update(
result_expires=3600,
)
# celery不能使用最高权限用户启动的时候 配置一下 C_FORCE_ROOT
platforms.C_FORCE_ROOT = True
同文件下 配置mongodb 服务器配置
# mongodb服务器的配置
CONN_ADDR1 = 'd.com:9876' # 公网
CONN_ADDR2 = 'd.com:9876' # 内网
REPLICAT_SET = 'what
username = 'ikwlamks'
password = 'wajloselkzpp2la0'
# #获取mongoclient
myclient = pymongo.MongoClient([CONN_ADDR2, CONN_ADDR1], replicaSet=REPLICAT_SET)
# #授权。 这里的user基于admin数据库授权。
myclient.admin.authenticate(username, password)
mydb = myclient["joiki"] # 获取连接的集合名称
同文件下 创建任务函数
@redistrend.task
def redis_trend(key, date_list, uid):
first_date, last_date = date_list
first_date = datetime.datetime.strptime(first_date, "%Y-%m-%d") # 第一天
last_date = datetime.datetime.strptime(last_date, "%Y-%m-%d") # 最后一天
day_num = (last_date - first_date).days # 相隔几天
rank_list = {}
for i in range(day_num + 1):
# TODO 此处时间需要修改 本地测试windows--> %Y-%#m-%#d 远程Linux--? %Y-%-m-%-d
new_date = (first_date + datetime.timedelta(days=i)).strftime("%Y-%-m-%-d")
new_mycol = mydb[new_date]
new_data = new_mycol.find({'search_key': {'$regex': key}}).sort('search_frequency')
for data in new_data:
if data['search_key'] not in rank_list:
rank_list[data['search_key']] = [{'date': data['dated'], 'rank': data['search_frequency']}]
else:
rank_list[data['search_key']].append({'date': data['dated'],
'rank':data['search_frequency']})
# 数据写入redis
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline() # 默认事务开启,具备原子性
pipe.set('%s_%s_%s_%s' % (key, date_list[0], date_list[1], uid), str(rank_list), 3 * 24 * 3600)
pipe.execute()
return rank_list
在需要使用到的地方 调用异步调度器就可以了
# 注意传递参数的时候的格式 args=[xxx,]
redistrend.send_task("redistrend.redis_trend", args=[key, date_list, user.id])
在task文件路径开启celery
Windows下面的开启
celery -A redistrend worker --loglevel=info
Linux下 后台启动
celery multi start a1 -A redistrend -l info ⭐不建议
# 指定log文件以及pid文件的储存位置 ⭐⭐⭐⭐⭐
celery multi start a1 -A redistrend -l info --logfile=./log/1_post.log --pidfile=./log/1_post.pid
在task文件路径停止celery
Windows下面 直接ctrl + c 就可以停止了 / 清掉redis
# 不写点什么怎么觉怪怪的
清除celery的进程
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9
停止celery stop
celery multi stop a1 -A redistrend -l info
重启 restart
celery multi restart a1 -A newtestfile -l info --pidfile=./log/1_post.pid