Celery Best Practices for Django Background Tasks
python backend django celery
Django handles requests synchronously. Sending emails, processing uploads, generating reports—these shouldn’t block users. Celery offloads work to background workers.
Basic Setup
pip install celery redis
# myproject/celery.py
import os

from celery import Celery

# Must be set before the app is created so task modules can import Django settings/models.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
# Read every CELERY_-prefixed key from Django settings into Celery's config.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all INSTALLED_APPS.
app.autodiscover_tasks()
# myproject/__init__.py
# Import the app at Django startup so @shared_task binds to this app instance.
from .celery import app as celery_app

__all__ = ('celery_app',)
# settings.py
# Redis database 0 serves as both the message broker and the result store.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
Writing Tasks
# myapp/tasks.py
from celery import shared_task


@shared_task
def send_welcome_email(user_id):
    """Send the welcome email for *user_id* on a worker, outside the request cycle."""
    # Fetch fresh state: the user row may have changed since the task was queued.
    user = User.objects.get(id=user_id)
    send_email(user.email, "Welcome!", "...")
    return f"Email sent to {user.email}"
# Calling the task
from myapp.tasks import send_welcome_email

# Async execution (non-blocking)
send_welcome_email.delay(user.id)

# With options
send_welcome_email.apply_async(
    args=[user.id],
    countdown=60,  # Delay 60 seconds
    expires=3600,  # Expire if not run in 1 hour
)
Best Practices
Pass IDs, Not Objects
# Bad - serializes entire object
@shared_task
def process_order(order):
    order.process()


# Good - fetch fresh from database
@shared_task
def process_order(order_id):
    # Re-read by primary key so the task sees current database state.
    order = Order.objects.get(id=order_id)
    order.process()
Why? A serialized object can be stale by the time the task runs; the database is the source of truth.
Idempotency
Tasks may run more than once. Make them safe to retry:
@shared_task(bind=True, max_retries=3)
def charge_customer(self, order_id):
    """Charge the order at most once; safe to re-run on duplicate delivery."""
    order = Order.objects.get(id=order_id)
    # Check if already processed
    if order.payment_status == 'completed':
        return "Already charged"
    # NOTE(review): the check above and the charge below are not atomic — two
    # workers could both pass the guard; consider select_for_update() in a
    # transaction for true exactly-once charging.
    try:
        charge = process_payment(order)
        order.payment_status = 'completed'
        order.save()
        return charge.id
    except PaymentError as e:
        # Re-enqueue this task; gives up after max_retries attempts.
        raise self.retry(exc=e, countdown=60)
Error Handling
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(TransientError,),  # Retried automatically when raised
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=600,  # Cap the backoff delay at 10 minutes
)
def unreliable_task(self, data):
    try:
        external_api.call(data)
    except PermanentError:
        # Don't retry
        raise
    except TransientError:
        # Retry automatically via autoretry_for
        raise
Timeouts
# The exception raised at the soft limit must be imported to be caught.
from celery.exceptions import SoftTimeLimitExceeded


# Soft limit (300s) raises inside the task; hard limit (360s) kills the process.
# The 60s gap is the window for graceful cleanup.
@shared_task(soft_time_limit=300, time_limit=360)
def long_running_task():
    try:
        # Work that might take a while
        process_large_file()
    except SoftTimeLimitExceeded:
        # Clean up gracefully, then let the failure propagate
        cleanup()
        raise
Task Routing
Separate queues for different task types:
# settings.py
# Route each task type to its own queue so slow jobs can't starve fast ones.
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.process_image': {'queue': 'images'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
}
# Run workers for specific queues
celery -A myproject worker -Q emails -c 2
celery -A myproject worker -Q images -c 4
celery -A myproject worker -Q reports -c 1
Periodic Tasks (Celery Beat)
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Run once a day at 03:00 (in the app's configured timezone).
    'daily-cleanup': {
        'task': 'myapp.tasks.cleanup_old_records',
        'schedule': crontab(hour=3, minute=0),
    },
    # A plain float schedule is an interval in seconds.
    'every-5-minutes': {
        'task': 'myapp.tasks.sync_external_data',
        'schedule': 300.0,  # seconds
    },
}
celery -A myproject beat
Or use django-celery-beat for database-backed schedules.
Task Chaining
from celery import chain, group, chord

# Sequential: A → B → C (each result is passed to the next signature)
chain(task_a.s(), task_b.s(), task_c.s())()

# Parallel: A, B, C run simultaneously
group(task_a.s(), task_b.s(), task_c.s())()

# Fan-out/fan-in: parallel tasks, then aggregate
# (a chord needs a result backend to collect the group's results)
chord(
    [task_a.s(), task_b.s(), task_c.s()],
    aggregate_results.s()
)()
Monitoring
Flower
pip install flower
celery -A myproject flower --port=5555
Web UI for monitoring tasks, workers, queues.
Django Admin Integration
pip install django-celery-results
# settings.py
INSTALLED_APPS = [..., 'django_celery_results']
# Store task results in the Django database instead of Redis.
CELERY_RESULT_BACKEND = 'django-db'
View task results in Django admin.
Production Configuration
# settings.py
CELERY_BROKER_URL = os.environ.get('REDIS_URL')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL')

# Reliability settings
CELERY_TASK_ACKS_LATE = True  # Ack after task completes
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Fairer distribution
CELERY_TASK_REJECT_ON_WORKER_LOST = True  # Requeue on worker crash

# Visibility timeout: with late acks, should exceed the longest expected task
# runtime, or the broker may re-deliver tasks that are still running.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': 43200,  # 12 hours
}

# Result expiration
CELERY_RESULT_EXPIRES = 86400  # 24 hours
Testing
from django.test import TestCase, override_settings


# NOTE(review): Celery may have read its config before override_settings
# applies — confirm eager mode actually takes effect in this setup, or set
# task_always_eager in the test settings module instead.
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
class TaskTests(TestCase):
    def test_task_execution(self):
        # In eager mode the task runs inline, so .get() returns immediately.
        result = my_task.delay(arg)
        self.assertEqual(result.get(), expected)
Or use pytest-celery:
# Requires pytest; picked up by pytest-celery's app fixtures.
@pytest.fixture
def celery_config():
    # Run tasks inline instead of dispatching to a worker.
    return {'task_always_eager': True}
Anti-Patterns
Database Queries in Loops
# Bad
@shared_task
def process_all_users():
    for user in User.objects.all():
        process_user(user)  # Slow, blocks worker


# Good
@shared_task
def process_all_users():
    # Pull only the IDs, then fan out one task per user.
    for user_id in User.objects.values_list('id', flat=True):
        process_user.delay(user_id)  # Spawns sub-tasks
No Rate Limiting
# Good - limit external API calls
@shared_task(rate_limit='10/m')  # 10 per minute
def call_external_api(data):
    # NOTE(review): per Celery docs, rate_limit is enforced per worker
    # instance, not cluster-wide — verify against your worker count.
    api.call(data)
Storing Results You Don’t Need
# If you don't need the result
@shared_task(ignore_result=True)  # Skip writing to the result backend entirely
def fire_and_forget():
    do_work()
Final Thoughts
Celery is powerful but has sharp edges. Start simple—basic tasks with retries. Add complexity as needed.
Monitor your queues. Set timeouts. Make tasks idempotent. Your sleeping hours depend on it.
Background tasks done right. Sleep peacefully.