Connecting Django to n8n Webhooks
django python n8n automation
n8n is a powerful workflow automation tool. Connecting it to Django opens up possibilities: trigger workflows from Django events, receive processed data back. Here’s how.
The Architecture
Django App n8n
│ │
├── Event occurs ──────────▶ Webhook trigger
│ │
│ ├── Process data
│ │
│◀─── Webhook response ─────┤
│ │
├── or async callback ──────┤
Django → n8n: Sending Webhooks
Simple Webhook Call
# utils/n8n.py
import requests
from django.conf import settings
def trigger_n8n_workflow(workflow_name: str, data: dict) -> dict:
"""Send data to n8n webhook."""
webhook_url = f"{settings.N8N_BASE_URL}/webhook/{workflow_name}"
response = requests.post(
webhook_url,
json=data,
headers={"Content-Type": "application/json"},
timeout=30
)
response.raise_for_status()
return response.json()
Using with Django Signals
# signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import Order
from .utils.n8n import trigger_n8n_workflow
@receiver(post_save, sender=Order)
def order_created(sender, instance, created, **kwargs):
if created:
trigger_n8n_workflow("order-created", {
"order_id": instance.id,
"customer_email": instance.customer.email,
"total": str(instance.total),
"items": [
{"name": item.name, "quantity": item.quantity}
for item in instance.items.all()
]
})
Async Webhook Calls
For better performance, use Celery:
# tasks.py
from celery import shared_task
import requests
@shared_task
def send_to_n8n(workflow_name: str, data: dict):
"""Async webhook to n8n."""
webhook_url = f"{settings.N8N_BASE_URL}/webhook/{workflow_name}"
try:
response = requests.post(
webhook_url,
json=data,
timeout=30
)
response.raise_for_status()
return {"status": "success", "response": response.json()}
except requests.RequestException as e:
return {"status": "error", "message": str(e)}
# In your view or signal
from .tasks import send_to_n8n
send_to_n8n.delay("order-created", order_data)
n8n → Django: Receiving Webhooks
Webhook Endpoint
# views.py
import json
import hmac
import hashlib
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
@csrf_exempt
@require_POST
def n8n_webhook(request):
"""Receive webhooks from n8n."""
# Verify signature
if not verify_n8n_signature(request):
return JsonResponse({"error": "Invalid signature"}, status=401)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
# Process based on event type
event_type = data.get("event_type")
if event_type == "enrichment_complete":
handle_enrichment(data)
elif event_type == "notification_sent":
handle_notification_sent(data)
else:
return JsonResponse({"error": "Unknown event"}, status=400)
return JsonResponse({"status": "received"})
def verify_n8n_signature(request):
"""Verify webhook signature."""
signature = request.headers.get("X-N8N-Signature")
if not signature:
return False
expected = hmac.new(
settings.N8N_WEBHOOK_SECRET.encode(),
request.body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
URL Configuration
# urls.py
from django.urls import path
from . import views
urlpatterns = [
path("webhooks/n8n/", views.n8n_webhook, name="n8n_webhook"),
]
Practical Example: Order Processing
Django Side
# models.py
class Order(models.Model):
STATUS_CHOICES = [
("pending", "Pending"),
("processing", "Processing"),
("enriched", "Enriched"),
("completed", "Completed"),
]
customer = models.ForeignKey(User, on_delete=models.CASCADE)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default="pending")
total = models.DecimalField(max_digits=10, decimal_places=2)
metadata = models.JSONField(default=dict)
# views.py
@csrf_exempt
@require_POST
def n8n_order_enriched(request):
"""Receive enriched order data from n8n."""
data = json.loads(request.body)
order_id = data.get("order_id")
enrichment_data = data.get("enrichment")
try:
order = Order.objects.get(id=order_id)
order.metadata["enrichment"] = enrichment_data
order.status = "enriched"
order.save()
return JsonResponse({"status": "updated"})
except Order.DoesNotExist:
return JsonResponse({"error": "Order not found"}, status=404)
n8n Workflow
{
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"parameters": {
"path": "order-created",
"httpMethod": "POST"
}
},
{
"name": "Enrich Data",
"type": "n8n-nodes-base.httpRequest",
"parameters": {
"url": "https://api.clearbit.com/v2/companies/find",
"qs": {
"domain": "={{ $json.customer_email.split('@')[1] }}"
}
}
},
{
"name": "Send Back to Django",
"type": "n8n-nodes-base.httpRequest",
"parameters": {
"method": "POST",
"url": "https://your-django-app.com/webhooks/n8n/",
"body": {
"order_id": "={{ $node['Webhook'].json.order_id }}",
"enrichment": "={{ $node['Enrich Data'].json }}"
}
}
}
]
}
Security Best Practices
1. Signature Verification
# settings.py
N8N_WEBHOOK_SECRET = os.environ.get("N8N_WEBHOOK_SECRET")
# n8n: Configure webhook to send signature header
2. IP Allowlisting
# middleware.py
class N8NIPAllowlistMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.allowed_ips = settings.N8N_ALLOWED_IPS
def __call__(self, request):
if request.path.startswith("/webhooks/n8n/"):
client_ip = request.META.get("HTTP_X_FORWARDED_FOR", "").split(",")[0].strip()
if client_ip not in self.allowed_ips:
return JsonResponse({"error": "Forbidden"}, status=403)
return self.get_response(request)
3. Rate Limiting
from django.core.cache import cache
def rate_limit_webhook(request):
"""Simple rate limiting for webhooks."""
ip = request.META.get("REMOTE_ADDR")
key = f"webhook_rate:{ip}"
count = cache.get(key, 0)
if count >= 100: # 100 requests per minute
return False
cache.set(key, count + 1, 60)
return True
Error Handling
Retry Logic in Django
@shared_task(bind=True, max_retries=3)
def send_to_n8n_with_retry(self, workflow_name: str, data: dict):
try:
response = requests.post(
f"{settings.N8N_BASE_URL}/webhook/{workflow_name}",
json=data,
timeout=30
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
Dead Letter Queue
@shared_task
def send_to_n8n_dlq(workflow_name: str, data: dict, error: str):
"""Store failed webhook for manual review."""
FailedWebhook.objects.create(
workflow_name=workflow_name,
data=data,
error=error,
created_at=timezone.now()
)
Final Thoughts
n8n + Django is a powerful combination. Use Django for your core application logic, n8n for integrations and workflows that would be tedious to code.
Start with simple webhooks, add complexity as needed.
Automate the boring stuff, code the interesting parts.