test_sched_2.py
3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# -*- coding: utf8 -*-
# This file is part of PyBossa.
#
# Copyright (C) 2015 SciFabric LTD.
#
# PyBossa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyBossa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PyBossa. If not, see <http://www.gnu.org/licenses/>.
from helper import sched
from default import with_context
import json
from mock import patch
class TestSched(sched.Helper):
def setUp(self):
super(TestSched, self).setUp()
self.endpoints = ['project', 'task', 'taskrun']
# Tests
@with_context
@patch('pybossa.api.task_run.request')
def test_incremental_tasks(self, mock_request):
""" Test incremental SCHED strategy - second TaskRun receives first given answer"""
self.create_2(sched='incremental')
mock_request.remote_addr = '127.0.0.0'
# Del previous TaskRuns
self.del_task_runs()
# Register
self.register(fullname="John Doe", name="johndoe", password="p4ssw0rd")
self.register(fullname="Marie Doe", name="mariedoe", password="dr0wss4p")
self.signin()
# Get the only task with no runs!
res = self.app.get('api/project/1/newtask')
data = json.loads(res.data)
# Check that we received a clean Task
assert data.get('info'), data
assert not data.get('info').get('last_answer')
# Submit an Answer for the assigned task
tr = dict(project_id=data['project_id'], task_id=data['id'], info={'answer': 'No'})
tr = json.dumps(tr)
self.app.post('/api/taskrun', data=tr)
# No more tasks available for this user!
res = self.app.get('api/project/1/newtask')
data = json.loads(res.data)
assert not data
#### Get the only task now with an answer as Anonimous!
self.signout()
res = self.app.get('api/project/1/newtask')
data = json.loads(res.data)
# Check that we received a Task with answer
assert data.get('info'), data
assert data.get('info').get('last_answer').get('answer') == 'No'
# Submit a second Answer as Anonimous
tr = dict(project_id=data['project_id'], task_id=data['id'],
info={'answer': 'No No'})
tr = json.dumps(tr)
self.app.post('/api/taskrun', data=tr)
#### Get the only task now with an answer as User2!
self.signin(email="mariedoe@example.com", password="dr0wss4p")
res = self.app.get('api/project/1/newtask')
data = json.loads(res.data)
# Check that we received a Task with answer
assert data.get('info'), data
assert data.get('info').get('last_answer').get('answer') == 'No No'