We have discussed what are the different ways to execute asynchronous tasks concurrently and why you should use celery to avoid all the complexities while processing your asynchronous tasks in the last blog. In this blog, we will discuss how you can use celery with django and common problems people face and their solution.
Contents
Process tasks simultaneously with Celery
Now when we have discussed concepts of asynchronous processing and how celery can avoid all these complexity, we can further discuss the ways celery provides to run tasks simultaneously.
I am assuming that you have already gone through the celery documentation regarding what are the basic concepts of celery and how to create a task in celery. If you have not gone through that, i suggest to please read about some concepts from here.
To design the flow of your tasks, celery provides primitives, primitives in its simplest form could be the wrapper which takes the task as function and arguments which needs to be provided to that function at the time of actual executions.
group
The Group takes the list of tasks and executes all of them in parallel.]
from celery import group
res = group(add.s(i, i) for i in range(10))()
res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
chain
In simple words, Chain let us set the sequence of callbacks with async tasks. In the following example, chain first process add.s(2, 2) and then passes the result to add.s(4) as an argument and so on.
from celery import chain
# 2 + 2 + 4 + 8
res = chain(add.s(2, 2), add.s(4), add.s(8))()
res.get()
16
chord
A chord is just like a group but with a callback. A chord consists of 2 parts, header group and body group, where the header group is the list of the tasks which are executed in parallel as they would be executed in parallel with the using of group and the resulted list of results from these tasks are then passed to the body group which is simply a callback celery task.
from celery import chord
res = chord((add.s(i, i) for i in range(10)), xsum.s())()
res.get()
90
map
The map primitive works like the built-in map function, but creates a temporary task where a list of arguments is applied to the task. For example, task.map([1, 2]) – results in a single task being called, applying the arguments in order to the task function so that the result is:
res = [task(1), task(2)]
startmap
Works exactly like map except the arguments are applied as *args. For example add.starmap([(2, 2), (4, 4)]) results in a single task calling:
res = [add(2, 2), add(4, 4)]
chunks
Chunking splits a long list of arguments into parts, for example the operation:
items = zip(range(1000), range(1000)) # 1000 items
add,chunks(items, 10)It will split the list of items into chunks of 10, resulting in 100 tasks.
(each processing 10 items in sequence).
Common Problems: Database Transactions, Deadlocks and Race Conditions
The most common problem we face when executing tasks in parallel is often we get stuck in database transactions whether due to deadlock or race conditions.
So let’s discuss the way we can avoid them:
Deadlocks –
We should know that when we query on a relational database, the records on which the query is executed get locked to enforce ACID(Atomicity, Consistency, Isolation, Durability) property of the database.
To bypass this you can simply change the Transaction Isolation Level of your database to read committed, which will only hold lock when a record is updated or deleted. There is also one more lenient isolation level that is read uncommitted, that will let you query on the rows even when a transaction doing update or delete hasn’t committed the changes on the database. So it will let you read uncommitted changes, that is why it’s known as dirty reading and it’s not recommended, as it can violate ACID property. So if your tasks are updating or deleting common records, you must reconsider refactoring your task.
The other way of handling this is by changing the way you are querying with django’s ORM.
For example, if you are querying something like this:
ModelOne.objects.filter(fk1=fk1_obj, fk2=fk2_obj).update(something=new_val)
This will hold the lock on all the objects of ModelOne because of the way django’s ORM creates queries for this.
rows_list = list(ModelOne.objects.filter(fk1=fk1_obj, fk2=fk2_obj).values_list(‘id’, flat=True))
ModelOne.objects.filter(id__in=rows_list).update(something=new_val)Due to change in query, db will not hold for all the rows of the ModelOne table.
Race condition –
As these tasks can sometimes spawn a new process from the main flow which means that they will have different database transactions, due to which sometimes, race conditions happen. The most common example of race condition is, say, on signup you want to send the welcome email to a newly onboarded user, as this is not related to the main logic, you decided to execute sending of the email as a non blocking celery task which will accept the id of the newly created profile. So sometimes the task got executed before the creation of the profile object in the database.
To fix this, django has provided a package django.db.transaction, It consists of various high and low level API to let you control the database transactions.
Following is the demo code of handling the scenario of above situation:
from django.db import transaction
transaction.on_commit(lambda : send_email.delay(profile_obj.id))
Conclusion
Now, You know all that are needed for creating asynchronous tasks and running them concurrently to increase the efficiency of your code. So it’s up to us and our imagination where we want to apply all these, whether to create notification service or optimizing cleaning services.
[“source=technoarchsoftwares”]