Testing and monitoring Celery tasks

Reasons to use background tasks

There are many situations where we need to use background (asynchronous) tasks instead of synchronous — one of the particular reasons is to improve user experience (e.g. user doesn't need to wait til sending of registration email will finish, even if it's quite fast). It's especially usable for the long-lasting processes — calls to third-party APIs etc.

Another reason to use Celery tasks — we can schedule them for necessary time (e.g. each morning we can generate some reports based on data from previous day).

Despite the reason how we use our tasks we need to be able to check that all of them are properly configured in our project as well as to monitor them during runtime.

Celery configuration validation

To run scheduled tasks with Celery in Django project you need to specify CELERYBEAT_SCHEDULE in the settings. For example:

CELERYBEAT_SCHEDULE = {
    'task_1': {
        'task': 'project.apps.app_1.tasks.task_1',
        'schedule': crontab(minute='*/5'),  # Execute every 5 minutes
    },
    'task_2': {
        'task': 'project.apps.app_1.tasks.task_2',
        'schedule': crontab(minute='*/15'),  # Execute every 15 minutes
    },
    'task_3': {
        'task': 'project.apps.app_2.tasks.task_1',
        'schedule': crontab(hour=3, minute=0),  # Execute each day at 3 am
    },
    ...
}

The main idea here to check if all tasks listed in CELERYBEAT_SCHEDULE can be imported. We can do this with the next test case:

from django.test import TestCase
from django.utils.module_loading import import_string

from project.celery import app


class TestCeleryTasks(TestCase):

    def test_celery_beat_tasks_can_be_imported(self):
        beat_schedule = app.conf.beat_schedule
        for beat_name, beat_dict in beat_schedule.items():
            import_string(beat_dict['task'])

Regarding other (not scheduled) Celery tasks - we can check if configured Celery app will be able to autodiscover all of them. But firstly we need to list all our tasks and then check if Celery found them.

from django.test import TestCase
from django.utils.module_loading import import_string

from project.celery import app


TASKS_APP_1 = [
    'project.apps.app_1.tasks.task_1',
    'project.apps.app_1.tasks.task_2',
  ...
]
TASKS_APP_2 = [
    'project.apps.app_2.tasks.task_1',
    'project.apps.app_2.tasks.task_2',
  ...
]
...
ALL_TASKS = TASKS_APP_1 + TASKS_APP_2 + ...


class TestCeleryTasks(TestCase):

    def test_celery_autodiscover_all_expected_tasks(self):
        celery_tasks = list(app.tasks.keys())
        for dotted_task_path in ALL_TASKS:
            assert dotted_task_path in celery_tasks

            # Additionally we can check that all our tasks can be imported
            import_string(dotted_task_path)

One of cons with of approach is that each time you will add new task, you will need to add it to ALL_TASKS.

During development it is usual practice to set task_always_eager setting to True. In this case tasks will be executed immediately instead of being sent to the queue, so you do not need to set up broker etc. As mentioned in the Celery Docs — the preferred method to test task behavior in the unit tests is mocking with enabled "eager mode". Another option and good practice as well is to use tasks to call outer functions that can be tested w/o task. E.g.:

from django.contrib.auth import get_user_model

from project.app import utils


User = get_user_model()


@app.task
def send_welcome_email(user_id):
    user = User.objects.get(id=user_id)
    utils.send_welcome_email(user)

In this case you will be able to test send_welcome_email util in separate unit test and be sure that it behaves as expected.

All test cases described above can be used as addition to your other tests and can be used in your normal CI process. With this addition you will be sure that your tasks set up in correct way.

Monitoring Celery tasks

The first option is to save Celery logs. Generally this is a good practice to log all changes that was made with code.

Example of config for logging Celery logs in Django project:

LOGGING = {
    'version': 1,
    'formatters': {
        'simple': {
            'format': '%(levelname)s %(message)s',
            'datefmt': '%y %b %d, %H:%M:%S',
            },
        },
    'handlers': {
        'celery': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'celery.log',
            'formatter': 'simple',
            'maxBytes': 1024 * 1024 * 100,  # 100 mb
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery'],
            'level': 'DEBUG',
        },
    }
}

Good logging is a must have for any project, but usually manually analyzing logs is inconvenient and inefficient, especially in a real-time.

The best choice in this case is Flower. Flower adds ability to review in a real-time executed tasks with their status, arguments, start time, runtime etc. It is a good practice to return some meaningful values from tasks, so them you will be able to see the result of executed task (in other case None will be returned). For instance, the task that sends emails can return something like total number of emails, number of successful emails, number of failed emails. E.g.:

With Flower you will be able to monitor and control your workers as well as tasks.

Additionally Flower provides an API you can use for your own needs (control tasks, workers etc.)

Clearly is an alternative for Flower and can be useful as well.

Flower will show up the list of all executed tasks including the failed ones. In this case you will be able to see some short traceback for caused error. Sometimes this won't be enough to make decision about the reason that caused the error. It is better for errors handling use special tools, like Sentry, New Relic and others. Sentry for example can track Celery errors in the same way as Django errors, so it is easy to integrate it in any Django project, New Relic has similar functionality

Also you can always create your own solution and ass an example we suggest to read this blog post which explains how using Grafana for visualization it is possible to implement some slick and nice custom solution for monitoring Celery workers.

Monitoring that Celery workers are active

There are can be situations when all Celery workers broke: OS issues, not enough memory to start up etc. In this case you will know about this only in case you constantly check Flower or logs. Even though the failures are rare, it's still very important to keep an eye on them and to know about this as soon as possible, especially when your Celery tasks run payment related processes. E.g. your users would be waiting their payouts etc, but this will not happen because any of your background tasks couldn't be executed.

The best solution is to implement a simple script that will periodically check workers health and will inform you when there is a problem. The most simple and reliable solution would be bash/Python script and schedule it with crontab. Our main tool for communications is Slack, so we can send Slack notifications on Celery workers failure using Slack Python SDK. Sending a message to a necessary Slack channel in the simplest case can be implemented in the next way:

import os

from slack import WebClient


def send_slack_message(text, channel='#random'):
    """
    Sends `text` to provided `channel`.
    """
    client = WebClient(token=os.environ['SLACK_TOKEN'])
    client.chat_postMessage(channel=channel, text=text)

To make it work firstly you need to generate your own API Token.

Our main goal here is to check if Celery workers (at least one) are running. We can do this with psutil library.

import psutil


def is_process_running(process_name):
    """
    Checks if there is any running process that contains the given `process_name`.
    """
    # Iterate over the all running processes
    for proc in psutil.process_iter():
        try:
            if process_name.lower() == proc.name().lower():
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return False

Now we can create simple Python script that will check if Celery is running — list of all processes contains "celery".

from utils import is_process_running, send_slack_message


def check_celery_workers_heartbeat():
    """
    Notifies through Slack when there are no active Celery workers.
    """
    if not is_process_running('celery'):
        send_slack_message(text='Error: Celery workers are not running.')


if __name__ == '__main__':
    check_celery_workers_heartbeat()

The final step here is to create a cron job to execute our Python script periodically. For example we can create file cronjob and put in to /etc/cron.d folder:

# Run (each minute) python script to check that there are active Celery workers.
* * * * * python /app/workers_heartbeat.py >> /logs/cron.log 2>&1

Be sure that you added empty line at the end of this file or you will have an error — new crontab file is missing newline before EOF, can't install. when you will try to start cron.

Also you can create cron job with command crontab -e - in this case a default system editor will be opened and you will be able to add to opened file the text above.

To test this approach you can check the repo with the above code. You can try to run it inside the Docker container with Celery and needed cron jobs to see how this can be implemented in environment similar to real/production.

Since not all teams use Slack, there are can be many other ways to inform developers about issues on a server: emails using this tutorial), SMS using Nexmo , Twilio or by this tutorial), push notifications with Firebase Cloud Messaging etc.

Final words

For more productive and efficient work with Celery you can check Celery tasks checklist

Notes about "Do not use complex objects in task as parameters..." based on our experience:

  • You need to use simple data types as task arguments, because broker usually cannot work with complex data (as Django model objects etc.).
  • Generally only data of JSON serializable types can be passed to a task. You will not be able to pass to the task: instance of timedelta, instance of bytes, instance of set, instance of complex, instance of any custom class, class and function.
  • You can pass instance of decimal.Decimal to a task, but in task passed value will be converted to instance of float.
  • With smaller arguments, message queue will be smaller.
  • Data in database may change before task is executed, so it is better to use object id and get "actual" data from DB.