Django Channels: Real-time WebSockets

python django websockets

HTTP is request-response. But sometimes you need real-time: chat, notifications, live updates. Django Channels extends Django to handle WebSockets and other protocols.

What are Channels?

Django Channels adds WebSocket and background-event handling alongside the traditional HTTP request flow:

Traditional Django:
  HTTP Request → View → HTTP Response

Django Channels:
  HTTP Request → View → HTTP Response
  WebSocket Connect → Consumer → Bidirectional Messages
  Background Event → Consumer → Action

Setup

Installation

# The "daphne" extra installs the ASGI server that settings.py lists in INSTALLED_APPS
pip install "channels[daphne]" channels-redis

Configuration

# settings.py
INSTALLED_APPS = [
    'daphne',  # ASGI server
    'channels',
    # ... other apps
]

# Dotted path to the `application` object defined in myproject/asgi.py
ASGI_APPLICATION = 'myproject.asgi.application'

# Channel layer used for group/direct messaging between consumers;
# this configuration expects a Redis server on 127.0.0.1:6379.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

ASGI Application

# myproject/asgi.py
"""ASGI entry point: routes HTTP to Django and WebSockets to the chat consumers."""
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Build the Django ASGI app first so Django is set up before any
# application code (consumers, routing) is imported below.
django_asgi_app = get_asgi_application()

from chat import routing  # noqa: E402 - import after Django setup

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    # AuthMiddlewareStack authenticates from the session cookie, so only
    # accept WebSocket handshakes whose Origin matches ALLOWED_HOSTS;
    # otherwise another site could open a socket on a logged-in user's behalf.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                routing.websocket_urlpatterns
            )
        )
    ),
})

Building a Chat Consumer

Basic Consumer

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    """Relay chat messages between every socket connected to the same room.

    Each connection joins the channel-layer group ``chat_<room_name>``.
    A message received from one client is sent to that group and delivered
    to every member through ``chat_message``.
    """

    async def connect(self):
        """Join the room group named in the URL, then accept the socket."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room group when the socket closes."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Broadcast a client's ``{"message": ...}`` frame to the room.

        Args:
            text_data: JSON text frame sent by the client, if any.
            bytes_data: Binary frame sent by the client, if any. Accepted so
                a binary frame does not fail with a TypeError; it is
                answered with an error payload like any other bad input.

        Frames that are not JSON objects with a ``message`` key are answered
        with ``{"error": "Invalid message"}`` instead of raising.
        """
        try:
            if text_data is None:
                raise TypeError("text frame required")
            data = json.loads(text_data)
            message = data['message']
        except (json.JSONDecodeError, KeyError, TypeError):
            # Client input is untrusted: report the problem, keep the socket.
            await self.send(text_data=json.dumps({'error': 'Invalid message'}))
            return

        username = self.scope['user'].username

        # Send message to room group; 'type' names the handler below.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'username': username,
            }
        )

    async def chat_message(self, event):
        """Forward a ``chat_message`` group event to this client's socket."""
        await self.send(text_data=json.dumps({
            'message': event['message'],
            'username': event['username'],
        }))

Routing

# chat/routing.py
from django.urls import re_path
from . import consumers

# WebSocket routes. The named group `room_name` is what ChatConsumer reads
# from scope['url_route']['kwargs']['room_name'].
websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

Frontend JavaScript

const roomName = 'general';

// Match the page's scheme: browsers refuse insecure ws:// sockets on https:// pages.
const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
const chatSocket = new WebSocket(
    wsScheme + window.location.host + '/ws/chat/' + encodeURIComponent(roomName) + '/'
);

// Called for every frame the server sends to this socket.
chatSocket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    console.log('Message:', data.message, 'From:', data.username);
    // Update UI
};

chatSocket.onclose = function(e) {
    console.error('Chat socket closed unexpectedly');
};

/**
 * Send a chat message to the room as a JSON text frame.
 * @param {string} message - Text to broadcast.
 */
function sendMessage(message) {
    // send() throws while the socket is still connecting, and silently
    // drops data once it is closing or closed.
    if (chatSocket.readyState !== WebSocket.OPEN) {
        console.warn('Chat socket is not open; message not sent');
        return;
    }
    chatSocket.send(JSON.stringify({
        'message': message
    }));
}

Channel Layers

Channel layers let consumers talk to each other, and let other parts of Django (such as views and signal handlers) send messages to consumers:

Group Messaging

# Send to everyone in a group
# (fragment: meant to run inside an async consumer method, where `self` is the consumer)
await self.channel_layer.group_send(
    "notifications",  # name of the group to deliver to
    {
        "type": "notification.new",  # selects the handler method on each receiving consumer
        "content": "Hello everyone!"
    }
)

# Handler method: the event's "type" picks it, with "." replaced by "_" ("notification.new" -> notification_new)
async def notification_new(self, event):
    """Forward a ``notification.new`` group event to this client as JSON.

    The event's ``content`` value is sent under the ``notification`` key.
    """
    payload = {"notification": event["content"]}
    encoded = json.dumps(payload)
    await self.send(text_data=encoded)

Direct Messaging

# Send to specific channel
# (fragment: meant to run inside an async consumer method, where `self` is the consumer)
await self.channel_layer.send(
    "specific_channel_name",  # placeholder for the target channel's name (a consumer's own is self.channel_name)
    {
        "type": "private.message",  # by the same "." -> "_" rule, handled by a private_message method
        "content": "Just for you"
    }
)

Authentication

Access User in Consumer

class ChatConsumer(AsyncWebsocketConsumer):
    """Consumer that only accepts sockets opened by an authenticated user."""

    async def connect(self):
        """Store the scope's user; accept if logged in, otherwise close."""
        current_user = self.scope['user']
        self.user = current_user

        if not current_user.is_anonymous:
            await self.accept()
        else:
            # Closing instead of accepting refuses the connection.
            await self.close()

Token Authentication

# middleware.py
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token

@database_sync_to_async
def get_user(token_key):
    """Return the user owning the token with key ``token_key``.

    Falls back to an ``AnonymousUser`` instance when no such token exists.
    Wrapped with ``database_sync_to_async`` so callers ``await`` the result.
    """
    try:
        matched = Token.objects.get(key=token_key)
    except Token.DoesNotExist:
        return AnonymousUser()
    else:
        return matched.user

class TokenAuthMiddleware:
    """ASGI middleware that sets ``scope['user']`` from a ``?token=...`` query parameter.

    If the query string carries a non-empty ``token`` value, the user is
    looked up with ``get_user``; otherwise ``scope['user']`` is an
    ``AnonymousUser``.
    """

    def __init__(self, inner):
        """Store the wrapped ASGI application."""
        self.inner = inner

    async def __call__(self, scope, receive, send):
        """Attach the user to ``scope`` and delegate to the inner application."""
        token = self._extract_token(scope.get('query_string', b''))

        scope['user'] = await get_user(token) if token else AnonymousUser()
        return await self.inner(scope, receive, send)

    @staticmethod
    def _extract_token(query_string):
        """Return the ``token`` query parameter from raw bytes, or ``None``.

        Uses ``parse_qs`` rather than splitting on ``=`` by hand, so values
        that contain ``=`` (e.g. base64 padding) no longer raise and
        percent-encoded values are decoded. If the parameter is repeated,
        the last occurrence wins, as it did with the previous dict-based
        parsing.
        """
        from urllib.parse import parse_qs

        params = parse_qs(query_string.decode('utf-8', errors='replace'))
        values = params.get('token')
        return values[-1] if values else None

Sending from Views

Send WebSocket messages from regular Django views:

# views.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

def create_order(request):
    """Create an order, announce it to the requesting user's group, reply with JSON.

    An ``order.created`` event carrying the new order's id is sent to the
    channel-layer group ``user_<id>`` of ``request.user``.
    """
    order = Order.objects.create(...)

    # The view is synchronous, so the async group_send is wrapped first.
    layer = get_channel_layer()
    broadcast = async_to_sync(layer.group_send)
    target_group = f"user_{request.user.id}"
    event = {
        "type": "order.created",
        "order_id": order.id,
    }
    broadcast(target_group, event)

    return JsonResponse({"status": "ok"})

Practical Example: Live Notifications

Consumer

class NotificationConsumer(AsyncWebsocketConsumer):
    """Deliver per-user notifications to an authenticated user's sockets.

    Each accepted connection joins the group ``notifications_<user id>``;
    events of type ``notification`` sent to that group are forwarded to the
    client as JSON.
    """

    # Assigned in connect() only after the anonymous check passes. The
    # class-level default lets disconnect() run safely for a connection
    # that was rejected before the attribute was set.
    user_group = None

    async def connect(self):
        """Reject anonymous users; otherwise join the user's group and accept."""
        if self.scope['user'].is_anonymous:
            await self.close()
            return

        self.user_group = f"notifications_{self.scope['user'].id}"

        await self.channel_layer.group_add(
            self.user_group,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the user's group, if this connection ever joined one."""
        if self.user_group is None:
            # connect() closed the socket before joining any group.
            return

        await self.channel_layer.group_discard(
            self.user_group,
            self.channel_name
        )

    async def notification(self, event):
        """Forward a ``notification`` group event to the client.

        ``notification_type`` and ``message`` are required keys of the
        event; ``data`` is optional and defaults to an empty dict.
        """
        await self.send(text_data=json.dumps({
            'type': event['notification_type'],
            'message': event['message'],
            'data': event.get('data', {}),
        }))

Signal Integration

# signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

@receiver(post_save, sender=Comment)
def notify_comment(sender, instance, created, **kwargs):
    """Notify a post's author when a new comment is created on it.

    Runs on every ``post_save`` of ``Comment`` but only acts when
    ``created`` is true. Sends a ``notification`` event to the group
    ``notifications_<author id>``.
    """
    if not created:
        return

    channel_layer = get_channel_layer()
    user_id = instance.post.author_id

    # Signal handlers are synchronous, so the async group_send is wrapped.
    async_to_sync(channel_layer.group_send)(
        f"notifications_{user_id}",
        {
            "type": "notification",
            "notification_type": "new_comment",
            "message": "New comment on your post",
            "data": {"comment_id": instance.id}
        }
    )

Running

Development

# Daphne (ASGI server)
daphne myproject.asgi:application

# Or with Django
python manage.py runserver  # Uses Daphne automatically

Production

daphne -b 0.0.0.0 -p 8000 myproject.asgi:application

Or with uvicorn:

uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000

Best Practices

Error Handling

async def receive(self, text_data=None, bytes_data=None):
    """Parse an incoming JSON text frame and hand it to ``handle_message``.

    Args:
        text_data: JSON text sent by the client, if any.
        bytes_data: Binary payload sent by the client, if any. Accepted so
            that a binary frame is reported to the client instead of
            failing on an unexpected keyword argument.

    Errors are reported to the client through ``send_error`` rather than
    propagated, so one bad frame does not end the consumer.
    """
    if text_data is None:
        # Only JSON text frames are understood by this handler.
        await self.send_error("Expected a text frame")
        return

    try:
        data = json.loads(text_data)
        await self.handle_message(data)
    except json.JSONDecodeError:
        await self.send_error("Invalid JSON")
    except Exception:
        # Boundary handler: log the traceback, give the client a generic error.
        logger.exception("WebSocket error")
        await self.send_error("Internal error")

Connection Limits

# Track active connections by channel name.
# NOTE: this is a module-level set, so it only counts the connections
# handled by the current process.
connected_users = set()


async def connect(self):
    """Accept the socket unless MAX_CONNECTIONS are already active."""
    # ">=" so that at most MAX_CONNECTIONS sockets are ever accepted.
    if len(connected_users) >= MAX_CONNECTIONS:
        await self.close()
        return

    connected_users.add(self.channel_name)
    await self.accept()


async def disconnect(self, close_code):
    """Free this connection's slot so the limit reflects live sockets only."""
    # discard() is a no-op for connections that were rejected in connect().
    connected_users.discard(self.channel_name)

Heartbeat

# Keep connection alive
async def heartbeat(self):
    """Send a ``{"type": "ping"}`` text frame every 30 seconds, indefinitely."""
    interval_seconds = 30
    ping_frame = json.dumps({"type": "ping"})  # identical every time, so build it once

    while True:
        await asyncio.sleep(interval_seconds)
        await self.send(text_data=ping_frame)

Final Thoughts

Django Channels opens Django to real-time use cases. Chat, notifications, live updates—all within Django’s familiar patterns.

Start simple. Add complexity as needed. Redis is your friend.


Real-time Django, finally.

All posts