Django Channels: Real-time WebSockets
python django websockets
HTTP is request-response. But sometimes you need real-time: chat, notifications, live updates. Django Channels extends Django to handle WebSockets and other protocols.
What are Channels?
Django Channels adds:
- WebSocket support
- Background tasks
- Long-running connections
- Multiple protocol handling
Traditional Django:
HTTP Request → View → HTTP Response
Django Channels:
HTTP Request → View → HTTP Response
WebSocket Connect → Consumer → Bidirectional Messages
Background Event → Consumer → Action
Setup
Installation
# The "daphne" extra installs the ASGI server that settings.py lists in
# INSTALLED_APPS and that the run commands invoke.
pip install 'channels[daphne]' channels-redis
Configuration
# settings.py
# Apps added for Channels support.
INSTALLED_APPS = [
    "daphne",  # ASGI server
    "channels",
    # ... other apps
]

# Dotted path to the ASGI application object.
ASGI_APPLICATION = "myproject.asgi.application"

# Channel layer configuration: a single Redis host on localhost:6379.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
    },
}
ASGI Application
# myproject/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
# Select the settings module (only if the environment has not already set one).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Build the HTTP ASGI app before importing the chat routing below; the order
# of these three statements is deliberate.
django_asgi_app = get_asgi_application()
from chat import routing # Import after Django setup
# Top-level ASGI entry point: one handler per protocol key.
application = ProtocolTypeRouter({
# Plain HTTP requests go to the regular Django application.
"http": django_asgi_app,
# WebSocket connections are routed by URL, wrapped in the auth middleware stack.
"websocket": AuthMiddlewareStack(
URLRouter(
routing.websocket_urlpatterns
)
),
})
Building a Chat Consumer
Basic Consumer
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays chat messages within one room group."""

    async def connect(self):
        """Join the group for the room named in the URL, then accept."""
        url_kwargs = self.scope['url_route']['kwargs']
        self.room_name = url_kwargs['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # Join room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room group joined in ``connect``."""
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def receive(self, text_data):
        """Decode a JSON text frame and broadcast its message to the room group."""
        payload = json.loads(text_data)
        event = {
            'type': 'chat_message',
            'message': payload['message'],
            'username': self.scope['user'].username,
        }
        # Send message to room group
        await self.channel_layer.group_send(self.room_group_name, event)

    async def chat_message(self, event):
        """Forward a ``chat_message`` group event to this WebSocket as JSON."""
        outgoing = {key: event[key] for key in ('message', 'username')}
        # Send message to WebSocket
        await self.send(text_data=json.dumps(outgoing))
Routing
# chat/routing.py
from django.urls import re_path
from . import consumers
# WebSocket routes; the named group ``room_name`` is what ChatConsumer reads
# from scope['url_route']['kwargs'].
websocket_urlpatterns = [
    re_path(
        r'ws/chat/(?P<room_name>\w+)/$',
        consumers.ChatConsumer.as_asgi(),
    ),
]
Frontend JavaScript
const roomName = 'general';
// Match the page's scheme: browsers refuse insecure ws:// connections from
// pages served over HTTPS, so use wss:// there.
const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
const chatSocket = new WebSocket(
    wsScheme + window.location.host + '/ws/chat/' + roomName + '/'
);
// Log each incoming chat message; the server sends JSON with `message` and `username`.
chatSocket.onmessage = (event) => {
    const { message, username } = JSON.parse(event.data);
    console.log('Message:', message, 'From:', username);
    // Update UI
};
// Report the socket closing.
chatSocket.onclose = () => {
    console.error('Chat socket closed unexpectedly');
};
// Send one chat message to the server as a JSON object with a `message` key.
function sendMessage(message) {
    const payload = JSON.stringify({ message });
    chatSocket.send(payload);
}
Channel Layers
Channel layers enable communication between consumers:
Group Messaging
# Send to everyone in a group
await self.channel_layer.group_send(
"notifications",
{
"type": "notification.new",
"content": "Hello everyone!"
}
)
# Handler method (type converted to method name): the event type
# "notification.new" is handled by the method named notification_new.
async def notification_new(self, event):
    """Forward the event's ``content`` to the WebSocket as JSON."""
    body = json.dumps({"notification": event["content"]})
    await self.send(text_data=body)
Direct Messaging
# Send to specific channel
await self.channel_layer.send(
"specific_channel_name",
{
"type": "private.message",
"content": "Just for you"
}
)
Authentication
Access User in Consumer
class ChatConsumer(AsyncWebsocketConsumer):
    """Consumer that only accepts connections from authenticated users."""

    async def connect(self):
        """Accept the socket for a logged-in user; close it for an anonymous one."""
        self.user = self.scope['user']
        if self.user.is_anonymous:
            await self.close()
        else:
            await self.accept()
Token Authentication
# middleware.py
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
@database_sync_to_async
def get_user(token_key):
    """Return the user owning the token ``token_key``.

    Falls back to ``AnonymousUser()`` when no token with that key exists.
    """
    try:
        return Token.objects.get(key=token_key).user
    except Token.DoesNotExist:
        return AnonymousUser()
class TokenAuthMiddleware:
    """ASGI middleware that sets ``scope['user']`` from a ``token`` query parameter.

    The token is read from the connection's query string (``?token=...``).
    A missing or empty token yields ``AnonymousUser()``; otherwise the user is
    looked up with ``get_user``.
    """

    def __init__(self, inner):
        # The wrapped ASGI application that receives the updated scope.
        self.inner = inner

    async def __call__(self, scope, receive, send):
        """Resolve the user, store it on ``scope``, and delegate to the inner app."""
        query_string = scope.get('query_string', b'').decode()
        # parse_qs percent-decodes values and tolerates '=' inside them; the
        # previous split('=') parsing raised ValueError on such input.
        params = parse_qs(query_string)
        # Take the last occurrence when the parameter is repeated, matching
        # how dict() resolved duplicates before.
        token = params.get('token', [None])[-1]
        scope['user'] = await get_user(token) if token else AnonymousUser()
        return await self.inner(scope, receive, send)
Sending from Views
Send WebSocket messages from regular Django views:
# views.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def create_order(request):
    """Create an order and push an ``order.created`` event to the user's group."""
    order = Order.objects.create(...)

    # Notify via WebSocket
    group_send = async_to_sync(get_channel_layer().group_send)
    event = {"type": "order.created", "order_id": order.id}
    group_send(f"user_{request.user.id}", event)

    return JsonResponse({"status": "ok"})
Practical Example: Live Notifications
Consumer
class NotificationConsumer(AsyncWebsocketConsumer):
    """Per-user notification socket.

    Each authenticated user joins the group ``notifications_<user id>``;
    events of type ``notification`` sent to that group are forwarded to the
    browser as JSON.
    """

    async def connect(self):
        """Reject anonymous users; otherwise join the user's group and accept."""
        if self.scope['user'].is_anonymous:
            await self.close()
            return
        self.user_group = f"notifications_{self.scope['user'].id}"
        await self.channel_layer.group_add(
            self.user_group,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the user's group, if one was joined."""
        # connect() returns before setting user_group for anonymous users, so
        # reading the attribute unconditionally would raise AttributeError.
        user_group = getattr(self, 'user_group', None)
        if user_group is None:
            return
        await self.channel_layer.group_discard(
            user_group,
            self.channel_name
        )

    async def notification(self, event):
        """Forward a ``notification`` group event to the WebSocket.

        Reads ``notification_type`` and ``message`` from the event; ``data``
        is optional and defaults to an empty dict.
        """
        await self.send(text_data=json.dumps({
            'type': event['notification_type'],
            'message': event['message'],
            'data': event.get('data', {}),
        }))
Signal Integration
# signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
@receiver(post_save, sender=Comment)
def notify_comment(sender, instance, created, **kwargs):
    """Notify a post's author over the channel layer when a comment is created.

    Runs on every ``post_save`` of ``Comment`` and does nothing for updates
    (``created`` is false).
    """
    if not created:
        return

    channel_layer = get_channel_layer()
    user_id = instance.post.author_id
    # Called from synchronous signal code, so the async group_send is wrapped.
    async_to_sync(channel_layer.group_send)(
        f"notifications_{user_id}",
        {
            "type": "notification",
            "notification_type": "new_comment",
            # Plain string: the original f-string had no placeholders.
            "message": "New comment on your post",
            "data": {"comment_id": instance.id},
        },
    )
Running
Development
# Daphne (ASGI server), pointed at the application object in myproject/asgi.py
daphne myproject.asgi:application
# Or with Django's development server
python manage.py runserver # Uses Daphne automatically when 'daphne' is in INSTALLED_APPS
Production
# Bind to all interfaces (0.0.0.0) on port 8000
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application
Or with uvicorn:
# Same application, host and port, served by uvicorn
uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000
Best Practices
Error Handling
async def receive(self, text_data):
    """Decode a JSON text frame and pass it to ``handle_message``.

    Invalid JSON is reported to the client via ``send_error("Invalid JSON")``.
    Any other exception is logged with its traceback and reported as
    ``"Internal error"``.
    """
    try:
        data = json.loads(text_data)
        await self.handle_message(data)
    except json.JSONDecodeError:
        await self.send_error("Invalid JSON")
    except Exception:
        # logger.exception records the active traceback, so the exception
        # does not need to be bound to a name.
        logger.exception("WebSocket error")
        await self.send_error("Internal error")
Connection Limits
# Track active connections
# Channel names of the connections accepted so far.
connected_users = set()


async def connect(self):
    """Accept the connection unless ``MAX_CONNECTIONS`` are already tracked.

    NOTE(review): nothing in this snippet removes entries again; the
    matching disconnect handler presumably needs to discard
    ``self.channel_name``.
    """
    # ">=" rather than ">": with ">" one connection beyond the limit was
    # still accepted.
    if len(connected_users) >= MAX_CONNECTIONS:
        await self.close()
        return
    connected_users.add(self.channel_name)
    await self.accept()
Heartbeat
# Keep connection alive
async def heartbeat(self, interval=30):
    """Send a ``{"type": "ping"}`` frame every ``interval`` seconds, forever.

    Args:
        interval: Seconds to sleep before each ping. Defaults to 30, the
            previously hard-coded value.

    The loop never returns on its own; it ends only when an exception
    (including task cancellation) propagates out of ``sleep`` or ``send``.
    """
    # The payload never changes, so serialize it once.
    ping = json.dumps({"type": "ping"})
    while True:
        await asyncio.sleep(interval)
        await self.send(text_data=ping)
Final Thoughts
Django Channels opens Django to real-time use cases. Chat, notifications, live updates—all within Django’s familiar patterns.
Start simple. Add complexity as needed. Redis is your friend.
Real-time Django, finally.