Celery, блокировка периодических задач.

Содержание
Проблема #
Представьте что у вас есть задача, выполняющаяся недетерминированное время, в таком случае при запуске новой задачи необходимо дождаться выполнения предыдущей, дабы предотвратить утечки памяти, а также состояние гонки.
Решение #
Для celery есть специальный модуль - celery-once, позволяющий блокировать задачи так, чтобы таск выполнял только 1 воркер за промежуток времени. Однако, лично мне, не удалось настроить его на периодические задачи. Для блокировки периодических задач был использован следующий декоратор (честно взятый со stack overflow)
import functools
from django.core.cache import cache
def single_instance_task(timeout):
def task_exc(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock_id = "celery-single-instance-" + func.__name__
acquire_lock = lambda: cache.add(lock_id, "true", timeout)
release_lock = lambda: cache.delete(lock_id)
if acquire_lock():
try:
func()
finally:
release_lock()
return wrapper
return task_exc
Далее мы можем использовать декоратор для необходимы нам задач:
@periodic_task(bind=True, run_every=timedelta(seconds=5), )
@single_instance_task(5*60)
def send_verify_email():
print(current_task, 'started')
sleep(randint(1, 15))
print(current_task, 'ended')
Примечание #
Стоит отметить, что для cache нельзя использовать дефолтную память Django (LocMemCache), т.к. у каждого воркера она своя. Я использовал redis.
Для фласка и других фреймворков можно использовать свой модуль кеширования.