Перейти к содержанию

Celery, блокировка периодических задач.

·164 слова·1 минута

Проблема #

Представьте что у вас есть задача, выполняющаяся недетерминированное время, в таком случае при запуске новой задачи необходимо дождаться выполнения предыдущей, дабы предотвратить утечки памяти, а также состояние гонки.

Решение #

Для celery есть специальный модуль - celery-once, позволяющий блокировать задачи так, чтобы таск выполнял только 1 воркер за промежуток времени. Однако, лично мне, не удалось настроить его на периодические задачи. Для блокировки периодических задач был использован следующий декоратор (честно взятый со stack overflow)

import functools

from django.core.cache import cache


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func()
                finally:
                    release_lock()
        return wrapper
    return task_exc

Далее мы можем использовать декоратор для необходимы нам задач:

@periodic_task(bind=True, run_every=timedelta(seconds=5), )
@single_instance_task(5*60)
def send_verify_email():
    print(current_task, 'started')
    sleep(randint(1, 15))
    print(current_task, 'ended')

Примечание #

Стоит отметить, что для cache нельзя использовать дефолтную память Django (LocMemCache), т.к. у каждого воркера она своя. Я использовал redis.

Для фласка и других фреймворков можно использовать свой модуль кеширования.