PK ‘(H}!¨§¢¢ssoclient/__init__.py# Copyright 2015 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). __version__ = '2.1.1' PKv…"HL×rŒŒssoclient/tests/__init__.py# Copyright 2013 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). PK~%H'Ø÷\þ{þ{ssoclient/tests/test_v2.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). from __future__ import unicode_literals try: str = unicode except NameError: pass # Forward compatibility with Py3k import json import unittest from datetime import datetime from mock import ( MagicMock, patch, ) from requests.models import Response from ssoclient.v2 import errors from ssoclient.v2.http import ( ERRORS, ApiSession, V2ApiClientResponse, process_response, ) from ssoclient.v2.client import V2ApiClient, datetime_from_string TEST_ENDPOINT = 'http://foo.com/' class DateTimeFromStringTestCase(unittest.TestCase): """Test for the datetime parser.""" def test_parses_with_millis(self): result = datetime_from_string('2013-09-07T00:44:09.33') self.assertEqual(result, datetime(2013, 9, 7, 0, 44, 9, 330000)) def test_parses_no_millis(self): result = datetime_from_string('2013-09-07T00:44:09') self.assertEqual(result, datetime(2013, 9, 7, 0, 44, 9)) class V2ApiClientResponseTestCase(unittest.TestCase): """Tests for the Response object returned on API calls.""" def test_creation(self): content = object() response = V2ApiClientResponse(203, content) self.assertEqual(response.status_code, 203) self.assertIs(response.content, content) def test_always_ok(self): response = V2ApiClientResponse(500, '') self.assertTrue(response.ok) def test_json_is_content(self): content = object() response = V2ApiClientResponse(203, content) self.assertIs(response.json(), content) class ApiSessionTestCase(unittest.TestCase): """Test for the session used during API calls.""" @patch('ssoclient.v2.http.requests.Session.request') def test_api_session_post(self, mock_request): mock_request.return_value = MagicMock(status_code=200) api = ApiSession(TEST_ENDPOINT) api.post('/foo', data=dict(x=1)) mock_request.assert_called_once_with( 'POST', 'http://foo.com/foo', data=u'{"x": 1}', headers={u'Content-Type': u'application/json'}, json=None, ) @patch('ssoclient.v2.http.requests.Session.request') def test_api_session_get(self, mock_request): mock_request.return_value = MagicMock(status_code=200) api = ApiSession(TEST_ENDPOINT) api.get('/foo', params=dict(x=1)) mock_request.assert_called_once_with( 'GET', 'http://foo.com/foo', params={'x': 1}, allow_redirects=True, headers={} ) def test_api_session_retries(self): api = ApiSession(TEST_ENDPOINT) for protocol in ('http://', 'https://'): adapter = api.get_adapter(protocol) # XXX: requests introduces a backwards incompatible change # in version 2.5.0 if isinstance(adapter.max_retries, int): self.assertEqual(adapter.max_retries, 3) else: self.assertEqual(adapter.max_retries.total, 3) def mock_response(status_code=200, content=None, json_dump=True): response = Response() response.status_code = status_code if content is not None and json_dump: content = str(json.dumps(content)).encode('utf-8') response._content = content return response class ProcessResponseTestCase(unittest.TestCase): """Tests for the decorator "process_response".""" @process_response def do_test(self, response): return response def test_error_code_raises_correct_exception(self): for code, exc in ERRORS.items(): response = mock_response(exc.status_code, dict(code=code)) with self.assertRaises(exc): self.do_test(response) def test_400_no_json_content_raises_client_error(self): response = mock_response(status_code=400) with self.assertRaises(errors.ClientError) as e: self.do_test(response) self.assertEqual(e.msg, "No error code in response") def test_500_no_json_content_raises_server_error(self): response = mock_response(status_code=500) with self.assertRaises(errors.ServerError) as e: self.do_test(response) self.assertEqual(e.msg, "No error code in response") def test_400_unknown_code_raises_client_error(self): response = mock_response(400, dict(code="UNKNOWN_CODE")) with self.assertRaises(errors.ClientError) as e: self.do_test(response) self.assertIn(e.msg, "UNKNOWN_CODE") def test_500_unknown_code_raises_server_error(self): response = mock_response(500, dict(code="UNKNOWN_CODE")) with self.assertRaises(errors.ServerError) as e: self.do_test(response) self.assertIn(e.msg, "UNKNOWN_CODE") def test_success_returns(self): expected = dict(foo='bar', zaraza=42) response = mock_response(status_code=200, content=expected) result = self.do_test(response) self.assertEqual(result.status_code, 200) self.assertEqual(result.content, expected) class V2ClientApiTestCase(unittest.TestCase): email = 'foo@foo.com' def setUp(self): super(V2ClientApiTestCase, self).setUp() self.client = V2ApiClient(TEST_ENDPOINT) p = patch('ssoclient.v2.http.requests.Session.request') self.mock_request = p.start() self.addCleanup(p.stop) p = patch('ssoclient.v2.client.OAuth1') self.mock_oauth = p.start() self.addCleanup(p.stop) self.credentials = dict( consumer_key='consumer_key', consumer_secret='consumer_secret', token_key='token_key', token_secret='token_secret', ) def assert_request_called_with(self, *args, **kwargs): # grab parameters independently to ensure json-encoded data is correct mock_args, mock_kwargs = self.mock_request.call_args self.assertEqual(mock_args, args) for k, v in kwargs.items(): actual = mock_kwargs.get(k) if k == 'data': actual = json.loads(actual) self.assertEqual(actual, v) def unparsed_account_details(self, expand=False): """Unparsed account details as how parse_response returns them.""" result = { 'displayname': 'something', 'email': self.email, 'emails': [ {'href': '/api/v2/emails/' + self.email, 'verified': False}, ], 'href': '/api/v2/accounts/FeAQLWE', 'openid': 'FeAQLWE', 'status': 'Active', 'verified': False, 'tokens': [ {'href': '/api/v2/tokens/oauth/EqEnreoOFJPBOIbcKdwuqtCVZdJ', 'token_name': 'api test bis'}, {'href': '/api/v2/tokens/oauth/TGHuvFlSSVrpqUVjpMBwzbEHfij', 'token_name': 'api test'} ], } if expand: result['emails'] = [{ 'email': self.email, 'date_created': '2013-09-12T18:23:51.019', 'href': '/api/v2/emails/' + self.email, 'verified': False, }] result['tokens'] = [ {'token_name': 'api test bis', 'date_updated': '2013-09-12T19:06:11.387', 'token_key': 'EqEnreoOFJPBOIbcKdwuqtCVZdJnh', 'consumer_secret': 'YuPUIQYYoqJHIUmoxvZvhODNwSYrvl', 'href': '/api/v2/tokens/oauth/EqEnreoOFJPBOIbcKdwuqtCVZdJnh', 'date_created': '2013-09-12T19:06:11.387', 'consumer_key': 'FeAQLWE', 'token_secret': 'CnITkuwuhKDBNOocTPICKAsuUZkOcDprgQQhZdXOZE'}, {'token_name': 'api test', 'date_updated': '2013-09-12T18:31:19.468', 'token_key': 'TGHuvFlSSVrpqUVjpMBwzbEHfijTCw', 'consumer_secret': 'YuPUIQYYoqJHIUmoxvZvhODNwSYrvl', 'href': '/api/v2/tokens/oauth/TGHuvFlSSVrpqUVjpMBwzbEHfijTCw', 'date_created': '2013-09-12T18:28:17.816', 'consumer_key': 'FeAQLWE', 'token_secret': 'YcQgfuhpajCphDCnfNNceggfolHsXqdsXWSRufbzqe'}, ] return result def unparsed_email_details(self): """Unparsed email details as how parse_response returns them.""" return { 'date_created': '2013-09-12T18:23:51.019', 'href': '/api/v2/emails/' + self.email, 'verified': False, 'email': 'foo@a.com', } def unparsed_token_details(self): """Unparsed token details as how parse_response returns them.""" return { 'date_created': '2013-09-12T18:28:17.816', 'date_updated': '2013-09-12T18:31:19.468', 'consumer_key': 'FeAQLWE', 'consumer_secret': 'YuPUIQYYoqJHIUmoxvZvhODNwSYrvl', 'href': '/api/v2/tokens/oauth/TGHuvFlSSVrpqUVjpMBwzbEHfijTCwXtRi', 'openid': 'FeAQLWE', 'token_key': 'TGHuvFlSSVrpqUVjpMBwzbEHfijTCwXtRiGca', 'token_name': 'api test', 'token_secret': 'YcQgfuhpajCphDCnfNNceggfolHsXqdsXWSRufbzqeRJvEP', } def assert_unicode_credentials(self, credentials): self.mock_oauth.assert_called_once_with( credentials['consumer_key'], credentials['consumer_secret'], credentials['token_key'], credentials['token_secret'], ) self.assertTrue(all(isinstance(val, str) for val in self.mock_oauth.call_args[0])) class RegisterV2ClientApiTestCase(V2ClientApiTestCase): def assert_invalid_response(self, status_code, ExceptionClass): # Test the client can handle an error response that doesn't have # a json body - ideally our server will never send these response = mock_response( status_code=status_code, content='some error message', json_dump=False) self.mock_request.return_value = response with self.assertRaises(ExceptionClass) as ctx: self.client.register(email='blah') if status_code >= 500: self.assertIn('some error message', str(ctx.exception)) def test_register_invalid_data(self): self.mock_request.return_value = mock_response( 400, dict(code="INVALID_DATA")) with self.assertRaises(errors.InvalidData): self.client.register(email='blah') def test_register_captcha_required(self): self.mock_request.return_value = mock_response( 401, dict(code="CAPTCHA_REQUIRED")) with self.assertRaises(errors.CaptchaRequired): self.client.register(email='blah') def test_register_captcha_failed(self): self.mock_request.return_value = mock_response( 403, dict(code="CAPTCHA_FAILURE")) with self.assertRaises(errors.CaptchaFailure): self.client.register(email='blah') def test_register_captcha_error(self): self.mock_request.return_value = mock_response( 502, dict(code="CAPTCHA_ERROR")) with self.assertRaises(errors.CaptchaError): self.client.register(email='blah') def test_register_already_registered(self): self.mock_request.return_value = mock_response( 409, dict(code="ALREADY_REGISTERED")) with self.assertRaises(errors.AlreadyRegistered): self.client.register(email='blah') def test_register_success(self): content = self.unparsed_account_details() self.mock_request.return_value = mock_response( status_code=201, content=content) response = self.client.register(email='blah') self.assertEqual(response, content) def test_invalid_response_400(self): self.assert_invalid_response(400, errors.ClientError) def test_invalid_response_500(self): self.assert_invalid_response(500, errors.ServerError) class GetOrCreateAccountTestCase(V2ClientApiTestCase): def assert_invalid_response(self, status_code, ExceptionClass): # Test the client can handle an error response that doesn't have # a json body - ideally our server will never send these response = mock_response( status_code=status_code, content='some error message', json_dump=False) self.mock_request.return_value = response with self.assertRaises(ExceptionClass) as ctx: self.client.get_or_create_account( token=self.credentials, email='blah') if status_code >= 500: self.assertIn('some error message', str(ctx.exception)) def test_register_invalid_data(self): self.mock_request.return_value = mock_response( 400, dict(code="INVALID_DATA")) with self.assertRaises(errors.InvalidData): self.client.get_or_create_account( token=self.credentials, email='blah') self.assert_unicode_credentials(self.credentials) def test_register_already_registered(self): content = self.unparsed_account_details() self.mock_request.return_value = mock_response( 200, content=content) response, created = self.client.get_or_create_account( token=self.credentials, email='blah') self.assertFalse(created) self.assertEqual(response, content) self.assert_unicode_credentials(self.credentials) def test_register_success(self): content = self.unparsed_account_details() self.mock_request.return_value = mock_response( status_code=201, content=content) response, created = self.client.get_or_create_account( token=self.credentials, email='blah') self.assertTrue(created) self.assertEqual(response, content) self.assert_unicode_credentials(self.credentials) def test_invalid_response_400(self): self.assert_invalid_response(400, errors.ClientError) self.assert_unicode_credentials(self.credentials) def test_invalid_response_500(self): self.assert_invalid_response(500, errors.ServerError) self.assert_unicode_credentials(self.credentials) class LoginV2ClientApiTestCase(V2ClientApiTestCase): def test_login_invalid_data(self): self.mock_request.return_value = mock_response( 400, dict(code="INVALID_DATA")) with self.assertRaises(errors.InvalidData): self.client.login(email='blah') def test_login_account_suspended(self): self.mock_request.return_value = mock_response( 401, dict(code="ACCOUNT_SUSPENDED")) with self.assertRaises(errors.AccountSuspended): self.client.login(email='blah') def test_login_account_deactivated(self): self.mock_request.return_value = mock_response( 401, dict(code="ACCOUNT_DEACTIVATED")) with self.assertRaises(errors.AccountDeactivated): self.client.login(email='blah') def test_login_invalid_credentials(self): self.mock_request.return_value = mock_response( 401, dict(code="INVALID_CREDENTIALS")) with self.assertRaises(errors.InvalidCredentials): self.client.login(email='blah') def test_login_password_policy_error(self): self.mock_request.return_value = mock_response( 403, dict(code="PASSWORD_POLICY_ERROR")) with self.assertRaises(errors.PasswordPolicyError): self.client.login(email='blah') def test_login_twofactor_required(self): self.mock_request.return_value = mock_response( 401, dict(code="TWOFACTOR_REQUIRED")) with self.assertRaises(errors.TwoFactorRequired): self.client.login(email='blah') def test_login_twofactor_failure(self): self.mock_request.return_value = mock_response( 403, dict(code="TWOFACTOR_FAILURE")) with self.assertRaises(errors.TwoFactorFailure): self.client.login(email='blah') def test_login_account_locked(self): self.mock_request.return_value = mock_response( 403, dict(code="ACCOUNT_LOCKED")) with self.assertRaises(errors.AccountLocked): self.client.login(email='blah') def test_login_email_invalidated(self): self.mock_request.return_value = mock_response( 403, dict(code="EMAIL_INVALIDATED")) with self.assertRaises(errors.EmailInvalidated): self.client.login(email='blah') def test_login_success(self): content = self.unparsed_token_details() self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.login(email='blah', password='ble') expected = content.copy() expected['date_created'] = datetime(2013, 9, 12, 18, 28, 17, 816000) expected['date_updated'] = datetime(2013, 9, 12, 18, 31, 19, 468000) self.assertEqual(response, expected) class PasswordResetV2ClientApiTestCase(V2ClientApiTestCase): def test_request_password_reset(self): content = {'email': self.email} self.mock_request.return_value = mock_response( status_code=201, content=content) response = self.client.request_password_reset(self.email) self.assertEqual(response, content) self.assert_request_called_with( 'POST', 'http://foo.com/tokens/password', headers={'Content-Type': 'application/json'}, data={'token': None, 'email': self.email}) def test_request_password_reset_without_email(self): self.mock_request.return_value = mock_response( 400, dict(code="INVALID_DATA")) with self.assertRaises(errors.InvalidData): self.client.request_password_reset(None) def test_request_password_reset_with_empty_email(self): self.mock_request.return_value = mock_response( 400, dict(code="INVALID_DATA")) with self.assertRaises(errors.InvalidData): self.client.request_password_reset('') def test_request_password_reset_with_token(self): content = {'email': self.email} self.mock_request.return_value = mock_response( status_code=201, content=content) response = self.client.request_password_reset( self.email, 'token1234') self.assertEqual(response, content) self.assert_request_called_with( 'POST', 'http://foo.com/tokens/password', headers={'Content-Type': 'application/json'}, data={'token': 'token1234', 'email': self.email}) def test_request_password_reset_for_suspended_account(self): self.mock_request.return_value = mock_response( 403, dict(code="ACCOUNT_SUSPENDED")) with self.assertRaises(errors.AccountSuspended): self.client.request_password_reset(self.email) def test_request_password_reset_for_deactivated_account(self): self.mock_request.return_value = mock_response( 403, dict(code="ACCOUNT_DEACTIVATED")) with self.assertRaises(errors.AccountDeactivated): self.client.request_password_reset(self.email) def test_request_password_reset_with_invalid_email(self): self.mock_request.return_value = mock_response( 403, dict(code="RESOURCE_NOT_FOUND")) with self.assertRaises(errors.ResourceNotFound): self.client.request_password_reset(self.email) def test_request_password_reset_not_allowed(self): self.mock_request.return_value = mock_response( 403, dict(code="CAN_NOT_RESET_PASSWORD")) with self.assertRaises(errors.CanNotResetPassword): self.client.request_password_reset(self.email) def test_request_password_reset_with_invalidated_email(self): self.mock_request.return_value = mock_response( 403, dict(code="EMAIL_INVALIDATED")) with self.assertRaises(errors.EmailInvalidated): self.client.request_password_reset(self.email) def test_request_password_reset_with_too_many_tokens(self): self.mock_request.return_value = mock_response( 403, dict(code="TOO_MANY_TOKENS")) with self.assertRaises(errors.TooManyTokens): self.client.request_password_reset(self.email) class AccountDetailsV2ClientApiTestCase(V2ClientApiTestCase): def test_account_details(self): content = self.unparsed_account_details() self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.account_details('some_openid', self.credentials) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'GET', 'http://foo.com/accounts/some_openid?expand=false', auth=oauth1, headers={}, allow_redirects=True, ) self.assertEqual(response, content) def test_account_details_expanded(self): content = self.unparsed_account_details(expand=True) self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.account_details( 'some_openid', self.credentials, expand=True) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'GET', 'http://foo.com/accounts/some_openid?expand=true', auth=oauth1, headers={}, allow_redirects=True, ) expected = content.copy() # dates are parsed t = expected['tokens'][0] t['date_updated'] = datetime(2013, 9, 12, 19, 6, 11, 387000) t['date_created'] = datetime(2013, 9, 12, 19, 6, 11, 387000) t = expected['tokens'][1] t['date_updated'] = datetime(2013, 9, 12, 18, 31, 19, 468000) t['date_created'] = datetime(2013, 9, 12, 18, 28, 17, 816000) e = expected['emails'][0] e['date_created'] = datetime(2013, 9, 12, 18, 23, 51, 19000) self.assertEqual(response, expected) def test_account_details_anonymous(self): content = { 'href': '/api/v2/accounts/FeAQLWE', 'openid': 'FeAQLWE', 'verified': False, } self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.account_details('some_openid') self.mock_request.assert_called_once_with( 'GET', 'http://foo.com/accounts/some_openid?expand=false', auth=None, headers={}, allow_redirects=True, ) self.assertEqual(response, content) class EmailsV2ClientApiTestCase(V2ClientApiTestCase): def test_details(self): content = self.unparsed_email_details() self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.email_details('email', self.credentials) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'GET', 'http://foo.com/emails/email', auth=oauth1, headers={}, allow_redirects=True, ) expected = content.copy() expected['date_created'] = datetime(2013, 9, 12, 18, 23, 51, 19000) self.assertEqual(response, expected) def test_details_invalid_credentials(self): self.mock_request.return_value = mock_response( 401, dict(code="INVALID_CREDENTIALS")) with self.assertRaises(errors.InvalidCredentials): self.client.email_details('blah', {}) def test_details_not_found(self): self.mock_request.return_value = mock_response( 404, dict(code="RESOURCE_NOT_FOUND")) with self.assertRaises(errors.ResourceNotFound): self.client.email_details('blah', {}) def test_delete(self): self.mock_request.return_value = mock_response( status_code=204, content='') response = self.client.email_delete('email', self.credentials) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'DELETE', 'http://foo.com/emails/email', auth=oauth1, headers={}, ) self.assertEqual(response, True) def test_delete_invalid_credentials(self): self.mock_request.return_value = mock_response( 401, dict(code="INVALID_CREDENTIALS")) with self.assertRaises(errors.InvalidCredentials): self.client.email_delete('blah', {}) def test_delete_not_found(self): self.mock_request.return_value = mock_response( 404, dict(code="RESOURCE_NOT_FOUND")) with self.assertRaises(errors.ResourceNotFound): self.client.email_delete('blah', {}) class TokensV2ClientApiTestCase(V2ClientApiTestCase): def test_details(self): content = self.unparsed_token_details() self.mock_request.return_value = mock_response( status_code=200, content=content) response = self.client.token_details('token_key', self.credentials) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'GET', 'http://foo.com/tokens/oauth/token_key', auth=oauth1, headers={}, allow_redirects=True, ) expected = content.copy() expected['date_updated'] = datetime(2013, 9, 12, 18, 31, 19, 468000) expected['date_created'] = datetime(2013, 9, 12, 18, 28, 17, 816000) self.assertEqual(response, expected) def test_details_invalid_credentials(self): self.mock_request.return_value = mock_response( 401, dict(code="INVALID_CREDENTIALS")) with self.assertRaises(errors.InvalidCredentials): self.client.token_details('blah', {}) def test_details_not_found(self): self.mock_request.return_value = mock_response( 404, dict(code="RESOURCE_NOT_FOUND")) with self.assertRaises(errors.ResourceNotFound): self.client.token_details('blah', {}) def test_delete(self): self.mock_request.return_value = mock_response( status_code=204, content='') response = self.client.token_delete('token_key', self.credentials) self.assert_unicode_credentials(self.credentials) oauth1 = self.mock_oauth.return_value self.mock_request.assert_called_once_with( 'DELETE', 'http://foo.com/tokens/oauth/token_key', auth=oauth1, headers={}, ) self.assertEqual(response, True) def test_delete_invalid_credentials(self): self.mock_request.return_value = mock_response( 401, dict(code="INVALID_CREDENTIALS")) with self.assertRaises(errors.InvalidCredentials): self.client.token_delete('blah', {}) def test_delete_not_found(self): self.mock_request.return_value = mock_response( 404, dict(code="RESOURCE_NOT_FOUND")) with self.assertRaises(errors.ResourceNotFound): self.client.token_delete('blah', {}) class ValidateRequestV2ClientApiTestCase(V2ClientApiTestCase): http_url = 'http://example.com' def assert_validate_request_called(self, **kwargs): data = dict(http_url=self.http_url, http_method='GET') data.update(kwargs) self.assert_request_called_with( 'POST', TEST_ENDPOINT + 'requests/validate', headers={'Content-Type': 'application/json'}, data=data) def test_valid_request(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) result = self.client.validate_request( http_url=self.http_url, http_method='GET', authorization='123456789') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called(authorization='123456789') def test_invalid_request(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=False)) result = self.client.validate_request( http_url=self.http_url, http_method='GET', authorization='123456789') self.assertEqual(result, {'is_valid': False}) self.assert_validate_request_called(authorization='123456789') def test_missing_authorization(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) result = self.client.validate_request( http_url=self.http_url, http_method='GET', query_string='oauth_signature=12345678&oauth_token=something') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called( query_string='oauth_signature=12345678&oauth_token=something') def test_missing_query_string(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) result = self.client.validate_request( http_url=self.http_url, http_method='GET', authorization='123456789') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called(authorization='123456789') def test_both_defined(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) result = self.client.validate_request( http_url=self.http_url, http_method='GET', authorization='123456789', query_string='oauth_signature=12345678&oauth_token=something') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called( authorization='123456789', query_string='oauth_signature=12345678&oauth_token=something') def test_unicode_url(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) http_url = 'http://localhost/~/test/doc/dÃ¥c-id' result = self.client.validate_request( http_url=http_url, http_method='GET', authorization='something') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called( http_url=http_url, authorization='something') def test_non_ascii_url_utf8_encoded(self): self.mock_request.return_value = mock_response( 200, dict(is_valid=True)) http_url = 'http://localhost/~/test/doc/dÃ¥c-id' result = self.client.validate_request( http_url=http_url.encode('utf-8'), http_method='GET', authorization='something') self.assertEqual(result, {'is_valid': True}) self.assert_validate_request_called( http_url=http_url, authorization='something') def test_non_ascii_url_not_utf8_encoded(self): http_url = 'http://localhost/~/test/doc/dÃ¥c-id'.encode('latin-1') with self.assertRaises(errors.ClientError) as ctx: self.client.validate_request( http_url=http_url, http_method='GET', authorization='foobar') e = ctx.exception self.assertIn(repr(http_url), str(e)) self.assertFalse(self.mock_request.called) PKv…"Hiç+³ssoclient/v2/__init__.py# Copyright 2013 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). from .errors import ( AccountDeactivated, AccountLocked, AccountSuspended, AlreadyRegistered, ApiException, CanNotResetPassword, CaptchaError, CaptchaFailure, CaptchaRequired, ClientError, EmailInvalidated, InvalidCredentials, InvalidData, PasswordPolicyError, TooManyTokens, TwoFactorFailure, ResourceNotFound, ServerError, TooManyRequests, TwoFactorRequired, UnexpectedApiError, ) from .client import V2ApiClient PKv…"H¥F%BŠŠssoclient/v2/client.py# Copyright 2013 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). from __future__ import unicode_literals try: str = unicode except NameError: pass # Forward compatibility with Py3k import logging from datetime import datetime from requests_oauthlib import OAuth1 from ssoclient.v2.http import ApiSession from ssoclient.v2 import errors logger = logging.getLogger(__name__) def datetime_from_string(value): """Parse a string value representing a date in isoformat.""" try: result = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S.%f') except ValueError: result = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S') return result def parse_datetimes(value): """Recursively look for dates and try to parse them into datetimes.""" assert isinstance(value, dict) result = value.copy() for k, v in value.items(): if isinstance(v, dict): result[k] = parse_datetimes(v) elif isinstance(v, list): result[k] = [parse_datetimes(i) for i in v] elif 'date' in k: result[k] = datetime_from_string(v) return result class V2ApiClient(object): """High-level client for theV2.0 API SSO resources.""" def __init__(self, endpoint): self.session = ApiSession(endpoint) def _unicode_credentials(self, credentials): # if openid and credentials come directly from a call to client.login # then whether they are unicode or byte-strings depends on which # json library is in use. # oauthlib requires them to be unicode - so we coerce to be sure. if credentials is not None: consumer_key = str(credentials.get('consumer_key', '')) consumer_secret = str(credentials.get('consumer_secret', '')) token_key = str(credentials.get('token_key', '')) token_secret = str(credentials.get('token_secret', '')) oauth = OAuth1( consumer_key, consumer_secret, token_key, token_secret, ) else: oauth = None return oauth def _merge(self, data, extra): """Allow data to passed to functions by keyword or by dict.""" if data: data.update(extra) else: data = extra return data def register(self, data=None, **kwargs): response = self.session.post( '/accounts', data=self._merge(data, kwargs)) result = parse_datetimes(response.content) return result def login(self, data=None, **kwargs): response = self.session.post( '/tokens/oauth', data=self._merge(data, kwargs)) result = parse_datetimes(response.content) return result def get_or_create_account(self, token, **kwargs): oauth = self._unicode_credentials(token) response = self.session.post( '/accounts', data=kwargs, auth=oauth) result = parse_datetimes(response.content) created = response.status_code == 201 return result, created def account_details(self, openid, token=None, expand=False): openid = str(openid) oauth = self._unicode_credentials(token) url = '/accounts/%s?expand=%s' % (openid, str(expand).lower()) response = self.session.get(url, auth=oauth) result = parse_datetimes(response.content) return result def email_delete(self, email, credentials): oauth = self._unicode_credentials(credentials) response = self.session.delete('/emails/%s' % email, auth=oauth) return response.status_code == 204 def email_details(self, email, credentials): oauth = self._unicode_credentials(credentials) response = self.session.get('/emails/%s' % email, auth=oauth) result = parse_datetimes(response.content) return result def token_delete(self, token_key, credentials): oauth = self._unicode_credentials(credentials) response = self.session.delete( '/tokens/oauth/%s' % token_key, auth=oauth) return response.status_code == 204 def token_details(self, token_key, credentials): oauth = self._unicode_credentials(credentials) response = self.session.get('/tokens/oauth/%s' % token_key, auth=oauth) result = parse_datetimes(response.content) return result def validate_request(self, data=None, **kwargs): """Validate an OAuth signature. The OAuth signature can be given either as the value of the Authorization header, or as a query string with the OAuth information in it. Expected parameters are: * 'http_url' * 'http_method' * 'authorization' and/or 'query_string' (at least one) Return a dictionary with a 'is_valid' field that indicates whether the given OAuth signature is valid or not. """ data = self._merge(data, kwargs) http_url = data.get('http_url', '') if not isinstance(http_url, str): try: data['http_url'] = http_url.decode('utf-8') except UnicodeError: msg = ('Given http_url %r can not be utf-8 decoded' % http_url) logger.error( 'validate_request: %s, raising ClientError.', msg) raise errors.ClientError(msg=msg) response = self.session.post('/requests/validate', data=data) return response.content def request_password_reset(self, email, token=None): response = self.session.post( '/tokens/password', data=dict(email=email, token=token)) return response.content PKv…"HãMÁljjssoclient/v2/http.py# Copyright 2013 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). from __future__ import unicode_literals import functools import json import requests from ssoclient.v2 import errors JSON_MIME_TYPE = 'application/json' ERRORS = { e.error_code: e for e in [ errors.AccountDeactivated, errors.AccountLocked, errors.AlreadyRegistered, errors.AccountSuspended, errors.CanNotResetPassword, errors.CaptchaError, errors.CaptchaFailure, errors.CaptchaRequired, errors.EmailInvalidated, errors.InvalidCredentials, errors.InvalidData, errors.PasswordPolicyError, errors.ResourceNotFound, errors.TooManyRequests, errors.TooManyTokens, errors.TwoFactorFailure, errors.TwoFactorRequired, ] } class V2ApiClientResponse(object): """A successful response from an API call to the v2 SSO API.""" def __init__(self, status_code, content): super(V2ApiClientResponse, self).__init__() self.status_code = status_code self.content = content self.ok = True # backwards compat with requests' Response def json(self): """Return this instance's content, available for backwards compat.""" return self.content def process_response(func): """Decorator to parse 'func' responses and manage errors.""" @functools.wraps(func) def wrapper(*args, **kwargs): """Wrapper for func.""" response = func(*args, **kwargs) try: json_body = response.json() except (ValueError, TypeError): # simplejson raises JSONDecodeError for invalid json # json raises ValueError. JSONDecodeError is a subclass # of ValueError - so this catches either. json_body = {} if response.ok: # return the json-decoded value and the status code return V2ApiClientResponse( status_code=response.status_code, content=json_body) # from now on, just error management code = json_body.get('code') exc = ERRORS.get(code) if exc is not None: # raise a specific exception raise exc(response, json_body) if code: msg = "Unknown error code '%s' in response" % code else: msg = "No error code in response" if response.content: msg += ' (%r)' % response.content if response.status_code < 500: exc = errors.ClientError else: exc = errors.ServerError raise exc(response, msg, json_body) return wrapper class ApiSession(requests.Session): """An SSO api specfic Session Adds support for a url endpoint, 500 exceptions, JSON request body handling and retries. """ def __init__(self, endpoint): super(ApiSession, self).__init__() self.endpoint = endpoint.rstrip('/') + '/' # sent with every request self.headers['Accept'] = JSON_MIME_TYPE for protocol in ('http://', 'https://'): self.mount(protocol, requests.adapters.HTTPAdapter(max_retries=3)) @process_response def request(self, method, url, **kwargs): if 'headers' not in kwargs: kwargs['headers'] = {} if 'data' in kwargs: kwargs['data'] = json.dumps(kwargs['data']) kwargs['headers']['Content-Type'] = JSON_MIME_TYPE url = self.endpoint + url.lstrip('/') response = super(ApiSession, self).request(method, url, **kwargs) return response PKv…"H¸E_j½ ½ ssoclient/v2/errors.py# Copyright 2013 Canonical Ltd. This software is licensed under # the GNU Affero General Public License version 3 (see the file # LICENSE). from __future__ import unicode_literals class UnexpectedApiError(Exception): """An unexpected client error.""" def __init__(self, response=None, msg="", json=None): self.response = response self.json_body = json if response: msg = "%s : %s - %s" % (response.status_code, response.text, msg) super(UnexpectedApiError, self).__init__(msg) class ServerError(UnexpectedApiError): """An unexpected 5xx response.""" class ClientError(UnexpectedApiError): """An unexpected 4xx response.""" class ApiException(Exception): """An expected/understood 4xx or 5xx response. Parse the standard api error reponse format given by SSO. """ def __init__(self, response, body=None, msg=None): body = body or {} self.response = response self.body = body self.error_message = body.get('message') self.extra = body.get('extra', {}) if msg is None: # code *should* be the same as the error_code attribute # but won't be for raising an ApiException directly instead # of a subclass - so still fetch it from the payload body code = body.get('code') msg = "%s: %s" % (response.status_code, code) extra = ', '.join('%s: %r' % i for i in self.extra.items()) if extra: msg += ' (%s)' % extra super(ApiException, self).__init__(msg) class InvalidData(ApiException): error_code = "INVALID_DATA" status_code = 400 class CaptchaRequired(ApiException): error_code = "CAPTCHA_REQUIRED" status_code = 401 class InvalidCredentials(ApiException): error_code = "INVALID_CREDENTIALS" status_code = 401 class TwoFactorRequired(ApiException): error_code = "TWOFACTOR_REQUIRED" status_code = 401 class AccountSuspended(ApiException): error_code = "ACCOUNT_SUSPENDED" status_code = 403 class AccountDeactivated(ApiException): error_code = "ACCOUNT_DEACTIVATED" status_code = 403 class AccountLocked(ApiException): error_code = "ACCOUNT_LOCKED" status_code = 403 class EmailInvalidated(ApiException): error_code = "EMAIL_INVALIDATED" status_code = 403 class CanNotResetPassword(ApiException): error_code = "CAN_NOT_RESET_PASSWORD" status_code = 403 class CaptchaFailure(ApiException): error_code = "CAPTCHA_FAILURE" status_code = 403 class TooManyTokens(ApiException): error_code = "TOO_MANY_TOKENS" status_code = 403 class TwoFactorFailure(ApiException): error_code = "TWOFACTOR_FAILURE" status_code = 403 class PasswordPolicyError(ApiException): error_code = "PASSWORD_POLICY_ERROR" status_code = 403 class ResourceNotFound(ApiException): error_code = "RESOURCE_NOT_FOUND" status_code = 404 class AlreadyRegistered(ApiException): error_code = "ALREADY_REGISTERED" status_code = 409 class TooManyRequests(ApiException): error_code = "TOO_MANY_REQUESTS" status_code = 429 class CaptchaError(ApiException): error_code = "CAPTCHA_ERROR" status_code = 502 PK*’(H^-Ò )ssoclient-2.1.1.dist-info/DESCRIPTION.rstUNKNOWN PK*’(HRâbüü'ssoclient-2.1.1.dist-info/metadata.json{"extensions": {"python.details": {"contacts": [{"email": "canonical-isd@lists.launchpad.net", "name": "Canonical ISD Hackers", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}}}, "extras": [], "generator": "bdist_wheel (0.26.0)", "license": "AGPLv3", "metadata_version": "2.0", "name": "ssoclient", "run_requires": [{"requires": ["requests", "requests-oauthlib"]}], "summary": "UNKNOWN", "test_requires": [{"requires": ["mock", "requests", "requests-oauthlib"]}], "version": "2.1.1"}PK*’(Hx#lŠ 'ssoclient-2.1.1.dist-info/top_level.txtssoclient PK*’(Hìndªnnssoclient-2.1.1.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py2-none-any Tag: py3-none-any PK*’(Hª½  "ssoclient-2.1.1.dist-info/METADATAMetadata-Version: 2.0 Name: ssoclient Version: 2.1.1 Summary: UNKNOWN Home-page: UNKNOWN Author: Canonical ISD Hackers Author-email: canonical-isd@lists.launchpad.net License: AGPLv3 Platform: UNKNOWN Requires-Dist: requests Requires-Dist: requests-oauthlib UNKNOWN PK*’(H¥)Xc-- ssoclient-2.1.1.dist-info/RECORDssoclient/__init__.py,sha256=Da2Ik-K_omcgTndK_dbJI0PD7bgxFCNMBAfRRnV3BII,162 ssoclient/tests/__init__.py,sha256=ZkytKk5nfWs9nHOGGV0pYagf8wVn8mCeA5cUQmUehlY,140 ssoclient/tests/test_v2.py,sha256=kicZjPagAOePBz8c-2QXoZBO0pZ-eyxD0LlKhJS54ds,31742 ssoclient/v2/__init__.py,sha256=jVX50DuuVa_LdvjujJnZeVDHeQ6l87UcOgMn0xh74_k,639 ssoclient/v2/client.py,sha256=wNcu2gHyQURUvDnUsuOFf7lwftZRPnLVToXZmJRxrnA,5770 ssoclient/v2/errors.py,sha256=jsaav4rNtmXETtI0nls7CZF1FrDQP9h0Cj1uXWNn_rg,3261 ssoclient/v2/http.py,sha256=HMrScbXNL8RLyYqa0Rref3jZOOD3KGjmBKRU8p5V57I,3690 ssoclient-2.1.1.dist-info/DESCRIPTION.rst,sha256=OCTuuN6LcWulhHS3d5rfjdsQtW22n7HENFRh6jC6ego,10 ssoclient-2.1.1.dist-info/METADATA,sha256=4QTh3R332BV7BzNhwB1Ur49Ap39IB_mgFX8e35qVPS4,269 ssoclient-2.1.1.dist-info/RECORD,, ssoclient-2.1.1.dist-info/WHEEL,sha256=GrqQvamwgBV4nLoJe0vhYRSWzWsx7xjlt74FT0SWYfE,110 ssoclient-2.1.1.dist-info/metadata.json,sha256=E0gGlcw5S7kNUJpz0kng8oipND7E34kAJ72iAnsOSRY,508 ssoclient-2.1.1.dist-info/top_level.txt,sha256=1vrvoxqpLICDUb7HK2yP-LWV_AQvla9q6lx7KTgCgEQ,10 PK ‘(H}!¨§¢¢ssoclient/__init__.pyPKv…"HL×rŒŒÕssoclient/tests/__init__.pyPK~%H'Ø÷\þ{þ{šssoclient/tests/test_v2.pyPKv…"Hiç+³Ð}ssoclient/v2/__init__.pyPKv…"H¥F%BŠŠ…€ssoclient/v2/client.pyPKv…"HãMÁljjC—ssoclient/v2/http.pyPKv…"H¸E_j½ ½ ߥssoclient/v2/errors.pyPK*’(H^-Ò )вssoclient-2.1.1.dist-info/DESCRIPTION.rstPK*’(HRâbüü'!³ssoclient-2.1.1.dist-info/metadata.jsonPK*’(Hx#lŠ 'bµssoclient-2.1.1.dist-info/top_level.txtPK*’(Hìndªnn±µssoclient-2.1.1.dist-info/WHEELPK*’(Hª½  "\¶ssoclient-2.1.1.dist-info/METADATAPK*’(H¥)Xc-- ©·ssoclient-2.1.1.dist-info/RECORDPK м