Celery Best Practices for Django Background Tasks

python backend django celery

Django handles requests synchronously. Sending emails, processing uploads, generating reports—these shouldn’t block users. Celery offloads work to background workers.

Basic Setup

pip install celery redis
# myproject/celery.py
import os
from celery import Celery

# Make sure Django settings are importable before the Celery app configures itself.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
# namespace='CELERY' means Celery reads only settings prefixed with CELERY_.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Looks for a tasks.py module inside every app in INSTALLED_APPS.
app.autodiscover_tasks()
# myproject/__init__.py
# Re-export the app so @shared_task binds to it when Django starts.
from .celery import app as celery_app
__all__ = ('celery_app',)
# settings.py
# Redis serves as both the message broker and the result store here.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

Writing Tasks

# myapp/tasks.py
from celery import shared_task

@shared_task
def send_welcome_email(user_id):
    """Fetch the user by primary key and deliver the welcome email."""
    recipient = User.objects.get(id=user_id)
    send_email(recipient.email, "Welcome!", "...")
    return f"Email sent to {recipient.email}"
# Calling the task
from myapp.tasks import send_welcome_email

# Async execution (non-blocking)
# .delay() is shorthand for .apply_async(args=...); it returns an AsyncResult
# immediately without waiting for the worker.
send_welcome_email.delay(user.id)

# With options
send_welcome_email.apply_async(
    args=[user.id],
    countdown=60,  # Delay 60 seconds
    expires=3600,  # Expire if not run in 1 hour
)

Best Practices

Pass IDs, Not Objects

# Bad - serializes entire object
# The model instance must be serialized into the broker message, and the
# copy the worker deserializes can be stale by the time the task runs.
@shared_task
def process_order(order):
    order.process()

# Good - fetch fresh from database
@shared_task
def process_order(order_id):
    # Re-read the row so the worker acts on current state, not a snapshot.
    order = Order.objects.get(id=order_id)
    order.process()

Why? Objects can be stale. Database state is truth.

Idempotency

Tasks may run more than once. Make them safe to retry:

@shared_task(bind=True, max_retries=3)
def charge_customer(self, order_id):
    """Charge the order's customer; re-runs are skipped once payment completed."""
    order = Order.objects.get(id=order_id)
    
    # Check if already processed
    # NOTE(review): this check-then-act is not atomic — two workers handling
    # a duplicate delivery could both pass it. Consider select_for_update()
    # inside a transaction, or a unique idempotency key on the charge — confirm.
    if order.payment_status == 'completed':
        return "Already charged"
    
    try:
        charge = process_payment(order)
        order.payment_status = 'completed'
        order.save()
        return charge.id
    except PaymentError as e:
        # Schedule a retry of this same task in 60s; gives up after max_retries.
        raise self.retry(exc=e, countdown=60)

Error Handling

@shared_task(
    bind=True,  # expose self so retry metadata is available
    max_retries=3,
    default_retry_delay=60,  # base delay; retry_backoff scales from here
    autoretry_for=(TransientError,),  # raising one of these triggers a retry
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=600,  # never wait more than 10 minutes between retries
)
def unreliable_task(self, data):
    """Call the external API; transient failures retry, permanent ones fail fast."""
    try:
        external_api.call(data)
    except PermanentError:
        # Don't retry
        raise
    except TransientError:
        # Retry automatically via autoretry_for
        raise

Timeouts

@shared_task(soft_time_limit=300, time_limit=360)
def long_running_task():
    """Process a large file under a 300s soft / 360s hard time limit.

    The soft limit raises SoftTimeLimitExceeded inside the task, giving it
    60 seconds to clean up before the hard limit kills the worker process.
    """
    # Fix: the original snippet caught SoftTimeLimitExceeded without importing
    # it, which would raise NameError when the soft limit fired.
    from celery.exceptions import SoftTimeLimitExceeded

    try:
        # Work that might take a while
        process_large_file()
    except SoftTimeLimitExceeded:
        # Clean up gracefully
        cleanup()
        raise

Task Routing

Separate queues for different task types:

# settings.py
# Keys are fully-qualified task names; tasks with no matching route
# go to the default 'celery' queue.
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.process_image': {'queue': 'images'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
}
# Run workers for specific queues
celery -A myproject worker -Q emails -c 2
celery -A myproject worker -Q images -c 4
celery -A myproject worker -Q reports -c 1

Periodic Tasks (Celery Beat)

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'daily-cleanup': {
        'task': 'myapp.tasks.cleanup_old_records',
        'schedule': crontab(hour=3, minute=0),  # every day at 03:00
    },
    'every-5-minutes': {
        'task': 'myapp.tasks.sync_external_data',
        'schedule': 300.0,  # seconds
    },
}
celery -A myproject beat

Or use django-celery-beat for database-backed schedules.

Task Chaining

from celery import chain, group, chord

# Sequential: A → B → C
# Each task's return value is passed as the first argument to the next.
chain(task_a.s(), task_b.s(), task_c.s())()

# Parallel: A, B, C run simultaneously
group(task_a.s(), task_b.s(), task_c.s())()

# Fan-out/fan-in: parallel tasks, then aggregate
# NOTE: chord requires a working result backend to collect the group's results.
chord(
    [task_a.s(), task_b.s(), task_c.s()],
    aggregate_results.s()
)()

Monitoring

Flower

pip install flower
celery -A myproject flower --port=5555

Web UI for monitoring tasks, workers, queues.

Django Admin Integration

pip install django-celery-results
# settings.py
INSTALLED_APPS = [..., 'django_celery_results']
# Store task results in the Django database (run migrate after installing).
CELERY_RESULT_BACKEND = 'django-db'

View task results in Django admin.

Production Configuration

# settings.py
CELERY_BROKER_URL = os.environ.get('REDIS_URL')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL')

# Reliability settings
CELERY_TASK_ACKS_LATE = True  # Ack after task completes
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Fairer distribution
CELERY_TASK_REJECT_ON_WORKER_LOST = True  # Requeue on worker crash

# Visibility timeout
# Redis-specific: should exceed the longest ETA/countdown you schedule,
# otherwise delayed tasks get redelivered to another worker early.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': 43200,  # 12 hours
}

# Result expiration
CELERY_RESULT_EXPIRES = 86400  # 24 hours

Testing

from django.test import TestCase, override_settings

# NOTE(review): Celery caches its config on first use, so @override_settings
# only takes effect if no task ran earlier in the process — confirm, or put
# these in dedicated test settings.
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=True,  # run .delay() synchronously, in-process
    CELERY_TASK_EAGER_PROPAGATES=True,  # let task exceptions surface in tests
)
class TaskTests(TestCase):
    def test_task_execution(self):
        """Eager mode executes the task inline, so the result is ready at once."""
        result = my_task.delay(arg)
        self.assertEqual(result.get(), expected)

Or use pytest-celery:

@pytest.fixture
def celery_config():
    """pytest-celery configuration fixture: execute tasks eagerly in tests."""
    return {'task_always_eager': True}

Anti-Patterns

Database Queries in Loops

# Bad
@shared_task
def process_all_users():
    # One task grinding through every row serially ties up a worker slot for
    # the whole run and loses all progress if the worker dies mid-loop.
    for user in User.objects.all():
        process_user(user)  # Slow, blocks worker

# Good
@shared_task
def process_all_users():
    # values_list avoids loading full model instances just to get IDs.
    for user_id in User.objects.values_list('id', flat=True):
        process_user.delay(user_id)  # Spawns sub-tasks

No Rate Limiting

# Good - limit external API calls
# NOTE(review): rate_limit is enforced per worker instance, not cluster-wide;
# with several workers the aggregate rate is higher — confirm against Celery docs.
@shared_task(rate_limit='10/m')  # 10 per minute
def call_external_api(data):
    api.call(data)

Storing Results You Don’t Need

# If you don't need the result
@shared_task(ignore_result=True)
def fire_and_forget():
    """Skip writing to the result backend — saves a round trip per call."""
    do_work()

Final Thoughts

Celery is powerful but has sharp edges. Start simple—basic tasks with retries. Add complexity as needed.

Monitor your queues. Set timeouts. Make tasks idempotent. Your sleeping hours depend on it.


Background tasks done right. Sleep peacefully.

All posts