Django is one of the best web frameworks for building something quickly, and it comes with a lot of features out of the box. Even if it doesn’t stand out in terms of raw performance, it is one of the fastest frameworks in terms of development iteration.

Celery is a distributed task queue that helps us to run tasks asynchronously. It is widely used in Django applications to run background tasks such as sending emails, processing images, etc.

In this post, we will see how to run async tasks in Django using Celery.

Setting up Celery in Django

To use Celery with Django, we first need to install the celery package. We can install it using pip:

pip install celery

We need a broker to store the task messages. We can use Redis, RabbitMQ, or any other broker supported by Celery. In this example, we will use Redis as the broker.

Docker Compose file for Redis:

services:
  redis:
    image: redis
    container_name: redis
    ports:
      - "6379:6379"

We can add the celery configuration in the Django settings file. Here is an example configuration:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
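
If Django also runs as a service in the same Compose file, Redis is reachable by its service name (redis) rather than localhost, so it can help to build the URLs from environment variables. Here is a minimal sketch of that idea. The variable names REDIS_HOST and REDIS_PORT and the helper redis_url are assumptions made for this example, not something Celery defines. Putting the broker and the result backend in separate Redis databases is optional.

```python
# settings.py (sketch)
import os


def redis_url(db, env=os.environ):
    """Build a Redis URL from the (hypothetical) REDIS_HOST / REDIS_PORT variables."""
    host = env.get('REDIS_HOST', 'localhost')
    port = env.get('REDIS_PORT', '6379')
    return f'redis://{host}:{port}/{db}'


# Database 0 for task messages, database 1 for task results.
CELERY_BROKER_URL = redis_url(0)
CELERY_RESULT_BACKEND = redis_url(1)
```

With no environment variables set, this falls back to the same localhost:6379 address used above.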

Next, we need to create the Celery app. Create a file named celery.py in the Django project package, next to settings.py (myproject/celery.py in this example).

# celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
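
The namespace='CELERY' argument means that only settings starting with CELERY_ are read, and the prefix is dropped to get Celery's own lowercase setting name. The sketch below imitates that mapping in plain Python to show the idea. It is not Celery's actual implementation, and the function name strip_namespace is made up for this example.

```python
def strip_namespace(settings, namespace='CELERY'):
    """Map e.g. CELERY_BROKER_URL -> broker_url, ignoring unprefixed settings."""
    prefix = namespace + '_'
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    'DEBUG': True,  # not prefixed, so it is ignored
    'CELERY_BROKER_URL': 'redis://localhost:6379',
    'CELERY_TASK_SERIALIZER': 'json',
}

celery_settings = strip_namespace(django_settings)
print(celery_settings)
```

This is why the settings file uses names like CELERY_BROKER_URL while Celery's own documentation refers to broker_url.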

Creating a Celery Task

To create a Celery task, we can add a tasks.py file to the Django app directory. Here is an example task that sends an email:

# tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email_task(subject, message, recipient_list):
    send_mail(subject, message, 'test@email.com', recipient_list)

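The @shared_task decorator is used to define a reusable task. With this decorator, we can define the task without importing the Celery app instance. For this to work, the app has to be loaded when Django starts, which is usually done by importing it in myproject/__init__.py (from .celery import app as celery_app).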

The task can be called using the .delay() method, e.g. send_email_task.delay('Subject', 'Message', ['user@email.com'])
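
Because the settings above use the JSON serializer, the arguments passed to .delay() have to be JSON-serializable. Strings, numbers, lists and dicts are fine, while arbitrary Python objects such as Django model instances are not. A common approach is to pass an ID or plain value and load the object inside the task. The helper below is a small sketch for checking arguments up front. is_json_safe and the User class are hypothetical names for this example, and it uses the standard json module as an approximation of what Celery does.

```python
import json


class User:
    """Stand-in for a Django model instance (hypothetical)."""

    def __init__(self, pk, email):
        self.pk = pk
        self.email = email


def is_json_safe(*args, **kwargs):
    """Return True if the arguments can be encoded as JSON."""
    try:
        json.dumps([args, kwargs])
    except (TypeError, ValueError):
        return False
    return True


user = User(pk=1, email='user@email.com')

# Passing plain values works.
plain_ok = is_json_safe('Subject', 'Message', [user.email])
# Passing the whole object does not.
object_ok = is_json_safe('Subject', 'Message', [user])
print(plain_ok, object_ok)
```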

Running Celery Worker

To run the celery worker, we can use the celery command:

celery -A myproject worker --loglevel=info

Here, myproject is the name of the Django project.

Scheduling with Celery Beat

Install the django-celery-beat package to schedule tasks using Celery Beat:

pip install django-celery-beat

Add django_celery_beat to the INSTALLED_APPS in the Django settings file:

# settings.py

INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

Add a static schedule to the Celery config file:

# celery.py
from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Create a Celery instance
app = Celery('myproject')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Define a static Celery Beat schedule
app.conf.beat_schedule = {
    'send-email-every-5-minutes': {
        'task': 'myapp.tasks.send_email_task',
        'schedule': 300.0,  # interval in seconds
        # Positional arguments for send_email_task(subject, message, recipient_list)
        'args': ('Subject', 'Message', ['user@email.com']),
    },
}
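
The schedule value does not have to be a bare number of seconds. Celery also accepts a datetime.timedelta (and crontab objects for calendar-style schedules). The sketch below shows the same entry written with a timedelta, along with the positional arguments that send_email_task(subject, message, recipient_list) expects. The subject, message, and address are placeholder values.

```python
from datetime import timedelta

beat_schedule = {
    'send-email-every-5-minutes': {
        'task': 'myapp.tasks.send_email_task',
        # Same interval as 300.0 seconds, written as a timedelta
        'schedule': timedelta(minutes=5),
        # Placeholder arguments passed to the task on every run
        'args': ('Subject', 'Message', ['user@email.com']),
    },
}

entry = beat_schedule['send-email-every-5-minutes']
print(entry['schedule'].total_seconds())
```

In celery.py, this dict would be assigned to app.conf.beat_schedule.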

Run the migrations:

python manage.py migrate

Start the Celery Beat scheduler:

celery -A myproject beat --loglevel=info

Adding Tasks from Database

We can add tasks to the Celery Beat scheduler from the database. We can use the Django admin interface or directly add tasks to the database.

To enable this, we need to add the following setting:

# settings.py

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

Here is an example of adding a task to the database. Given a task in tasks.py:

# tasks.py
from celery import shared_task

@shared_task
def send_email_task():
    ...

We can add the task to the database using the following code:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

PeriodicTask.objects.create(
    interval=schedule,
    name='Send email every 10 seconds',
    task='myapp.tasks.send_email_task',
)

This will add a task to the database that runs every 10 seconds.
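
If the scheduled task takes arguments, such as the earlier send_email_task(subject, message, recipient_list), django-celery-beat stores them on the PeriodicTask as JSON-encoded text in its args and kwargs fields. The following sketch builds those field values with a hypothetical helper, periodic_task_fields. The task name and arguments are placeholders.

```python
import json


def periodic_task_fields(name, task, args=(), kwargs=None):
    """Build keyword arguments for PeriodicTask.objects.create()."""
    return {
        'name': name,
        'task': task,
        # args and kwargs are stored as JSON strings, not Python objects
        'args': json.dumps(list(args)),
        'kwargs': json.dumps(kwargs or {}),
    }


fields = periodic_task_fields(
    name='Send report email every 10 seconds',
    task='myapp.tasks.send_email_task',
    args=['Subject', 'Message', ['user@email.com']],
)
print(fields['args'])

# In a Django shell, with `schedule` being an IntervalSchedule instance:
# PeriodicTask.objects.create(interval=schedule, **fields)
```

The name has to be unique across periodic tasks, so this example uses a different one from the task created above.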

Conclusion

Using Celery and Celery Beat with Django is a good way to run background tasks and schedule periodic tasks.

It’s quite easy to set up and configure, for both static and dynamic configuration.