Celery Redis Mongo 结合使用


celery的简介

celery简介

# 任务调度器 Celery
celery是Python开发的一个分布式任务调度模块
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。

celery的优点

1. 简单: celery的使用相对简单 并且在后期维护等操作都是比较方便的
2. 快速: 一个单进程的celery 每分钟可处理上百万个任务 在一些常用且等待时长比较大的时候 将会是最好的选择
3. 灵活: celery的组件 几乎都是可以被自定义以及扩展使用的
4. 高可用: 当任务执行过程中链接中断 那么celery会自动尝试重新执行任务

celery的Broker

当使用数据持久化的时候 选择RabbitMQ
abbitmq提供的队列和消息持久化机制更加稳定
当不在乎任务是否丢失的情况下 使用Redis
redis在服务崩溃的时候 可能导致数据丢失的情况

项目使用celery的操作记录版本

语言/模块 版本 安装方式
Python 3.6 DownloadPython
celery 3.1.22 pip install celery==3.1.22 -i https://pypi.douban.com/simple/
django 1.11.11 pip install django==1.11.11 -i https://pypi.douban.com/simple/
django-celery 3.1.17
redis 2.10.6 pip install redis==2.10.6 -i https://pypi.douban.com/simple/
pymongo 3.11.0 pip install mongo -i https://pypi.douban.com/simple/

celery里面 结合mongodb redis

新建celery 任务文件

import datetime
import pymongo
from celery import Celery, platforms
import redis

# 定义当前模块的名称 以及消息队列的存储路径
redistrend = Celery(
    'redistrend',  # 当前模块的名字
    broker='redis://:密码@localhost:6379/12',  # 消息队列的url
)

# 任务过期时间
redistrend.conf.update(
    result_expires=3600,
)

# celery不能使用最高权限用户启动的时候 配置一下 C_FORCE_ROOT
platforms.C_FORCE_ROOT = True

同文件下 配置mongodb 服务器配置

# mongodb服务器的配置
CONN_ADDR1 = 'd.com:9876'  # 公网
CONN_ADDR2 = 'd.com:9876'  # 内网
REPLICAT_SET = 'what
username = 'ikwlamks'
password = 'wajloselkzpp2la0'
#  #获取mongoclient
myclient = pymongo.MongoClient([CONN_ADDR2, CONN_ADDR1], replicaSet=REPLICAT_SET)
#  #授权。 这里的user基于admin数据库授权。
myclient.admin.authenticate(username, password)
mydb = myclient["joiki"] # 获取连接的集合名称

同文件下 创建任务函数

@redistrend.task
def redis_trend(key, date_list, uid):
    first_date, last_date = date_list
    first_date = datetime.datetime.strptime(first_date, "%Y-%m-%d")  # 第一天
    last_date = datetime.datetime.strptime(last_date, "%Y-%m-%d")  # 最后一天
    day_num = (last_date - first_date).days  # 相隔几天
    rank_list = {}
    for i in range(day_num + 1):
        # TODO 此处时间需要修改 本地测试windows--> %Y-%#m-%#d 远程Linux--? %Y-%-m-%-d
        new_date = (first_date + datetime.timedelta(days=i)).strftime("%Y-%-m-%-d")
        new_mycol = mydb[new_date]
        new_data = new_mycol.find({'search_key': {'$regex': key}}).sort('search_frequency')
        for data in new_data:
            if data['search_key'] not in rank_list:
                rank_list[data['search_key']] = [{'date': data['dated'], 'rank': data['search_frequency']}]
            else:
                rank_list[data['search_key']].append({'date': data['dated'], 
                                                      'rank':data['search_frequency']})
    # 数据写入redis
    r = redis.Redis(connection_pool=pool)
    pipe = r.pipeline()  # 默认事务开启,具备原子性
    pipe.set('%s_%s_%s_%s' % (key, date_list[0], date_list[1], uid), str(rank_list), 3 * 24 * 3600)
    pipe.execute()
    return rank_list

在需要使用到的地方 调用异步调度器就可以了

# 注意传递参数的时候的格式 args=[xxx,]
redistrend.send_task("redistrend.redis_trend", args=[key, date_list, user.id])

在task文件路径开启celery

Windows下面的开启

celery -A redistrend worker --loglevel=info

Linux下 后台启动

celery multi start a1 -A redistrend -l info   ⭐不建议

# 指定log文件以及pid文件的储存位置 ⭐⭐⭐⭐⭐
celery multi start a1 -A redistrend -l info --logfile=./log/1_post.log --pidfile=./log/1_post.pid

在task文件路径停止celery

Windows下面 直接ctrl + c 就可以停止了 / 清掉redis

# 不写点什么怎么觉怪怪的

清除celery的进程

ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9

停止celery stop

celery multi stop a1 -A redistrend -l info

重启 restart

celery multi restart a1 -A newtestfile -l info --pidfile=./log/1_post.pid

文章作者: 柒仔
文章链接: /article/13/
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 XiaoLiu!
侵权声明: 若无意对您的文章造成侵权,请您留言,博主看到后会及时处理,谢谢。
评论-----昵称和邮箱必填,网址选填
  目录