test_result_repository.py
6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# -*- coding: utf8 -*-
# This file is part of PyBossa.
#
# Copyright (C) 2015 SciFabric LTD.
#
# PyBossa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyBossa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PyBossa. If not, see <http://www.gnu.org/licenses/>.
# Unit tests for pybossa.repositories.ResultRepository.
from default import Test, db
from factories import TaskFactory, TaskRunFactory
from pybossa.repositories import ResultRepository
from pybossa.core import task_repo, result_repo
from nose.tools import assert_raises
from pybossa.exc import WrongObjectError, DBIntegrityError
class TestResultRepository(Test):
    """Tests for ``ResultRepository``: ``get``, ``get_by``, ``filter_by``
    and ``update``.

    The tests create tasks and task runs through the factories and then
    query the repository with ``project_id=1``, i.e. they rely on the
    factory-created task belonging to the project with id 1.
    """

    def setUp(self):
        """Run the base fixture set-up and build a fresh repository on ``db``."""
        super(TestResultRepository, self).setUp()
        self.result_repo = ResultRepository(db)

    def create_result(self, n_answers=1, filter_by=False):
        """Create a task plus one task run and fetch the result of project 1.

        :param n_answers: redundancy (``n_answers``) given to the new task.
        :param filter_by: when true, query with ``filter_by`` (the tests
            treat its return value as a sequence); otherwise query with
            ``get_by`` (the tests treat its return value as a single
            result).
        :returns: whatever the selected repository query returns for
            ``project_id=1``.
        """
        task = TaskFactory.create(n_answers=n_answers)
        TaskRunFactory.create(task=task)
        if filter_by:
            return self.result_repo.filter_by(project_id=1)
        return self.result_repo.get_by(project_id=1)

    def _assert_result_matches(self, result, task, task_run, n_answers):
        """Assert ``result`` is in project 1, points at ``task`` and holds
        ``n_answers`` task run ids, each of them equal to ``task_run.id``."""
        err_msg = "There should be a result"
        assert result.project_id == 1, err_msg
        assert result.task_id == task.id, err_msg
        assert len(result.task_run_ids) == n_answers, err_msg
        err_msg = "The task_run id is missing in the results array"
        for tr_id in result.task_run_ids:
            assert tr_id == task_run.id, err_msg

    def test_get_return_none_if_no_result(self):
        """Test get method returns None if there is no result with the
        specified id"""
        result = self.result_repo.get(2)
        assert result is None, result

    def test_get_returns_result(self):
        """Test filter_by method returns the result once the task has all
        its answers"""
        n_answers = 1
        task = TaskFactory.create(n_answers=n_answers)
        task_run = TaskRunFactory.create(task=task)

        results = self.result_repo.filter_by(project_id=1)

        err_msg = "There should be a result"
        assert len(results) == 1, err_msg
        self._assert_result_matches(results[0], task, task_run, n_answers)

    def test_get_by_returns_result(self):
        """Test get_by method returns a result if exists"""
        n_answers = 1
        task = TaskFactory.create(n_answers=n_answers)
        task_run = TaskRunFactory.create(task=task)

        result = self.result_repo.get_by(project_id=1)

        self._assert_result_matches(result, task, task_run, n_answers)

    def test_get_returns_result_after_increasig_redundancy(self):
        """Test filter_by method returns the results after increasing
        redundancy"""
        n_answers = 1
        task = TaskFactory.create(n_answers=n_answers)
        task_run = TaskRunFactory.create(task=task)

        results = self.result_repo.filter_by(project_id=1)
        err_msg = "There should be a result"
        assert len(results) == 1, err_msg
        self._assert_result_matches(results[0], task, task_run, n_answers)

        # Increase redundancy. Persist the very object that was fetched and
        # modified (``tmp``) instead of the factory handle ``task``, so the
        # update does not depend on both names aliasing the same instance.
        tmp = task_repo.get_task(task.id)
        tmp.n_answers = 2
        task_repo.update(tmp)

        # Raising n_answers alone must not add or remove results.
        err_msg = "There should be only one result"
        results = result_repo.filter_by(project_id=1)
        assert len(results) == 1, err_msg

        # A second task run brings the task up to the new redundancy.
        task_run_2 = TaskRunFactory.create(task=task)

        # The default query is still expected to return a single result ...
        err_msg = "There should be 1 results"
        results = result_repo.filter_by(project_id=1)
        assert len(results) == 1, err_msg

        # ... while last_version=False is expected to return both of them.
        err_msg = "There should be 2 results"
        results = result_repo.filter_by(project_id=1, last_version=False)
        assert len(results) == 2, err_msg
        assert results[1].project_id == 1, err_msg
        assert results[1].task_id == task.id, err_msg

        err_msg = "First result should have only one task run ID"
        assert len(results[0].task_run_ids) == 1, err_msg
        err_msg = "Second result should have only two task run IDs"
        assert len(results[1].task_run_ids) == 2, err_msg

        err_msg = "The task_run id is missing in the results array"
        expected_ids = [task_run.id, task_run_2.id]
        for tr_id in results[1].task_run_ids:
            assert tr_id in expected_ids, err_msg

    def test_get_returns_no_result(self):
        """Test filter_by method does not return a result if task not
        completed"""
        n_answers = 3
        task = TaskFactory.create(n_answers=n_answers)
        # Only one of the three required answers is submitted.
        TaskRunFactory.create(task=task)

        result = self.result_repo.filter_by(project_id=1)

        err_msg = "There should not be a result"
        assert len(result) == 0, err_msg

    def test_update(self):
        """Test update persists the changes made to the result"""
        result = self.create_result()
        result.info = dict(new='value')

        self.result_repo.update(result)

        updated_result = self.result_repo.get(result.id)
        assert updated_result.info['new'] == 'value', updated_result

    def test_update_fails_if_integrity_error(self):
        """Test update raises a DBIntegrityError if the instance to be updated
        lacks a required value"""
        result = self.create_result()
        result.project_id = None

        assert_raises(DBIntegrityError, self.result_repo.update, result)

    def test_update_only_updates_results(self):
        """Test update raises a WrongObjectError when an object which is not
        a Result instance is updated"""
        bad_object = dict()

        assert_raises(WrongObjectError, self.result_repo.update, bad_object)