Using Celery with Django for Asynchronous Tasks
Introduction
Celery is a powerful, distributed task queue that can be used with Django to handle asynchronous tasks and background processing. This guide will walk you through setting up Celery with Django and demonstrate how to use it effectively in your projects.
Table of Contents
Installation
First, you need to install Celery and its dependencies:
pip install celery redis django-celery-results
We’ll use Redis as our message broker, but you can also use RabbitMQ or other brokers supported by Celery.
Configuration
Django Settings
Add the following to your Django settings.py
:
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
# Add django_celery_results to INSTALLED_APPS
INSTALLED_APPS = [
...
'django_celery_results',
]
Celery Configuration
Create a new file celery.py
in your Django project directory (where settings.py
is located):
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
# Use a string here to make sure the worker doesn't serialize the object.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Update your project’s __init__.py
:
from .celery import app as celery_app
__all__ = ('celery_app',)
Creating Tasks
Create a tasks.py
file in your Django app directory:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def multiply(x, y):
return x * y
@shared_task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Running Tasks
You can run tasks in your Django views or anywhere else in your Django code:
from django.http import HttpResponse
from .tasks import add, multiply
def index(request):
add.delay(7, 8)
multiply.delay(7, 8)
return HttpResponse("Tasks are running in the background!")
The delay()
method is used to execute the task asynchronously.
Periodic Tasks
To set up periodic tasks, you can use Celery Beat. First, install the Django Celery Beat package:
pip install django-celery-beat
Add it to your INSTALLED_APPS
:
INSTALLED_APPS = [
...
'django_celery_beat',
]
Then, you can define periodic tasks in your Django admin interface or in your code:
from celery.schedules import crontab
from django.conf import settings
settings.CELERY_BEAT_SCHEDULE = {
'add-every-minute-contrab': {
'task': 'your_app.tasks.add',
'schedule': crontab(minute='*/1'),
'args': (16, 16),
},
}
Monitoring
You can use Flower, a web-based tool for monitoring Celery:
pip install flower
celery -A your_project flower
Then visit http://localhost:5555
to see the Flower dashboard.
Best Practices
-
Use
shared_task
decorator: This allows your tasks to be used by multiple apps. -
Handle exceptions: Wrap your task logic in try-except blocks to handle errors gracefully.
-
Set timeouts: Use the
soft_time_limit
andtime_limit
options to prevent tasks from running indefinitely. -
Use task states: Update task states to track progress and provide feedback.
-
Use task priority: Set task priorities to ensure critical tasks are processed first.
-
Optimize database queries: Ensure your tasks are not causing N+1 query problems.
-
Use task queues: Separate your tasks into different queues based on their nature (e.g., email queue, data processing queue).
Example of a more robust task:
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
logger = get_task_logger(__name__)
@shared_task(bind=True, max_retries=3, soft_time_limit=20)
def send_notification_email(self, user_id, subject, message):
try:
user = User.objects.get(id=user_id)
send_mail(
subject,
message,
'from@example.com',
[user.email],
fail_silently=False,
)
except User.DoesNotExist:
logger.error(f"User with id {user_id} does not exist")
except Exception as exc:
logger.error(f"Error sending email to user {user_id}: {exc}")
raise self.retry(exc=exc, countdown=60) # Retry after 1 minute