Commit 8a883b80770cb39d541e30e6436da3d6f8314126
1 parent
34773388
Exists in
master
and in
29 other branches
Implemented locking to allow only one importing task at time
Signed-off-by: Sergio Oliveira <sergio@tracy.com.br>
Showing
1 changed file
with
18 additions
and
1 deletions
Show diff stats
colab/plugins/data/tasks.py
@@ -3,6 +3,8 @@ | @@ -3,6 +3,8 @@ | ||
3 | import importlib | 3 | import importlib |
4 | import logging | 4 | import logging |
5 | 5 | ||
6 | +import redis | ||
7 | + | ||
6 | from django.conf import settings | 8 | from django.conf import settings |
7 | 9 | ||
8 | from colab.celery import app | 10 | from colab.celery import app |
@@ -11,6 +13,20 @@ LOGGER = logging.getLogger('colab.plugins.data') | @@ -11,6 +13,20 @@ LOGGER = logging.getLogger('colab.plugins.data') | ||
11 | TASKS = set() | 13 | TASKS = set() |
12 | 14 | ||
13 | 15 | ||
16 | +def lock(method, name): | ||
17 | + def wrapped_method(self, *args, **kwargs): | ||
18 | + lock_id = 'colab-data-importer-{}'.format(name) | ||
19 | + lock = redis.Redis().lock(lock_id) | ||
20 | + | ||
21 | + if lock.acquire(blocking=False): | ||
22 | + try: | ||
23 | + return method(*args, **kwargs) | ||
24 | + finally: | ||
25 | + lock.release() | ||
26 | + | ||
27 | + return wrapped_method | ||
28 | + | ||
29 | + | ||
14 | def register_tasks(): | 30 | def register_tasks(): |
15 | 31 | ||
16 | global TASKS | 32 | global TASKS |
@@ -31,7 +47,8 @@ def register_tasks(): | @@ -31,7 +47,8 @@ def register_tasks(): | ||
31 | continue | 47 | continue |
32 | instance = item() | 48 | instance = item() |
33 | task_name = '{}.{}'.format(module.__name__, item_name) | 49 | task_name = '{}.{}'.format(module.__name__, item_name) |
34 | - task = app.task(name=task_name, bind=True)(instance.fetch_data) | 50 | + thread_safe_method = lock(instance.fetch_data, task_name) |
51 | + task = app.task(name=task_name, bind=True)(thread_safe_method) | ||
35 | TASKS.add(task) | 52 | TASKS.add(task) |
36 | LOGGER.debug('Registered task: %s', task_name) | 53 | LOGGER.debug('Registered task: %s', task_name) |
37 | 54 |