test_dashboard_task_task_run.py
5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- coding: utf8 -*-
# This file is part of PyBossa.
#
# Copyright (C) 2015 SciFabric LTD.
#
# PyBossa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyBossa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PyBossa. If not, see <http://www.gnu.org/licenses/>.
from pybossa.dashboard.jobs import new_tasks_week, new_task_runs_week
from pybossa.dashboard.data import format_new_task_runs, format_new_tasks
from pybossa.core import db
from datetime import datetime, timedelta
from factories.taskrun_factory import TaskRunFactory, AnonymousTaskRunFactory
from factories.task_factory import TaskFactory
from default import Test, with_context
from mock import patch, MagicMock
class TestDashBoardNewTask(Test):
    """Tests for the weekly new-tasks dashboard job and its formatter."""

    @with_context
    @patch('pybossa.dashboard.jobs.db')
    def test_materialized_view_refreshed(self, db_mock):
        """Test JOB dashboard materialized view is refreshed.

        The patched slave session reports a single row whose ``exists``
        attribute is True; the job must then execute on the main session
        and report a refresh.
        """
        db_mock.slave_session.execute.return_value = [MagicMock(exists=True)]

        outcome = new_tasks_week()

        assert db_mock.session.execute.called
        assert outcome == 'Materialized view refreshed'

    @with_context
    @patch('pybossa.dashboard.jobs.db')
    def test_materialized_view_created(self, db_mock):
        """Test JOB dashboard materialized view is created.

        The patched slave session reports a single row whose ``exists``
        attribute is False; the job must then commit on the main session
        and report a creation.
        """
        db_mock.slave_session.execute.return_value = [MagicMock(exists=False)]

        outcome = new_tasks_week()

        assert db_mock.session.commit.called
        assert outcome == 'Materialized view created'

    @with_context
    def test_new_tasks(self):
        """Test JOB dashboard returns new task.

        One task is created, the job is run, and the first row of
        ``dashboard_week_new_task`` must report one task for its day.
        """
        TaskFactory.create()
        new_tasks_week()

        query = "select * from dashboard_week_new_task;"
        rows = db.session.execute(query).fetchall()

        assert rows[0].day_tasks == 1, rows[0].day_tasks

    @with_context
    @patch('pybossa.dashboard.data.db')
    def test_format_new_tasks_emtpy(self, db_mock):
        """Test format new tasks empty works.

        With the formatter's slave session returning no rows, the result
        must hold a single label (today's UTC date) and a single series
        whose first value is 0.
        """
        db_mock.slave_session.execute.return_value = []
        new_tasks_week()

        formatted = format_new_tasks()

        labels = formatted['labels']
        assert len(labels) == 1
        today = datetime.utcnow().strftime('%Y-%m-%d')
        assert labels[0] == today

        series = formatted['series']
        assert len(series) == 1
        assert series[0][0] == 0, series[0][0]

    @with_context
    def test_format_new_tasks(self):
        """Test format new tasks works.

        After creating one task and running the job, the result must hold
        a single label (today's UTC date) and a single series whose first
        value is 1.
        """
        TaskFactory.create()
        new_tasks_week()

        formatted = format_new_tasks()

        labels = formatted['labels']
        assert len(labels) == 1
        today = datetime.utcnow().strftime('%Y-%m-%d')
        assert labels[0] == today

        series = formatted['series']
        assert len(series) == 1
        assert series[0][0] == 1, series[0][0]
class TestDashBoardNewTaskRuns(Test):
    """Tests for the weekly new-task-runs dashboard job and its formatter."""

    @with_context
    @patch('pybossa.dashboard.jobs.db')
    def test_materialized_view_refreshed(self, db_mock):
        """Test JOB dashboard materialized view is refreshed.

        The patched slave session reports a single row whose ``exists``
        attribute is True; the job must then execute on the main session
        and report a refresh.
        """
        db_mock.slave_session.execute.return_value = [MagicMock(exists=True)]

        outcome = new_task_runs_week()

        assert db_mock.session.execute.called
        assert outcome == 'Materialized view refreshed'

    @with_context
    @patch('pybossa.dashboard.jobs.db')
    def test_materialized_view_created(self, db_mock):
        """Test JOB dashboard materialized view is created.

        The patched slave session reports a single row whose ``exists``
        attribute is False; the job must then commit on the main session
        and report a creation.
        """
        db_mock.slave_session.execute.return_value = [MagicMock(exists=False)]

        outcome = new_task_runs_week()

        assert db_mock.session.commit.called
        assert outcome == 'Materialized view created'

    @with_context
    def test_new_task_runs(self):
        """Test JOB dashboard returns new task runs.

        Two task runs are created, finished two days ago and one day ago
        (UTC); after running the job, the first row of
        ``dashboard_week_new_task_run`` must report one task run.
        """
        # Oldest first, so creation order matches the expected day order.
        for days_ago in (2, 1):
            finished = datetime.utcnow() - timedelta(days=days_ago)
            TaskRunFactory.create(finish_time=finished.isoformat())

        new_task_runs_week()

        query = "select * from dashboard_week_new_task_run;"
        rows = db.session.execute(query).fetchall()

        assert rows[0].day_task_runs == 1, rows[0].day_task_runs

    @with_context
    @patch('pybossa.dashboard.data.db')
    def test_format_new_task_runs_emtpy(self, db_mock):
        """Test format new task_runs empty works.

        With the formatter's slave session returning no rows, the result
        must hold a single label (today's UTC date) and a single series
        whose first value is 0.
        """
        db_mock.slave_session.execute.return_value = []
        new_task_runs_week()

        formatted = format_new_task_runs()

        labels = formatted['labels']
        assert len(labels) == 1
        today = datetime.utcnow().strftime('%Y-%m-%d')
        assert labels[0] == today, formatted

        series = formatted['series']
        assert len(series) == 1
        assert series[0][0] == 0, series[0][0]

    @with_context
    def test_format_new_task_runs(self):
        """Test format new task_runs works.

        One task run from each of ``TaskRunFactory`` and
        ``AnonymousTaskRunFactory`` is created; after running the job, the
        result must hold a single label (today's UTC date) and a single
        series whose first value is 2.
        """
        TaskRunFactory.create()
        AnonymousTaskRunFactory.create()
        new_task_runs_week()

        formatted = format_new_task_runs()

        labels = formatted['labels']
        assert len(labels) == 1
        today = datetime.utcnow().strftime('%Y-%m-%d')
        assert labels[0] == today, formatted

        series = formatted['series']
        assert len(series) == 1
        assert series[0][0] == 2, series[0][0]