diff --git a/colab/plugins/data/tasks.py b/colab/plugins/data/tasks.py index d9a7626..79b8d72 100644 --- a/colab/plugins/data/tasks.py +++ b/colab/plugins/data/tasks.py @@ -3,6 +3,8 @@ import importlib import logging +import redis + from django.conf import settings from colab.celery import app @@ -11,6 +13,20 @@ LOGGER = logging.getLogger('colab.plugins.data') TASKS = set() +def lock(method, name): + def wrapped_method(self, *args, **kwargs): + lock_id = 'colab-data-importer-{}'.format(name) + lock = redis.Redis().lock(lock_id) + + if lock.acquire(blocking=False): + try: + return method(*args, **kwargs) + finally: + lock.release() + + return wrapped_method + + def register_tasks(): global TASKS @@ -31,7 +47,8 @@ def register_tasks(): continue instance = item() task_name = '{}.{}'.format(module.__name__, item_name) - task = app.task(name=task_name, bind=True)(instance.fetch_data) + thread_safe_method = lock(instance.fetch_data, task_name) + task = app.task(name=task_name, bind=True)(thread_safe_method) TASKS.add(task) LOGGER.debug('Registered task: %s', task_name) -- libgit2 0.21.2