How to run periodic tasks in Celery

Recently, I ran into a situation where I had to schedule a series of tasks to run periodically in order to do some jobs. A scenario like this looks like a good fit for crontab, but I decided to give Celery a go and use Celery Beat.
First of all, let’s do a quick recap of what Celery is and what it is good at.
Celery is a Python library used to handle time-consuming tasks and delegate them to separate processes or distributed network hosts in order to reduce the load on web servers/backend services.
Most use cases show Celery to be a perfect fit for web-based projects.
I am not a web developer and my main interest is designing and developing what is commonly identified as backend microservices. Celery is a good solution for scheduling tasks to run in a microservices architecture.
To fulfil our expectations, Celery uses an architecture based on two main components: Queues and Workers. A Queue is the data structure where our tasks are pushed and from which they are pulled by Workers so as to be executed asynchronously.
A task is some work we tell Celery to run at a given time or periodically, such as sending an email or generating a report at the end of every month.
One important thing to mention here is that the Queue component does not come as part of the library. An external message broker, such as Redis or RabbitMQ, has to be used for this purpose. Using an external message broker as the Queue makes our application reliable and capable of managing the enqueued tasks even after a crash of our application.
Celery implements the Workers using an execution pool, so the number of tasks that can be executed concurrently by each worker depends on the number of processes in the execution pool.
Celery allows us to keep track of tasks that fail, as well as to set up several different retry policies for them. At the same time, Celery allows us to configure a rate limit so that no more than a given number of tasks run in a given time window (see the sketch after the list below). We can use Celery for:
* Periodic tasks: jobs that we need to schedule at a specific time or at regular intervals, e.g., monthly report generation or a periodic check of resources.
* Third-party calls: the web app must serve users quickly without waiting for other actions to complete while the page loads, e.g., sending an email or notification, or propagating updates to internal tools (such as gathering data for A/B testing or system logging).
* Long-running jobs: jobs that are expensive in resources, where users need to wait while they compute their results, e.g., complex workflow execution (DAG workflows), graph generation, Map-Reduce-like tasks, and serving of media content (video, audio).
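As a concrete illustration of the retry policies and rate limits mentioned above, here is a minimal sketch; the send_report task and its simulated flaky call are hypothetical and exist only for this example.

import random
from celery import Celery

app = Celery('tasks', broker="redis://localhost:6379/0")

# Hypothetical task used only for illustration: at most 10 runs per minute,
# retried up to 3 times with a 60-second delay whenever it fails.
@app.task(bind=True, max_retries=3, rate_limit='10/m')
def send_report(self, report_id):
    try:
        if random.random() < 0.3:  # stand-in for a flaky external call
            raise ConnectionError("transient failure")
        print("report %s sent" % report_id)
    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)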
Celery Beat
In order to achieve the goal of having a number of scheduled tasks executed asynchronously, I will use Celery Beat. Celery Beat is a scheduler that announces tasks at regular intervals; the tasks are then executed by the worker nodes in the cluster.
As the official documentation puts it:
By default the entries are taken from the beat_schedule setting, but custom stores can also be used, like storing the entries in a SQL database.
You have to ensure only a single scheduler is running for a schedule at a time, otherwise you’d end up with duplicate tasks. Using a centralized approach means the schedule doesn’t have to be synchronized, and the service can operate without using locks.

A good question I asked myself before going with Celery is: why should I use Celery Beat? Why not crontab? Well, first of all, in order to manage a crond service you may need root access on your *nix system, and sometimes you do not have that level of access. Not to mention, I have never been a great fan of using crontab scripts in a distributed application. Moreover, using an external solution to manage tasks allows me not to worry about where my application will be deployed, as long as it can access Celery (every environment that can run Python can also run Celery Beat).
So portability is always a good reason to choose a solution.
Configure Celery
To get started with Celery, just follow the step-by-step guide in the official docs.
Celery Scheduler
To create periodic tasks, we need to define them using the beat_schedule setting. Celery Beat checks the beat_schedule setting to manage the tasks that need to be executed periodically.
For the purpose of my example I use Redis as the message broker, so the first step is to tell Celery who its message broker is.
Let’s have a file, `task.py`
# task.py
from celery import Celery

app = Celery('tasks', broker="redis://localhost:6379/0")
The first argument to Celery is the name of the current module, tasks in my example. This way task names can be generated automatically.
The second argument is the broker keyword argument, which specifies the URL of the message broker.
Let’s say we have to check for a given job every 10 seconds.
# task.py
from celery import Celery

app = Celery('tasks', broker="redis://localhost:6379/0")

@app.task
def check():
    print("I am checking your stuff")

app.conf.beat_schedule = {
    "run-me-every-ten-seconds": {
        "task": "tasks.check",
        "schedule": 10.0
    }
}
We are telling the beat_schedule to run the check() function every 10.0 seconds. The function to run is identified by the name of the module, a dot, and the name of the function: tasks.check.
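As a side note, the schedule value does not have to be a plain number of seconds; a datetime.timedelta works as well, and you can pin a task’s name explicitly instead of relying on the auto-generated one. A minimal sketch of both variations:

from datetime import timedelta
from celery import Celery

app = Celery('tasks', broker="redis://localhost:6379/0")

@app.task(name="tasks.check")  # explicit name instead of the auto-generated one
def check():
    print("I am checking your stuff")

app.conf.beat_schedule = {
    "run-me-every-ten-seconds": {
        "task": "tasks.check",
        "schedule": timedelta(seconds=10),  # same effect as schedule: 10.0
    }
}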
If we run this with the following command:
celery -A task beat --loglevel=info
we will see in the console that the tasks.check job is triggered every 10 seconds:
celery beat v4.3.0 (rhubarb) is starting.
__    -    ... __   -        _
LocalTime -> 2019-10-07 11:52:35
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2019-10-07 11:52:35,266: INFO/MainProcess] beat: Starting...
[2019-10-07 11:52:45,282: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019-10-07 11:52:55,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019-10-07 11:53:05,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019-10-07 11:53:15,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019-10-07 11:53:25,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
The tasks.check task is triggered every 10 seconds, but where is it actually executed? As said before, Celery uses the configured message broker to send and receive messages; in our example, Redis is the message broker.
Celery beat sends the `tasks.check` task to a queue in Redis every 10 seconds.
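Out of curiosity, you can peek at the broker while beat is running and no worker is consuming. This is a minimal sketch, assuming the redis Python client is installed and that Celery is using its default queue, which with a Redis broker is a list named celery:

# peek_queue.py (illustrative only)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
# Number of messages currently waiting in Celery's default queue.
print(r.llen('celery'))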
Let’s see the workers doing the job.
Workers consume messages from a queue by pulling them off it. What we have at the moment is Celery Beat adding tasks to the queue periodically. We want workers to do their job.
In another terminal we run our worker:
celery -A task worker --loglevel=info
and in the console we will see something like:
[2019-10-07 11:53:17,310: INFO/ForkPoolWorker-8] Task tasks.check[db70d065-b4f6-48f3-8b52-5187090da703] succeeded in 0.00230525199999998s: None
[2019-10-07 11:53:17,312: INFO/MainProcess] Received task: tasks.check[08059687-d0b4-4bab-b824-28f11487c393]
[2019-10-07 11:53:17,313: INFO/MainProcess] Received task: tasks.check[aa215cac-fca6-4d2e-8ccc-d827a2946280]
[2019-10-07 11:53:17,312: WARNING/ForkPoolWorker-2] I am checking your stuff
[2019-10-07 11:53:17,314: INFO/ForkPoolWorker-2] Task tasks.check[3cd58782-c592-49e1-8d6f-5955aa843583] succeeded in 0.0018000820000001472s: None
[2019-10-07 11:53:17,315: INFO/MainProcess] Received task: tasks.check[84b5bc2a-f0fc-4d47-9601-ed3651c8fb7c]
[2019-10-07 11:53:17,316: WARNING/ForkPoolWorker-4] I am checking your stuff
[2019-10-07 11:53:17,316: WARNING/ForkPoolWorker-5] I am checking your stuff
[2019-10-07 11:53:17,317: INFO/ForkPoolWorker-4] Task tasks.check[08059687-d0b4-4bab-b824-28f11487c393] succeeded in 0.002428212999999957s: None
[2019-10-07 11:53:17,318: INFO/ForkPoolWorker-5] Task tasks.check[aa215cac-fca6-4d2e-8ccc-d827a2946280] succeeded in 0.0025583710000001147s: None
As we can see, the worker receives each task and executes it soon after. It’s just a very basic example, but it shows the whole flow; real-world tasks are of course more involved.
Now, Celery allows us to keep all of the needed configuration in a package. Let’s create a folder `checker` so as to have the following structure:
- checker
  - __init__.py
  - celery.py
  - celeryconfig.py
  - tasks.py
Here we see how the different configuration options can be split into separate files.
celery.py tells Celery where to find the startup configuration (via app.config_from_object) and where the tasks to run are defined.
# celery.py
from celery import Celery

app = Celery('checker', include=['checker.tasks'])
app.config_from_object('checker.celeryconfig')

app.conf.beat_schedule = {
    "run-me-every-ten-seconds": {
        "task": "checker.tasks.check",
        "schedule": 10.0
    }
}
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Dublin'
enable_utc = True

# tasks.py
from checker.celery import app

@app.task
def check():
    print("I am checking your stuff")
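Before wiring everything up, you can sanity-check the task by calling it directly or by pushing a single run onto the queue from a Python shell. A minimal sketch, assuming Redis is running and a worker has been started for the checker app:

# Run from the folder that contains the checker package.
from checker.tasks import check

check()        # runs the function synchronously, in the current process
check.delay()  # enqueues one execution for a worker to pick up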
So, let’s run Celery Beat. Open a terminal and go to the folder just above the `checker` package we have just created.
In order to start Celery Beat, we pass the name of the package we have configured, which in my case is `checker`:
celery -A checker beat -l info
celery beat v4.3.0 (rhubarb) is starting.
__    -    ... __   -        _
LocalTime -> 2019-10-07 12:02:41
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2019-10-07 12:02:41,741: INFO/MainProcess] beat: Starting...
[2019-10-07 12:02:41,853: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (checker.tasks.check)
[2019-10-07 12:02:51,842: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (checker.tasks.check)
and in order to run our worker we do
celery -A checker worker -l info

 -------------- celery@ie-macp-andi v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Darwin-18.7.0-x86_64-i386-64bit 2019-10-07 12:04:36
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         checker:0x1044185f8
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . checker.tasks.check

[2019-10-07 12:04:36,487: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-10-07 12:04:36,495: INFO/MainProcess] mingle: searching for neighbors
[2019-10-07 12:04:37,519: INFO/MainProcess] mingle: all alone
[2019-10-07 12:04:37,530: INFO/MainProcess] celery@ie-macp-andi ready.
[2019-10-07 12:04:37,841: INFO/MainProcess] Received task: checker.tasks.check[7321f00d-8365-4207-88dc-18507f00a561]
[2019-10-07 12:04:37,842: INFO/MainProcess] Received task: checker.tasks.check[241aab9d-8efb-429c-ac32-9e368a749a0d]
[2019-10-07 12:04:37,844: INFO/MainProcess] Received task: checker.tasks.check[6650454c-2bef-4462-93fc-c8eef6faf48b]
[2019-10-07 12:04:37,845: INFO/MainProcess] Received task: checker.tasks.check[f27ac97a-2f31-44b6-b39a-c39fa3df6f2e]
[2019-10-07 12:04:37,844: WARNING/ForkPoolWorker-1] I am checking your stuff
[2019-10-07 12:04:37,844: WARNING/ForkPoolWorker-8] I am checking your stuff
[2019-10-07 12:04:37,845: INFO/ForkPoolWorker-1] Task checker.tasks.check[241aab9d-8efb-429c-ac32-9e368a749a0d] succeeded in 0.0018057599999998786s: None
[2019-10-07 12:04:37,845: INFO/ForkPoolWorker-8] Task checker.tasks.check[7321f00d-8365-4207-88dc-18507f00a561] succeeded in 0.0019296549999998636s: None
[2019-10-07 12:04:37,847: INFO/MainProcess] Received task: checker.tasks.check[53d4a73e-c6fd-461a-80e6-b59fdf947772]
[2019-10-07 12:04:37,847: WARNING/ForkPoolWorker-3] I am checking your stuff
[2019-10-07 12:04:37,848: WARNING/ForkPoolWorker-4] I am checking your stuff
[2019-10-07 12:04:37,849: INFO/MainProcess] Received task: checker.tasks.check[ea4481ca-a6ff-4ab7-99a6-54ac0006ddc7]
[2019-10-07 12:04:37,849: INFO/ForkPoolWorker-3] Task checker.tasks.check[6650454c-2bef-4462-93fc-c8eef6faf48b] succeeded in 0.0018835969999999591s: None
[2019-10-07 12:04:37,849: INFO/ForkPoolWorker-4] Task checker.tasks.check[f27ac97a-2f31-44b6-b39a-c39fa3df6f2e] succeeded in 0.0019496850000000343s: None
[2019-10-07 12:04:37,850: INFO/MainProcess] Received task: checker.tasks.check[c57c6e03-c988-46e1-bf99-e7e110a2fa67]
[2019-10-07 12:04:37,851: INFO/MainProcess] Received task: checker.tasks.check[4523917c-2218-42fb-9dd1-773584fa3fd4]
[2019-10-07 12:04:37,851: WARNING/ForkPoolWorker-6] I am checking your stuff
[2019-10-07 12:04:37,851: WARNING/ForkPoolWorker-7] I am checking your stuff
[2019-10-07 12:04:37,852: WARNING/ForkPoolWorker-1] I am checking your stuff
[2019-10-07 12:04:37,853: INFO/MainProcess] Received task: checker.tasks.check[e61076cc-822f-45b3-832f-3813bd4d3749]
Please keep in mind that in a high-traffic environment like production, it would probably be better to run multiple workers so as to handle the load.
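Scaling out usually means starting more worker processes, possibly on more machines. Within a single worker, the size of the execution pool mentioned earlier can also be pinned from the configuration file; a minimal sketch of settings you could add to celeryconfig.py (both values are illustrative, not recommendations):

# celeryconfig.py (optional additions)
# Size of the execution pool for each worker; defaults to the number of CPUs.
worker_concurrency = 4

# Fetch one message at a time per pool process, useful for long-running tasks.
worker_prefetch_multiplier = 1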
Another way to work with scheduled tasks is to use Celery Beat’s crontab schedules. If we need more control over when a task executes, we can use:
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker="redis://localhost:6379/0")

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
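The crontab class supports most of the expressions you would write in a crontab file. A few illustrative schedules (the variable names are just placeholders):

from celery.schedules import crontab

every_quarter_hour = crontab(minute='*/15')            # every 15 minutes
nightly = crontab(hour=0, minute=0)                    # every day at midnight
monthly = crontab(hour=6, minute=30, day_of_month=1)   # 6:30 a.m. on the 1st of each month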
Let’s say we have a list of users to whom we need to send a daily message with their tasks to do.
So I first modify my celery.py
# celery.py
from celery import Celery
from celery.schedules import crontab

app = Celery('checker', include=['checker.tasks'])
app.config_from_object('checker.celeryconfig')

app.conf.beat_schedule = {
    "everyday-task": {
        "task": "checker.tasks.remember_tasks_to_do",
        "schedule": crontab(hour=7, minute=0)
    }
}
Here I am telling Celery to run the checker.tasks.remember_tasks_to_do task every day at 7:00 a.m. So my new tasks.py is:
# tasks.py
from checker.celery import app
import json

@app.task
def remember_tasks_to_do():
    # Load the JSON file into a list of Python dicts.
    with open('employees.json', 'r') as file_object:
        employees_list = json.load(file_object)
    for each_employee in employees_list:
        print("Good morning %s. This is your list of tasks to do: %s"
              % (each_employee.get('username'), each_employee.get('tasks')))
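The task above assumes an employees.json file in the worker’s working directory, with one object per employee. A small, purely hypothetical helper like this creates a matching sample file for trying things out:

# make_sample_data.py (illustrative helper, not part of the Celery setup)
import json

sample_employees = [
    {"username": "alice", "tasks": ["review the report", "reply to emails"]},
    {"username": "bob", "tasks": ["fix the build"]},
]

with open("employees.json", "w") as f:
    json.dump(sample_employees, f, indent=2)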
You can find all the examples in my GitHub repo.