The more microservices you have, the more you need asynchronous messaging between them, along with scheduled jobs and background tasks.

Background tasks are useful for many reasons: they lift heavy load off specific microservices, run stateless jobs and scheduled tasks, handle asynchronous messaging, and so on.

But failed background tasks, true to their name, can go unnoticed if you do not set up proper monitoring or a highly available system. So you need those tasks (in short) to be fast, highly available, and easy to manage.

Hence, you need a distributed setup that is quick to develop against and communicates with low latency.

Therefore, I humbly suggest:

  • Python, due to its simplicity.
  • Celery: A distributed task queue that is actively maintained on GitHub (with lots of contributors), fast, and highly available.
  • Redis: In-memory key-value store with incredibly low latency. It can be used as both the (message) broker and the (result) backend for Celery.
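
To follow along, the only install you should need (an assumption on my side; the post itself only lists the tooling above) is Celery with its Redis extras, pinned to the version used here:

pip install "celery[redis]==5.1.1"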

A Simple Task Queue Example

I will explain scheduled tasks and triggered tasks in this example, using Python 3.8.5 and celery==5.1.1.

Let’s consider that we have active users on our service with a paid subscription. We have to renew expired subscriptions and email invoices to users (a scheduled task), and send custom emails on events like registration, password recovery, and so on (triggered tasks).

Scheduled Tasks

When to renew an expired subscription depends on the current time and the users' expiration dates stored in the database. Obviously, it would not be good practice to check each user one by one. Instead, we can query the database regularly for users whose expiration date has passed.

Thus, we have to set up a cron job that runs every X seconds to fetch the expired subscriptions, try to renew them, and finally send an invoice to the user if the renewal was successful.

import logging
import datetime
import smtplib
import ssl
from typing import Optional

from celery import Celery
#from celery.schedules import crontab

SMTP_SERVER = "your.smtp.server"
SENDER_EMAIL = "your@email.com"
EMAIL_PASSWORD = "Secret_Email_Password"
REDIS_BASE_URL = 'redis://localhost:6379'
app = Celery(
    'runner',
    broker=f"{REDIS_BASE_URL}/0",
    backend=f"{REDIS_BASE_URL}/1"
)

app.conf.beat_schedule = {
    'renew-expired-subscriptions': {
        'task': 'renew-expired-subscriptions', # Must match the registered task name below
        'schedule': 5.0, # Runs every 5 seconds
        #'schedule': crontab(minute='*/5') # Runs every 5 minutes
        #'args': (arg_1, arg_2, ...), # Run the task with arguments
    },
}

@app.task(name='renew-expired-subscriptions')
def renew_expired_subscriptions():
    # Get expired user information from your db
    # Try to renew subscription
    _test_user_info = {
        'name': 'Test User',
        'subscription_price': 14.99,
        'renewal_date': datetime.datetime.strftime(
            datetime.datetime.utcnow(),
            '%Y-%m-%d %H:%M:%S'
        )
    }
    _renewed = True # Very optimistic assumption
    _test_user_email = 'test@user.com'
    if _renewed:
        _sent = send_email(
            _test_user_email,
            replace_fields_with_values(
                'invoice_email_template',
                _test_user_info
            )
        )
    else:
        _sent = send_email(
            _test_user_email,
            replace_fields_with_values(
                'failed_to_renew_subscription_template',
                _test_user_info
            )
        )

    if _sent:
        logging.info(f"Invoice sent to user {_test_user_email}")

    return {
        "email": _test_user_email,
        "subscription_renewed": _renewed,
        "email_status": _sent
    }

def send_email(to: str, message: str):
    try:
        context = ssl.create_default_context()
        with smtplib.SMTP(SMTP_SERVER, 587) as server:
            server.ehlo()
            server.starttls(context=context)
            server.ehlo()
            server.login(SENDER_EMAIL, EMAIL_PASSWORD)
            server.sendmail(SENDER_EMAIL, to, message)
            return True
    except Exception as _ex:
        logging.error(str(_ex))
        logging.critical(
            "Error occured while trying to"\
            f"send invoice to: <{to}>"
        )
        return False

def replace_fields_with_values(
        email_template: str,
        extra_info: dict
    ):
    try:
        with open(f"{email_template}.txt", 'r') as _template:
            _template_text = _template.read()
            _email = _template_text.format(**extra_info)
        return _email
    except FileNotFoundError:
        logging.critical(f"Email template not found: <{email_template}.txt>")

Our cron job renew_expired_subscriptions is now defined, with some (dummy) functionality built on the helper functions replace_fields_with_values and send_email.

Starting from the foundation: first, we need to define our Celery application:

REDIS_BASE_URL = 'redis://localhost:6379'
app = Celery(
    'runner',
    broker=f"{REDIS_BASE_URL}/0",
    backend=f"{REDIS_BASE_URL}/1"
)

'runner' is the main name of our application, so the code above should live in a module named runner.py (that's how celery --app=runner finds it). See Main Names for more detailed information.

And we have some helper functions: replace_fields_with_values replaces the placeholders in the given email template with the given values and returns the rendered email message, while send_email does what its name promises.
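
For illustration, a hypothetical invoice_email_template.txt (the exact file content is my assumption, not part of the original code) could hold placeholders matching the keys of the user info dict, which the format(**extra_info) call then fills in:

Hi {name},

Your subscription was renewed on {renewal_date}
for ${subscription_price}. Thank you for staying with us!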

Finally, you can see that we’ve also defined the cron task renew_expired_subscriptions, which contains some dummy subscription renewal logic. Every Celery task is defined with the @app.task decorator.

We also need to define beat configuration for scheduled tasks:

app.conf.beat_schedule = {
    'renew-expired-subscriptions': {
        'task': 'renew-expired-subscriptions', # Must match the registered task name
        'schedule': 5.0, # Runs every 5 seconds
        #'schedule': crontab(minute='*/5') # Runs every 5 minutes
        #'args': (arg_1, arg_2, ...), # Run the task with arguments
    },
}

This configuration object can be extended with additional scheduled tasks. Here we use a plain five-second interval for our subscription renewal task; for cron-style timing you would pass a crontab schedule instead, as shown below.
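
For example, a second entry with cron-style timing could be appended like this (the send-daily-report task is hypothetical and would need to be registered elsewhere):

from celery.schedules import crontab

app.conf.beat_schedule['send-daily-report'] = {
    'task': 'send-daily-report',
    'schedule': crontab(hour=7, minute=30), # Every day at 07:30 UTC
}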

Now, we can run the workers:

# window 1
docker run -p 6379:6379 redis:alpine

# window 2
celery --app=runner worker --loglevel=info

# window 3
celery --app=runner beat --loglevel=info

[Screenshot: scheduled task result in the worker log]
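
As a side note, while developing you can also run beat embedded inside the worker process with a single command (handy locally, but not recommended for production):

# windows 2 and 3 combined
celery --app=runner worker --beat --loglevel=info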

Triggered Tasks

Apart from scheduled tasks, there might also be a need for triggered tasks. These tasks run when an event is fired.

When the event is fired, a message (the task data) is pushed to the message broker (a.k.a. the bus). The workers then pick up queued tasks from the broker and process them.

In our case, we need to send emails to users when some events occur, like registration, new subscription, password recovery and so on.

With the retry settings and a soft timeout limit, our email sending task should look something like:

@app.task(
    name='send-email-to-user',
    default_retry_delay=300,
    max_retries=5,
    soft_time_limit=30
)
def send_email_to_user(
        user_email: str,
        email_template: str,
        extra_info: dict
    ):
    _message = replace_fields_with_values(email_template, extra_info)
    _sent = send_email(user_email, _message)
    logging.info(f"Sent {email_template} email to user <{user_email}>")
    return {
        "email": user_email,
        "email_type": email_template,
        "email_status": _sent
    }

To trigger this task manually, spin up your worker and a local Redis, then run a one-line Python command from the project directory:

# window 1
docker run -p 6379:6379 redis:alpine

# window 2
celery --app=runner worker --loglevel=info

# window 3
python3 -c "import runner; runner.send_email_to_user.delay('a@b.c', 'register', {'a': 'b'})"

You can see the result in the worker's output:

[Screenshot: triggered task result in the worker log]
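
Note that delay() is just a shortcut for apply_async(), which accepts additional execution options. For instance (the 'emails' queue below is hypothetical and would need a worker consuming it):

runner.send_email_to_user.apply_async(
    args=('a@b.c', 'register', {'a': 'b'}),
    countdown=10,   # Execute no earlier than 10 seconds from now
    queue='emails', # Route the task to a dedicated queue
)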

Conclusion

By using task queues, you can benefit from concurrent/parallel computation, microservices with a lighter workload, and resilient workers.

Now, can you run scheduled tasks and trigger other tasks manually at the same time, while monitoring your workers? You should definitely try this out. It will give you a basic understanding of how a task queue mechanism works.
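
For the monitoring part, one common option (my suggestion; this setup does not require it) is Flower, a web-based monitoring tool for Celery:

pip install flower
celery --app=runner flower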

Best Practices for Task Queues

  • Set time limits on tasks, e.g. via CELERYD_TASK_SOFT_TIME_LIMIT (task_soft_time_limit in newer Celery settings) or soft_time_limit per task, as we did above.
  • Use as few arguments as possible. Stateless tasks are better.
  • Don’t trust your broker for security. You have to take security seriously if your tasks' data contains secrets.
  • Be aware of your connection pool limits.
  • Different brokers and backends need different settings; no two behave exactly alike.
  • Exponential retry intervals are better than linear ones (see the sketch after this list).
  • Categorize and prioritize your queues.
  • Monitor your workers and log properly.
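
As a sketch of the exponential-retry point above (the options are real Celery task settings, but catching bare Exception is deliberately broad for brevity), our email task could be declared with backoff like this:

@app.task(
    name='send-email-to-user',
    autoretry_for=(Exception,), # Retry on any error; narrow this in real code
    retry_backoff=True,         # Exponential delays: 1s, 2s, 4s, ...
    retry_backoff_max=600,      # Cap the delay at 10 minutes
    retry_jitter=True,          # Randomize delays to avoid thundering herds
    max_retries=5,
    soft_time_limit=30
)
def send_email_to_user(user_email: str, email_template: str, extra_info: dict):
    ...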
