test_schedule_jobs.py
2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# -*- coding: utf8 -*-
# This file is part of PyBossa.
#
# Copyright (C) 2015 SciFabric LTD.
#
# PyBossa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyBossa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PyBossa. If not, see <http://www.gnu.org/licenses/>.
from pybossa.jobs import schedule_job
from rq_scheduler import Scheduler
from redis import StrictRedis
def a_function():
return
def another_function():
return
a_job = dict(name=a_function, args=[], kwargs={},
interval=1, timeout=180)
another_job = dict(name=another_function, args=[], kwargs={},
interval=1, timeout=180)
class TestSetupScheduledJobs(object):
"""Tests for setup function 'schedule_job'"""
def setUp(self):
self.connection = StrictRedis()
self.connection.flushall()
self.scheduler = Scheduler('test_queue', connection=self.connection)
def test_adds_scheduled_job_with_interval(self):
a_job['interval'] = 7
schedule_job(a_job, self.scheduler)
sched_jobs = self.scheduler.get_jobs()
assert len(sched_jobs) == 1, sched_jobs
assert sched_jobs[0].meta['interval'] == 7 , sched_jobs[0].meta
a_job['interval'] = 1
def test_adds_several_jobs_(self):
schedule_job(a_job, self.scheduler)
schedule_job(another_job, self.scheduler)
sched_jobs = self.scheduler.get_jobs()
job_func_names = [job.func_name for job in sched_jobs]
module_name = 'test_jobs.test_schedule_jobs'
assert len(sched_jobs) == 2, sched_jobs
assert module_name + '.a_function' in job_func_names, job_func_names
assert module_name + '.another_function' in job_func_names, job_func_names
def test_does_not_add_job_if_already_added(self):
schedule_job(a_job, self.scheduler)
schedule_job(a_job, self.scheduler)
sched_jobs = self.scheduler.get_jobs()
assert len(sched_jobs) == 1, sched_jobs
def test_returns_log_messages(self):
success_message = schedule_job(a_job, self.scheduler)
failure_message = schedule_job(a_job, self.scheduler)
assert success_message == 'Scheduled a_function([], {}) to run every 1 seconds'
assert failure_message == 'WARNING: Job a_function([], {}) is already scheduled'
def test_failed_attempt_to_schedule_does_not_polute_redis(self):
schedule_job(a_job, self.scheduler)
schedule_job(a_job, self.scheduler)
stored_values = self.connection.keys('rq:job*')
assert len(stored_values) == 1, len(stored_values)