-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Aitor Magán
committed
Sep 24, 2014
1 parent
b7ca82a
commit d033783
Showing
3 changed files
with
55 additions
and
65 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,37 +107,33 @@ def _helper(self, fullname_field=True, mail_field=True): | |
|
||
return helper | ||
|
||
def test_identify_with_no_credentials(self): | ||
def test_get_token_with_no_credentials(self): | ||
oauth2.toolkit = MagicMock() | ||
state = b64encode(json.dumps({'came_from': 'initial-page'})) | ||
oauth2.toolkit.request = make_request(True, 'data.com', 'callback', {'state': state}) | ||
|
||
helper = self._helper() | ||
|
||
with self.assertRaises(MissingCodeError): | ||
helper.identify() | ||
helper.get_token() | ||
|
||
@httpretty.activate | ||
def test_identify(self): | ||
def test_get_token(self): | ||
oauth2.toolkit = MagicMock() | ||
helper = self._helper() | ||
token = OAUTH2TOKEN | ||
httpretty.register_uri(httpretty.POST, helper.token_endpoint, body=json.dumps(token)) | ||
|
||
state = b64encode(json.dumps({'came_from': 'initial-page'})) | ||
oauth2.toolkit.request = make_request(True, 'data.com', 'callback', {'state': state, 'code': 'code'}) | ||
identity = helper.identify() | ||
self.assertIn('oauth2.token', identity) | ||
retrieved_token = helper.get_token() | ||
|
||
for key in token: | ||
self.assertIn(key, identity['oauth2.token']) | ||
self.assertEquals(token[key], identity['oauth2.token'][key]) | ||
|
||
self.assertIn('came_from', identity) | ||
self.assertEquals(identity['came_from'], 'initial-page') | ||
self.assertIn(key, retrieved_token) | ||
self.assertEquals(token[key], retrieved_token[key]) | ||
|
||
@httpretty.activate | ||
def test_identify_insecure(self): | ||
def test_get_token_insecure(self): | ||
oauth2.toolkit = MagicMock() | ||
helper = self._helper() | ||
token = OAUTH2TOKEN | ||
|
@@ -147,10 +143,10 @@ def test_identify_insecure(self): | |
oauth2.toolkit.request = make_request(False, 'data.com', 'callback', {'state': state, 'code': 'code'}) | ||
|
||
with self.assertRaises(InsecureTransportError): | ||
helper.identify() | ||
helper.get_token() | ||
|
||
@httpretty.activate | ||
def test_identify_error(self): | ||
def test_get_token_error(self): | ||
oauth2.toolkit = MagicMock() | ||
helper = self._helper() | ||
token = { | ||
|
@@ -163,19 +159,20 @@ def test_identify_error(self): | |
oauth2.toolkit.request = make_request(True, 'data.com', 'callback', {'state': state, 'code': 'code'}) | ||
|
||
with self.assertRaises(MissingTokenError): | ||
helper.identify() | ||
helper.get_token() | ||
|
||
@parameterized.expand([ | ||
({},), | ||
([('Set-Cookie', 'cookie1="cookie1val"; Path=/')],), | ||
([('Set-Cookie', 'cookie1="cookie1val"; Path=/'), ('Set-Cookie', 'cookie12="cookie2val"; Path=/')],) | ||
]) | ||
def test_remember(self, headers): | ||
user_name = 'user_name' | ||
|
||
# Configure the mocks | ||
oauth2.toolkit = MagicMock() | ||
environ = MagicMock() | ||
plugins = MagicMock() | ||
identity = MagicMock() | ||
authenticator = MagicMock() | ||
authenticator.remember = MagicMock(return_value=headers) | ||
|
||
|
@@ -185,10 +182,10 @@ def test_remember(self, headers): | |
|
||
# Call the function | ||
helper = self._helper() | ||
helper.remember(identity) | ||
helper.remember(user_name) | ||
|
||
# Check that the remember method has been called properly | ||
authenticator.remember.assert_called_once_with(environ, identity) | ||
authenticator.remember.assert_called_once_with(environ, {'repoze.who.userid': user_name}) | ||
|
||
for header, value in headers: | ||
oauth2.toolkit.response.headers.add.assert_any_call(header, value) | ||
|
@@ -245,11 +242,10 @@ def test_challenge(self, include_referer=True, referer='/', expected_referer='/d | |
('test_user', 'Test User Full Name', None, True, True, False), | ||
('test_user', 'Test User Full Name', '[email protected]', True, False, False), | ||
('test_user', None, None, True, False, False) | ||
]) | ||
@httpretty.activate | ||
def test_authenticate(self, username, fullname=None, email=None, user_exists=True, | ||
fullname_field=True, email_field=True): | ||
def test_identify(self, username, fullname=None, email=None, user_exists=True, | ||
fullname_field=True, email_field=True): | ||
|
||
self.helper = helper = self._helper(fullname_field, email_field) | ||
|
||
|
@@ -278,11 +274,11 @@ def test_authenticate(self, username, fullname=None, email=None, user_exists=Tru | |
oauth2.model.User = MagicMock(return_value=user) | ||
oauth2.model.User.by_name = MagicMock(return_value=user if user_exists else None) | ||
|
||
identity = {} | ||
identity['oauth2.token'] = OAUTH2TOKEN | ||
|
||
# Call the function | ||
helper.authenticate(identity) | ||
returned_username = helper.identify(OAUTH2TOKEN) | ||
|
||
# The function must return the user name | ||
self.assertEquals(username, returned_username) | ||
|
||
# Asserts | ||
oauth2.model.User.by_name.assert_called_once_with(username) | ||
|
@@ -309,26 +305,21 @@ def test_authenticate(self, username, fullname=None, email=None, user_exists=Tru | |
oauth2.model.Session.commit.assert_called_once() | ||
oauth2.model.Session.remove.assert_called_once() | ||
|
||
# The identity object should contain the user name | ||
self.assertIn('repoze.who.userid', identity) | ||
self.assertEquals(username, identity['repoze.who.userid']) | ||
|
||
@parameterized.expand([ | ||
({'error': 'invalid_token', 'error_description': 'Error Description'},), | ||
({'error': 'another_error'},) | ||
]) | ||
@httpretty.activate | ||
def test_authenticate_invalid_token(self, user_info): | ||
def test_identify_invalid_token(self, user_info): | ||
|
||
helper = self._helper() | ||
identity = {} | ||
identity['oauth2.token'] = {'access_token': 'OAUTH_TOKEN'} | ||
token = {'access_token': 'OAUTH_TOKEN'} | ||
|
||
httpretty.register_uri(httpretty.GET, helper.profile_api_url, status=401, body=json.dumps(user_info)) | ||
|
||
exception_risen = False | ||
try: | ||
helper.authenticate(identity) | ||
helper.identify(token) | ||
except Exception as e: | ||
if user_info['error'] == 'invalid_token': | ||
self.assertIsInstance(e, ValueError) | ||
|
@@ -337,16 +328,12 @@ def test_authenticate_invalid_token(self, user_info): | |
|
||
self.assertTrue(exception_risen) | ||
|
||
def test_authenticate_no_token(self): | ||
with self.assertRaises(ValueError): | ||
self._helper().authenticate({}) | ||
|
||
def test_get_token_non_existing_user(self): | ||
def test_get_stored_token_non_existing_user(self): | ||
helper = self._helper() | ||
oauth2.db.UserToken.by_user_name = MagicMock(return_value=None) | ||
self.assertIsNone(helper.get_token('user')) | ||
self.assertIsNone(helper.get_stored_token('user')) | ||
|
||
def test_get_token_existing_user(self): | ||
def test_get_stored_token_existing_user(self): | ||
helper = self._helper() | ||
|
||
usertoken = MagicMock() | ||
|
@@ -356,20 +343,23 @@ def test_get_token_existing_user(self): | |
usertoken.refresh_token = OAUTH2TOKEN['refresh_token'] | ||
|
||
oauth2.db.UserToken.by_user_name = MagicMock(return_value=usertoken) | ||
self.assertEquals(OAUTH2TOKEN, helper.get_token('user')) | ||
self.assertEquals(OAUTH2TOKEN, helper.get_stored_token('user')) | ||
|
||
@parameterized.expand([ | ||
({'came_from': 'http://localhost/dataset'}, ), | ||
({},) | ||
]) | ||
def test_redirect_from_callback(self, identity): | ||
oauth2.toolkit = MagicMock() | ||
came_from = 'initial-page' | ||
state = b64encode(json.dumps({'came_from': came_from})) | ||
oauth2.toolkit.request = make_request(True, 'data.com', 'callback', {'state': state, 'code': 'code'}) | ||
|
||
helper = self._helper() | ||
helper.redirect_from_callback(identity) | ||
helper.redirect_from_callback() | ||
|
||
expected_url = oauth2.INITIAL_PAGE if 'came_from' not in identity else identity['came_from'] | ||
self.assertEquals(302, oauth2.toolkit.response.status) | ||
self.assertEquals(expected_url, oauth2.toolkit.response.location) | ||
self.assertEquals(came_from, oauth2.toolkit.response.location) | ||
|
||
@parameterized.expand([ | ||
(True,), | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters