Asynchronous Task Processing with Celery and Django

admin · December 2, 2025
Master background task processing using Celery for email sending, report generation, and long-running operations.

# Asynchronous Task Processing with Celery

Background tasks are essential for any production Django application. Here's how we use Celery in DjangoZen for emails, notifications, and more.

## Why Celery?

- **Non-blocking operations**: Don't make users wait
- **Scheduled tasks**: Cron-like periodic tasks
- **Retry logic**: Automatic failure recovery
- **Scalability**: Distribute tasks across workers
- **Monitoring**: Track task status and results

## Installation & Setup

```bash
pip install celery redis
```

Create `celery.py` in your project:

```python
# djzen/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djzen.settings')

app = Celery('djzen')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# Optional: Configure task routes
app.conf.task_routes = {
    'eshop.tasks.send_*': {'queue': 'emails'},
    'eshop.tasks.generate_*': {'queue': 'reports'},
}
```
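One more wiring step the Celery docs recommend: load the app when Django starts, so `@shared_task` decorators bind to it. Assuming the project package is `djzen` as above:

```python
# djzen/__init__.py
# Import the Celery app on Django startup so shared_task decorators use it.
from .celery import app as celery_app

__all__ = ('celery_app',)
```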

Configure settings:

```python
# settings.py
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# Task settings
CELERY_TASK_ALWAYS_EAGER = False # Set True for testing
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
```

## Creating Tasks

### Email Sending Task

```python
# eshop/tasks.py
from celery import shared_task
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.conf import settings

@shared_task(bind=True, max_retries=3)
def send_order_confirmation(self, order_id):
    """Send order confirmation email"""
    try:
        from .models import Order
        order = Order.objects.select_related('user').prefetch_related('items__product').get(id=order_id)

        subject = f'Order Confirmation #{order.order_number}'

        # Render templates
        html_content = render_to_string('emails/order_confirmation.html', {
            'order': order,
            'site_url': settings.SITE_URL,
        })
        text_content = render_to_string('emails/order_confirmation.txt', {
            'order': order,
        })

        # Send email
        email = EmailMultiAlternatives(
            subject=subject,
            body=text_content,
            from_email=settings.DEFAULT_FROM_EMAIL,
            to=[order.user.email],
        )
        email.attach_alternative(html_content, 'text/html')
        email.send()

        # Update order
        order.confirmation_sent = True
        order.save(update_fields=['confirmation_sent'])

        return f'Email sent to {order.user.email}'

    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

import logging

logger = logging.getLogger(__name__)

@shared_task
def send_newsletter(subscriber_ids, subject, content):
    """Send newsletter to multiple subscribers"""
    from .models import NewsletterSubscription

    subscribers = NewsletterSubscription.objects.filter(
        id__in=subscriber_ids,
        is_active=True
    )

    sent_count = 0
    for subscriber in subscribers:
        try:
            html_content = render_to_string('emails/newsletter.html', {
                'content': content,
                'email': subscriber.email,
                'unsubscribe_url': f'{settings.SITE_URL}/newsletter/unsubscribe/?email={subscriber.email}',
            })

            email = EmailMultiAlternatives(
                subject=subject,
                body=content,
                from_email=settings.DEFAULT_FROM_EMAIL,
                to=[subscriber.email],
            )
            email.attach_alternative(html_content, 'text/html')
            email.send()
            sent_count += 1

        except Exception as e:
            # Log and continue so one bad address doesn't abort the batch
            logger.warning('Failed to send to %s: %s', subscriber.email, e)

    return f'Sent {sent_count}/{len(subscriber_ids)} emails'
```
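The `countdown=60 * (2 ** self.request.retries)` expression above is a classic exponential backoff. A tiny helper (our own name, purely illustrative) makes the resulting schedule explicit:

```python
def retry_countdowns(base=60, max_retries=3):
    """Seconds to wait before each retry attempt: base, 2*base, 4*base, ..."""
    return [base * (2 ** n) for n in range(max_retries)]

print(retry_countdowns())  # [60, 120, 240]
```

So with `max_retries=3`, a persistently failing email task gives up roughly seven minutes after the first attempt.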

### Report Generation

```python
@shared_task(bind=True, time_limit=300)
def generate_sales_report(self, user_id, date_from, date_to):
    """Generate sales report PDF"""
    import io
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas
    from django.core.files.base import ContentFile
    from django.db.models import Sum

    from .models import Order, Report

    # Query data
    orders = Order.objects.filter(
        created_at__range=[date_from, date_to],
        status='completed'
    ).select_related('user')

    # Generate PDF
    buffer = io.BytesIO()
    p = canvas.Canvas(buffer, pagesize=letter)

    # Add content... (aggregate returns None when there are no orders)
    total_revenue = orders.aggregate(total=Sum('total'))['total'] or 0
    p.drawString(100, 750, f'Sales Report: {date_from} to {date_to}')
    p.drawString(100, 730, f'Total Orders: {orders.count()}')
    p.drawString(100, 710, f'Total Revenue: €{total_revenue:.2f}')

    p.save()
    buffer.seek(0)

    # Save report
    report = Report.objects.create(
        user_id=user_id,
        report_type='sales',
        status='completed',
    )
    report.file.save(
        f'sales_report_{date_from}_{date_to}.pdf',
        ContentFile(buffer.read())
    )

    # Notify user
    send_notification.delay(
        user_id,
        'Your sales report is ready',
        f'/reports/{report.id}/'
    )

    return report.id
```

### License Generation

```python
@shared_task
def generate_license_keys(order_id):
    """Generate license keys for purchased products"""
    import secrets
    from .models import Order, License

    order = Order.objects.prefetch_related('items__product').get(id=order_id)

    licenses_created = []
    for item in order.items.filter(product__requires_license=True):
        for _ in range(item.quantity):
            license_key = f'DJZ-{secrets.token_hex(4).upper()}-{secrets.token_hex(4).upper()}'

            license = License.objects.create(
                user=order.user,
                product=item.product,
                order=order,
                license_key=license_key,
                license_type=item.license_type,
                is_active=True,
            )
            licenses_created.append(license_key)

    return licenses_created
```
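The key format above is a prefix plus two 8-hex-digit groups drawn from a CSPRNG. Isolated from the ORM, the generation step looks like this (function name is ours, for illustration):

```python
import re
import secrets

def make_license_key(prefix='DJZ'):
    """Generate a key like DJZ-1A2B3C4D-5E6F7A8B (2 x 32 bits of randomness)."""
    return f'{prefix}-{secrets.token_hex(4).upper()}-{secrets.token_hex(4).upper()}'

key = make_license_key()
assert re.fullmatch(r'DJZ-[0-9A-F]{8}-[0-9A-F]{8}', key)
```

64 bits of randomness makes accidental collisions vanishingly unlikely, but a unique constraint on `license_key` is still the safety net that matters.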

## Periodic Tasks

Schedule recurring tasks with Celery Beat:

```python
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'eshop.tasks.cleanup_expired_sessions',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3 AM
    },
    'send-trial-expiration-reminders': {
        'task': 'eshop.tasks.send_trial_expiration_emails',
        'schedule': crontab(hour=9, minute=0),  # Daily at 9 AM
    },
    'update-site-statistics': {
        'task': 'eshop.tasks.update_site_stats',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'generate-sitemap': {
        'task': 'eshop.tasks.generate_sitemap',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),  # Weekly, Sunday
    },
}
```

```python
# tasks.py
@shared_task
def cleanup_expired_sessions():
    """Remove expired download links and sessions"""
    from django.utils import timezone
    from .models import DownloadLink

    expired = DownloadLink.objects.filter(
        expires_at__lt=timezone.now()
    )
    count = expired.count()
    expired.delete()

    return f'Cleaned up {count} expired download links'

@shared_task
def send_trial_expiration_emails():
    """Send reminder emails for expiring trials"""
    from datetime import timedelta
    from django.utils import timezone
    from .models import License

    # Trials expiring in 3 days
    expiring_soon = License.objects.filter(
        license_type='trial',
        is_active=True,
        expires_at__date=timezone.now().date() + timedelta(days=3),
        expiration_reminder_sent=False
    ).select_related('user', 'product')

    sent = 0
    for license in expiring_soon:
        send_trial_expiration_reminder.delay(license.id)
        license.expiration_reminder_sent = True
        license.save(update_fields=['expiration_reminder_sent'])
        sent += 1

    return f'Sent {sent} expiration reminders'
```

## Running Celery

Development:

```bash
# Start worker
celery -A djzen worker -l INFO

# Start beat scheduler
celery -A djzen beat -l INFO

# Or both together
celery -A djzen worker -B -l INFO
```
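Because `celery.py` routes email and report tasks to dedicated queues, you can also run one worker per queue and scale them independently (queue names match the `task_routes` defined earlier):

```shell
# Worker dedicated to the 'emails' queue
celery -A djzen worker -Q emails -l INFO

# Worker dedicated to the 'reports' queue, with lower concurrency
celery -A djzen worker -Q reports -l INFO --concurrency=2
```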

Production with systemd:

```ini
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Worker
After=network.target

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/djzen
ExecStart=/var/www/djzen/venv/bin/celery -A djzen multi start worker1 \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
ExecStop=/var/www/djzen/venv/bin/celery multi stopwait worker1 \
    --pidfile=/var/run/celery/%n.pid
ExecReload=/var/www/djzen/venv/bin/celery -A djzen multi restart worker1 \
    --pidfile=/var/run/celery/%n.pid

[Install]
WantedBy=multi-user.target
```

## Monitoring with Flower

```bash
pip install flower
celery -A djzen flower --port=5555
```

## Best Practices

1. **Keep tasks small** - Break large tasks into subtasks
2. **Use task chains** - `chain(task1.s(), task2.s())`
3. **Handle failures** - Implement retry logic
4. **Set time limits** - Prevent runaway tasks
5. **Monitor queues** - Watch for backlogs
6. **Use priorities** - Critical tasks first

Celery makes your Django app production-ready. Check our templates with Celery pre-configured!
