# -*- coding: utf8 -*-
# This file is part of PyBossa.
#
# Copyright (C) 2015 SciFabric LTD.
#
# PyBossa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyBossa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PyBossa. If not, see .
from mock import patch, Mock
from nose.tools import assert_raises
from twitter import TwitterHTTPError
from pybossa.importers import BulkImportException
from pybossa.importers.twitterapi import BulkTaskTwitterImport
def create_importer_with_form_data(**form_data):
with patch('pybossa.importers.twitterapi.oauth2_dance'):
form_data['consumer_key'] = 'consumer_key'
form_data['consumer_secret'] = 'consumer_secret'
importer = BulkTaskTwitterImport(**form_data)
importer.client.api = Mock()
return importer
class TestBulkTaskTwitterImportSearch(object):
def create_status(_id):
return {
u'created_at': 'created',
u'favorite_count': 77,
u'coordinates': 'coords',
u'id_str': unicode(_id),
u'id': _id,
u'retweet_count': 44,
u'user': {'screen_name': 'fulanito'},
u'text': 'this is a tweet #match'
}
no_results = {
u'statuses': []
}
one_status = {
u'statuses': [
create_status(0)
]
}
five_statuses = {
u'statuses': [create_status(i+1) for i in range(5)]
}
def test_count_tasks_returns_number_of_tweets_requested(self):
max_tweets = 10
form_data = {'source': '#match', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets.return_value = self.one_status
number_of_tasks = importer.count_tasks()
assert number_of_tasks == number_of_tasks, number_of_tasks
def test_tasks_return_task_dict_with_info_from_query_result(self):
form_data = {'source': '#match', 'max_tweets': 1}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets.return_value = self.one_status
expected_task_data = self.one_status['statuses'][0]
tasks = importer.tasks()
assert len(tasks) == 1, tasks
info = tasks[0]['info']
assert info['created_at'] == expected_task_data['created_at']
assert info['favorite_count'] == expected_task_data['favorite_count']
assert info['coordinates'] == expected_task_data['coordinates']
assert info['id'] == expected_task_data['id']
assert info['retweet_count'] == expected_task_data['retweet_count']
assert info['user_screen_name'] == expected_task_data['user']['screen_name']
assert info['user'] == expected_task_data['user']
assert info['text'] == expected_task_data['text']
def test_tasks_can_return_more_than_returned_by_single_api_call(self):
responses = [self.no_results, self.one_status, self.five_statuses]
def multiple_responses(*args, **kwargs):
return responses.pop()
max_tweets = 10
form_data = {
'source': '#hashtag',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = multiple_responses
tasks = importer.tasks()
assert len(tasks) == 6, len(tasks)
def test_tasks_does_not_return_more_than_requested_even_if_api_do(self):
max_tweets = 2
form_data = {'source': '#match', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets.return_value = self.five_statuses
tasks = importer.tasks()
assert len(tasks) == max_tweets, len(tasks)
def test_api_calls_with_max_id_pagination(self):
responses = [self.no_results, self.one_status, self.five_statuses]
calls = []
def multiple_responses(*args, **kwargs):
calls.append({'args': args, 'kwargs': kwargs})
return responses.pop()
max_tweets = 6
form_data = {
'source': '#hashtag',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = multiple_responses
tasks = importer.tasks()
assert calls[0]['kwargs']['count'] == 6, calls[0]['kwargs']
assert calls[0]['kwargs']['q'] == form_data['source'] + '-filter:retweets', calls[0]['kwargs']
assert calls[1]['kwargs']['count'] == 1, calls[1]['kwargs']
assert calls[1]['kwargs']['max_id'] == 0, calls[1]['kwargs']
assert calls[2]['kwargs']['count'] == 0, calls[2]['kwargs']
assert calls[2]['kwargs']['max_id'] == -1, calls[2]['kwargs']
def test_max_tweets_gets_a_default_value_of_200(self):
calls = []
def response(*args, **kwargs):
calls.append({'args': args, 'kwargs': kwargs})
return self.five_statuses
form_data = {'source': '#match'}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = response
tasks = importer.tasks()
assert calls[0]['kwargs']['count'] == 200, calls[0]['kwargs']['count']
@patch('pybossa.importers.twitterapi.OAuth')
@patch('pybossa.importers.twitterapi.OAuth2')
def test_user_credentials_are_used_when_provided(self, oauth2, oauth):
form_data = {
'source': '#hashtag',
'max_tweets': 500,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
oauth.assert_called_with('token', 'secret', 'consumer_key', 'consumer_secret')
oauth2.assert_not_called()
@patch('pybossa.importers.twitterapi.OAuth')
@patch('pybossa.importers.twitterapi.OAuth2')
def test_app_credentials_are_used_when_no_user_ones_provided(self, oauth2, oauth):
form_data = {'source': '#hashtag'}
importer = create_importer_with_form_data(**form_data)
oauth.assert_not_called()
assert oauth2.called
def test_only_one_api_call_is_made_when_using_app_credentials(self):
responses = [self.no_results, self.five_statuses]
api_calls = []
def multiple_responses(*args, **kwargs):
api_calls.append({'args': args, 'kwargs': kwargs})
return responses.pop()
max_tweets = 10
form_data = {'source': '#match', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = multiple_responses
tasks = importer.tasks()
assert len(api_calls) == 1, api_calls
def test_tasks_raises_exception_on_twitter_client_error(self):
def response(*args, **kwargs):
class HTTPError(object):
code = 401
headers = {}
fp = Mock()
fp.read.return_value = []
raise TwitterHTTPError(HTTPError, "api.twitter.com", None, None)
max_tweets = 10
form_data = {'source': '#match', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = response
assert_raises(BulkImportException, importer.tasks)
def test_tasks_raises_exception_on_rate_limit_error(self):
def response(*args, **kwargs):
class HTTPError(object):
code = 429
headers = {}
fp = Mock()
fp.read.return_value = []
raise TwitterHTTPError(HTTPError, "api.twitter.com", None, None)
max_tweets = 10
form_data = {'source': '#match', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = response
assert_raises(BulkImportException, importer.tasks)
try:
importer.tasks()
except BulkImportException as e:
assert e.message == "Rate limit for Twitter API reached. Please, try again in 15 minutes.", e.message
def test_metadata_is_used_for_twitter_api_call_if_present(self):
form_data = {
'source': '#hashtag',
'max_tweets': 500,
'last_import_meta': {'last_id': 3},
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets.return_value = {'statuses': []}
tasks = importer.tasks()
importer.client.api.search.tweets.assert_called_with(
count=500,
q='#hashtag-filter:retweets',
since_id=3)
def test_import_metadata_returns_None_before_fetching_tasks(self):
responses = [self.no_results, self.five_statuses]
def multiple_responses(*args, **kwargs):
return responses.pop()
max_tweets = 10
form_data = {
'source': '#hashtag',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = multiple_responses
assert importer.import_metadata() == None, importer.import_metadata()
def test_import_metadata_returns_greatest_id_of_imported_tweets(self):
responses = [self.no_results, self.five_statuses]
def multiple_responses(*args, **kwargs):
return responses.pop()
max_tweets = 10
form_data = {
'source': '#hashtag',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.search.tweets = multiple_responses
expected_metadata = {'last_id': 5}
tasks = importer.tasks()
metadata = importer.import_metadata()
assert metadata == expected_metadata, metadata
class TestBulkTaskTwitterImportFromAccount(object):
def create_status(_id):
return {
u'contributors': None,
u'truncated': False,
u'text': u'Burning news! PyBossa v1.2.1 released! This version gets all new @PyBossa releases in your admin page! https://t.co/WkOXc3YL6s',
u'is_quote_status': False,
u'in_reply_to_status_id': None,
u'id': _id,
u'favorite_count': 0,
u'source': u'TweetDeck',
u'retweeted': False,
u'coordinates': None,
u'entities': {},
u'in_reply_to_screen_name': None,
u'id_str': unicode(_id),
u'retweet_count': 0,
u'in_reply_to_user_id': None,
u'favorited': False,
u'user': {
u'follow_request_sent': False,
u'has_extended_profile': False,
u'profile_use_background_image': True,
u'default_profile_image': False,
u'id': 497181885,
u'profile_background_image_url_https': u'https://abs.twimg.com/images/themes/theme1/bg.png',
u'verified': False,
u'profile_text_color': u'333333',
u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/446669937927389184/vkDC_c3s_normal.png',
u'profile_sidebar_fill_color': u'DDEEF6',
u'entities': {},
u'followers_count': 700,
u'profile_sidebar_border_color': u'C0DEED',
u'id_str': u'497181885',
u'profile_background_color': u'C0DEED',
u'listed_count': 41,
u'is_translation_enabled': False,
u'utc_offset': 3600,
u'statuses_count': 887,
u'description': u'The open source crowdsourcing platform for research built by @Scifabric',
u'friends_count': 731,
u'location': u'Madrid, Spain',
u'profile_link_color': u'EE7147',
u'profile_image_url': u'http://pbs.twimg.com/profile_images/446669937927389184/vkDC_c3s_normal.png',
u'following': True,
u'geo_enabled': True,
u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/497181885/1401885123',
u'profile_background_image_url': u'http://abs.twimg.com/images/themes/theme1/bg.png',
u'screen_name': u'PyBossa',
u'lang': u'en',
u'profile_background_tile': False,
u'favourites_count': 185,
u'name': u'PyBossa',
u'notifications': False,
u'url': u'http://t.co/ASSBcIRZjY',
u'created_at': u'Sun Feb 19 18:17:39 +0000 2012',
u'contributors_enabled': False,
u'time_zone': u'Amsterdam',
u'protected': False,
u'default_profile': False,
u'is_translator': False
},
u'geo': None,
u'in_reply_to_user_id_str': None,
u'possibly_sensitive': False,
u'lang': u'en',
u'created_at': u'Thu Dec 03 15:09:07 +0000 2015',
u'in_reply_to_status_id_str': None,
u'place': None,
u'extended_entities': {}
}
no_results = []
one_status = [create_status(0)]
five_statuses = [create_status(i+1) for i in range(5)]
def test_count_tasks_returns_number_of_tweets_requested(self):
max_tweets = 10
form_data = {'source': '@pybossa', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline.return_value = self.no_results
number_of_tasks = importer.count_tasks()
assert number_of_tasks == number_of_tasks, number_of_tasks
def test_tasks_return_task_dict_with_info_from_query_result(self):
form_data = {'source': '@pybossa', 'max_tweets': 1}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline.return_value = self.one_status
expected_task_data = self.one_status[0]
tasks = importer.tasks()
assert len(tasks) == 1, tasks
info = tasks[0]['info']
assert info['created_at'] == expected_task_data['created_at']
assert info['favorite_count'] == expected_task_data['favorite_count']
assert info['coordinates'] == expected_task_data['coordinates']
assert info['id'] == expected_task_data['id']
assert info['retweet_count'] == expected_task_data['retweet_count']
assert info['user_screen_name'] == expected_task_data['user']['screen_name']
assert info['user'] == expected_task_data['user']
assert info['text'] == expected_task_data['text']
def test_task_can_return_more_than_returned_by_single_api_call(self):
responses = [self.no_results, self.one_status, self.five_statuses]
def multiple_responses(*args, **kwargs):
return responses.pop()
max_tweets = 10
form_data = {
'source': '@pybossa',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline = multiple_responses
tasks = importer.tasks()
assert len(tasks) == 6, len(tasks)
def test_task_does_not_return_more_than_requested_even_if_api_do(self):
max_tweets = 2
form_data = {'source': '@pybossa', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline.return_value = self.five_statuses
tasks = importer.tasks()
assert len(tasks) == max_tweets, len(tasks)
def test_api_calls_with_max_id_pagination(self):
responses = [self.no_results, self.one_status, self.five_statuses]
calls = []
def multiple_responses(*args, **kwargs):
calls.append({'args': args, 'kwargs': kwargs})
return responses.pop()
max_tweets = 6
form_data = {
'source': '@pybossa',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline = multiple_responses
tasks = importer.tasks()
assert calls[0]['kwargs']['count'] == 6, calls[0]['kwargs']
assert calls[0]['kwargs'].get('q') is None, calls[0]['kwargs']
assert calls[0]['kwargs']['screen_name'] == form_data['source']
assert calls[1]['kwargs']['count'] == 1, calls[1]['kwargs']
assert calls[1]['kwargs']['max_id'] == 0, calls[1]['kwargs']
assert calls[2]['kwargs']['count'] == 0, calls[2]['kwargs']
assert calls[2]['kwargs']['max_id'] == -1, calls[2]['kwargs']
def test_tasks_raises_exception_on_twitter_client_error(self):
def response(*args, **kwargs):
class HTTPError(object):
code = 401
headers = {}
fp = Mock()
fp.read.return_value = []
raise TwitterHTTPError(HTTPError, "api.twitter.com", None, None)
max_tweets = 10
form_data = {'source': '@pybossa', 'max_tweets': max_tweets}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline = response
assert_raises(BulkImportException, importer.tasks)
def test_if_last_import_meta_is_None_since_id_is_not_passed_to_twitter_client(self):
responses = [self.no_results, self.five_statuses]
calls = []
def multiple_responses(*args, **kwargs):
calls.append({'args': args, 'kwargs': kwargs})
return responses.pop()
max_tweets = 3
form_data = {
'source': '@pybossa',
'max_tweets': max_tweets,
'user_credentials': '{"oauth_token_secret": "secret", "oauth_token": "token"}'
}
importer = create_importer_with_form_data(**form_data)
importer.client.api.statuses.user_timeline = multiple_responses
tasks = importer.tasks()
assert 'since_id' not in calls[0]['kwargs'].keys(), calls[0]['kwargs']