Celery and its batch processing feature
What is Celery?
Celery is a task queue that runs your work asynchronously, outside the request/response cycle. For example, when a user signs up for your web application you may want some background processing, such as sending that user a welcome email or generating a report for them. That is where Celery comes into the picture: instead of doing this work inside the request, you create Celery tasks and Celery sends the emails and generates the reports in the background. Celery can also act as a scheduler: you can schedule a task and Celery will execute it at the configured time.
How Celery works
Celery has three parts: the producer, the queue broker and the consumer.
Producer: the producer creates tasks and pushes them onto the queue broker. For example, when a create-user request reaches the back end, your logic creates the user and also creates a Celery task to send the email.
Queue Broker: the broker is usually Redis or RabbitMQ. Every task the producer creates goes to the broker first, and consumers then pick it up from there according to their configuration.
Consumer: the consumer, called a worker in Celery, takes tasks from the broker and executes them.
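The three roles can be modelled in plain Python with the standard library. This is a rough analogy, not Celery code: a queue.Queue stands in for the broker, a function that enqueues work plays the producer, and a thread that pulls items off the queue plays the worker. The task name and user IDs are made up for the illustration.

```python
import queue
import threading

broker = queue.Queue()  # stands in for Redis/RabbitMQ
processed = []          # what the worker has executed, in order


def producer(user_id):
    """Like the create-user view: finish the main work, then enqueue a task."""
    broker.put(("send_email", user_id))


def worker():
    """Like a Celery worker: take tasks off the broker and execute them."""
    while True:
        task = broker.get()
        if task is None:  # sentinel value tells the worker to stop
            break
        name, arg = task
        processed.append(f"{name}({arg})")


consumer = threading.Thread(target=worker)
consumer.start()
for user_id in ["alice", "bob"]:
    producer(user_id)
broker.put(None)
consumer.join()
print(processed)  # ['send_email(alice)', 'send_email(bob)']
```

The producer returns as soon as the item is queued; the actual work happens later in the worker thread, which is the same decoupling Celery provides across processes and machines.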
Let’s Implement Celery
Requirements:
pip install django
pip install celery
pip install redis
pip install celery-batches (for batch processing)
Create the Django project:
django-admin startproject celery_demo
Then move into the project directory (cd celery_demo) and create the Django app:
python manage.py startapp celery_app
Add celery_app to the INSTALLED_APPS list in celery_demo/settings.py.
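For reference, after this change the list in settings.py might look like the following. The django.contrib entries are the defaults that startproject generates; only the last entry is new.

```python
# celery_demo/settings.py (excerpt)
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "celery_app",  # the app created with `python manage.py startapp celery_app`
]
```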
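The project now contains manage.py, the celery_demo configuration package (settings.py, urls.py, wsgi.py and so on) and the new celery_app package (views.py, models.py and so on).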
Next, update the following files so the Django server runs and can render an HTML page:
Update celery_demo/urls.py: add a path that routes the request to a view function in celery_app/views.py.
Create a templates folder (for example inside celery_app, where Django's default APP_DIRS setting finds it) and add a simple index.html.
Update celery_app/views.py so the view just renders index.html.
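A minimal celery_demo/urls.py could look like the sketch below. It assumes the view is the schedule_task function that appears in views.py later in this article; the empty route (the site root) and the route name are arbitrary choices, and any path works.

```python
# celery_demo/urls.py
from django.contrib import admin
from django.urls import path

from celery_app import views

urlpatterns = [
    path("admin/", admin.site.urls),
    # Assumed route: the site root calls celery_app.views.schedule_task
    path("", views.schedule_task, name="schedule_task"),
]
```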
Now for the Celery part
Add the following lines to settings.py:
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
Create a celery.py file in the same directory as settings.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')
app = Celery('celery_demo')

# Read Celery config from Django settings; namespace='CELERY' means every
# Celery key in settings.py must start with the CELERY_ prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
The Celery app should be loaded whenever Django starts, so edit the __init__.py file in the same directory as settings.py:
from __future__ import absolute_import, unicode_literals

# Make sure the Celery app is always imported when Django starts.
from .celery import app as celery_app

__all__ = ('celery_app',)
Next, define the task to perform. Create a tasks.py file inside the app directory (next to views.py); app.autodiscover_tasks() looks for a module with exactly this name in each installed app.
from celery_demo.celery import app
@app.task(bind=True)  # bind=True passes the task instance in as `self`
def send_email(self, data):
    print("Sending Email")
    # Your logic to send email
Queue the send-email task from views.py whenever the API is invoked:
from django.shortcuts import render
from .tasks import send_email
# Create your views here.
def schedule_task(request):
    send_email.delay("UserID")  # This will create a task for Celery
    return render(request, "index.html")
Start the Celery worker with the following command:
celery -A celery_demo.celery worker --loglevel=debug
Boom! Your Celery worker has started. Now hit the endpoint and watch the worker pick up the task.
Now on to Celery Batches
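With celery-batches, a task is not executed once per message. The worker still receives the messages from the broker (Redis here), but it buffers them in memory and calls the task once with the whole batch, either after a configured time interval or once a configured number of requests has accumulated. For example, if you configure a 15-second interval, the task receives whatever has been buffered every 15 seconds; a request-count limit works the same way.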
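The flushing rule can be illustrated with a small standard-library simulation. This is a simplified model written for illustration, not celery-batches' own code: it flushes a batch once it holds flush_every requests, or flush_interval seconds after the batch's first request arrived, whichever happens first. The real library drives the interval with a timer inside the worker, so exact batch boundaries can differ.

```python
def simulate_batches(arrivals, flush_every, flush_interval):
    """Group request arrival times (in seconds) into batches.

    Simplified model: a batch is flushed as soon as it holds `flush_every`
    requests, or `flush_interval` seconds after its first request arrived.
    """
    batches = []
    buffer = []
    deadline = None
    for t in sorted(arrivals):
        # Time-based flush: the interval expired before this request arrived.
        if buffer and t >= deadline:
            batches.append(buffer)
            buffer = []
        if not buffer:
            deadline = t + flush_interval  # the first request starts the clock
        buffer.append(t)
        # Count-based flush.
        if len(buffer) >= flush_every:
            batches.append(buffer)
            buffer = []
    if buffer:  # whatever is left is flushed when its interval expires
        batches.append(buffer)
    return batches


# Five quick requests, then a lone one 30 s later.
print(simulate_batches([0, 1, 2, 3, 4, 30], flush_every=3, flush_interval=10))
# [[0, 1, 2], [3, 4], [30]]
```

With flush_every=3 and flush_interval=10, the first three requests fill a batch immediately, the next two wait for the interval, and the late request ends up in a batch of its own.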
To turn send_email into a batch task, update tasks.py as follows:
from celery_demo.celery import app
from celery_batches import Batches
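@app.task(base=Batches, flush_every=3, flush_interval=10)
def send_email(requests):
    # `requests` is the list of buffered calls; each item's arguments
    # are available as request.args (e.g. ("UserID",))
    for request in requests:
        print("Sending Email")
        # Your logic to send email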
Run the Celery worker again and watch the results: the worker now processes tasks in batches, flushing either when 3 requests have accumulated (flush_every=3) or every 10 seconds (flush_interval=10), whichever comes first.
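One setting is worth checking here. The celery-batches documentation warns that batching only works if the worker may prefetch enough messages: worker_prefetch_multiplier should be 0 (no limit) or large enough that the resulting prefetch limit (the multiplier times the worker's concurrency) exceeds flush_every. With the CELERY namespace used in celery.py, a minimal sketch of that is one extra line in settings.py:

```python
# settings.py: 0 lets the worker prefetch without limit, so it can
# buffer at least flush_every (3) messages for the batch task.
CELERY_WORKER_PREFETCH_MULTIPLIER = 0
```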
Enjoy, and happy coding! Suggestions are welcome :-)