PKք"H click_toolbelt/config.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import os from xdg.BaseDirectory import load_first_config, save_config_path from click_toolbelt import __namespace__ from click_toolbelt.compat import ConfigParser, urlparse from click_toolbelt.constants import ( CLICK_TOOLBELT_PROJECT_NAME, UBUNTU_SSO_API_ROOT_URL, ) def load_config(): """Read and return configuration from disk.""" config_dir = load_first_config(CLICK_TOOLBELT_PROJECT_NAME) filename = os.path.join(config_dir, "%s.cfg" % __namespace__) parser = ConfigParser() if os.path.exists(filename): parser.read(filename) api_endpoint = os.environ.get( 'UBUNTU_SSO_API_ROOT_URL', UBUNTU_SSO_API_ROOT_URL) location = urlparse(api_endpoint).netloc config = {} if parser.has_section(location): config.update(dict(parser.items(location))) return config def save_config(data): """Store current configuration to disk.""" config_dir = save_config_path(CLICK_TOOLBELT_PROJECT_NAME) filename = os.path.join(config_dir, "%s.cfg" % __namespace__) parser = ConfigParser() if os.path.exists(filename): parser.read(filename) api_endpoint = os.environ.get( 'UBUNTU_SSO_API_ROOT_URL', UBUNTU_SSO_API_ROOT_URL) location = urlparse(api_endpoint).netloc if not parser.has_section(location): parser.add_section(location) for key, value in data.items(): parser.set(location, key, str(value)) with open(filename, 'w') as fd: parser.write(fd) def clear_config(): """Remove configuration section from files on disk.""" config_dir = save_config_path(CLICK_TOOLBELT_PROJECT_NAME) filename = os.path.join(config_dir, "%s.cfg" % __namespace__) parser = ConfigParser() if os.path.exists(filename): parser.read(filename) api_endpoint = os.environ.get( 'UBUNTU_SSO_API_ROOT_URL', UBUNTU_SSO_API_ROOT_URL) location = urlparse(api_endpoint).netloc parser.remove_section(location) with open(filename, 'w') as fd: parser.write(fd) PKGv(HaFFclick_toolbelt/toolbelt.py#!/usr/bin/env python # Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import sys from cliff.app import App from cliff.commandmanager import CommandManager from click_toolbelt import __namespace__, __version__ class ClickToolbelt(App): def __init__(self): super(ClickToolbelt, self).__init__( description='Tools for working with click packages', version=__version__, command_manager=CommandManager(__namespace__)) def main(args=None): if args is None: args = sys.argv[1:] toolbelt = ClickToolbelt() return toolbelt.run(args) if __name__ == '__main__': # pragma: no cover args = sys.argv[1:] sys.exit(main(args)) PKGv(HFвffclick_toolbelt/info.py# -*- coding: utf-8 -*- # Copyright 2014 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import logging from cliff.command import Command from click_toolbelt.common import CommandError from storeapi.info import get_info class Info(Command): log = logging.getLogger(__name__) topics = ['version', 'department', 'license', 'country', 'channel'] def get_parser(self, prog_name): parser = super(Info, self).get_parser(prog_name) parser.add_argument('topic', nargs='?', choices=self.topics) return parser def get_version_info(self, data): return data.get('version') def get_department_info(self, data): return data.get('department') def get_license_info(self, data): return data.get('license') def get_country_info(self, data): return data.get('country') def get_channel_info(self, data): return data.get('channel') def get_topic_data(self, api_data, topic): method = getattr(self, 'get_{}_info'.format(topic)) return method(api_data) def take_action(self, parsed_args): topic = parsed_args.topic result = get_info() if not result.get('success', False): self.log.info( 'Could not get information. An error ocurred:\n\n%s\n\n', '\n'.join(result['errors'])) # raise an exception to exit with proper code raise CommandError() api_data = result['data'] self.log.info('API info:') if topic: topics = [topic] else: topics = self.topics for topic in topics: data = self.get_topic_data(api_data, topic) if data: self.log.info(' %s: %s', topic, data) PKGv(HT<#click_toolbelt/common.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import cliff.command from click_toolbelt.config import clear_config, load_config, save_config from storeapi.common import get_oauth_session class CommandError(Exception): """Exception to mark command errored out.""" class Command(cliff.command.Command): def load_config(self): """Read and return configuration from disk.""" return load_config() def save_config(self, data): """Store current configuration to disk.""" save_config(data) def clear_config(self): """Remove remove configuration section from files on disk.""" clear_config() def get_oauth_session(self): """Return a client configured to allow oauth signed requests.""" config = load_config() return get_oauth_session(config) def take_action(self, parsed_args): pass # pragma: no cover PKGv(H= click_toolbelt/channels.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json import logging from click_toolbelt.common import ( Command, CommandError, ) from storeapi.channels import get_channels, update_channels class Channels(Command): """Get/Update channels configuration for a package.""" log = logging.getLogger(__name__) def get_parser(self, prog_name): parser = super(Channels, self).get_parser(prog_name) parser.add_argument('package_name') parser.add_argument('config_filename', nargs='?') parser.add_argument('--publish', action="store_true") return parser def get_channels_config(self, config_filename, publish=False): """Return config data as expected by channels endpoint.""" if config_filename: with open(config_filename, 'r') as fd: data = json.load(fd) else: data = {} config = {'channels': data, 'publish': publish} return config def show_channels(self, data): """Display given channels config.""" for config in data: value = config.get('current') if value is not None: value = 'Revision %d (version %s)' % ( value.get('revision'), value.get('version')) self.log.info('%s: %s', config.get('channel'), value) def take_action(self, parsed_args): package_name = parsed_args.package_name config_filename = parsed_args.config_filename publish = parsed_args.publish # OAuth session is required session = self.get_oauth_session() if session is None: self.log.info('No valid credentials found.') # raise an exception to exit with proper code raise CommandError() if not publish and config_filename is None: # no changes requested, ask for current config result = get_channels(session, package_name) else: config = self.get_channels_config( config_filename, publish=publish) result = update_channels(session, package_name, config) if not result.get('success', False): self.log.info( 'Could not get information. An error ocurred:\n\n%s\n\n', '\n'.join(result['errors'])) # raise an exception to exit with proper code raise CommandError() errors = result.get('errors', []) for error in errors: self.log.info('ERROR: %s', error) data = result['data'] self.show_channels(data) PK(HVvDclick_toolbelt/__init__.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals __namespace__ = 'click_toolbelt' __version__ = '0.5.1' PKք"HB9(click_toolbelt/constants.py# Copyright 2013, 2014 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals CLICK_TOOLBELT_PROJECT_NAME = 'click-toolbelt' CLICK_UPDOWN_UPLOAD_URL = 'https://upload.apps.ubuntu.com/' MYAPPS_API_ROOT_URL = 'https://myapps.developer.ubuntu.com/dev/api/' UBUNTU_SSO_API_ROOT_URL = 'https://login.ubuntu.com/api/v2/' SCAN_STATUS_POLL_DELAY = 5 SCAN_STATUS_POLL_RETRIES = 5 PKք"H4)cKKclick_toolbelt/compat.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals try: # pragma: no cover from builtins import open # noqa from configparser import ConfigParser # noqa from urllib.parse import quote_plus, urljoin, urlparse except ImportError: # pragma: no cover from __builtin__ import open # noqa from ConfigParser import ConfigParser # noqa from urllib import quote_plus # noqa from urlparse import urljoin, urlparse # noqa PKGv(HV#click_toolbelt/upload.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import logging from click_toolbelt.common import ( Command, CommandError, ) from storeapi import upload class Upload(Command): log = logging.getLogger(__name__) def get_parser(self, prog_name): parser = super(Upload, self).get_parser(prog_name) parser.add_argument('binary_filename') metadata_group = parser.add_mutually_exclusive_group() metadata_group.add_argument('metadata_filename', nargs='?') metadata_group.add_argument('--metadata', metavar='metadata_filename', dest='metadata_filename') return parser def take_action(self, parsed_args): self.log.info('Running scan-upload command...') binary_filename = parsed_args.binary_filename metadata_filename = parsed_args.metadata_filename success = upload(binary_filename, metadata_filename) if not success: # raise an exception to exit with proper code raise CommandError() PKGv(H?jclick_toolbelt/login.py# -*- coding: utf-8 -*- # Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import logging from click_toolbelt.common import ( Command, CommandError, ) from click_toolbelt.constants import ( CLICK_TOOLBELT_PROJECT_NAME, ) from storeapi import login class Login(Command): log = logging.getLogger(__name__) def get_parser(self, prog_name): parser = super(Login, self).get_parser(prog_name) parser.add_argument('email') parser.add_argument('password') parser.add_argument('otp', nargs='?') return parser def take_action(self, parsed_args): login_data = { 'email': parsed_args.email, 'password': parsed_args.password, 'token_name': CLICK_TOOLBELT_PROJECT_NAME, } if parsed_args.otp: login_data['otp'] = parsed_args.otp response = login(**login_data) if response.get('success', False): self.log.info('Login successful.') self.save_config(response['body']) else: error = response['body'] self.log.info( 'Login failed.\n' 'Reason: %s\n' 'Detail: %s', error['code'], error['message']) # raise an exception to exit with proper code raise CommandError() PKք"H[7C !click_toolbelt/tests/test_info.py# -*- coding: utf-8 -*- # Copyright 2014 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals from collections import namedtuple from unittest import TestCase from mock import call, patch from click_toolbelt.common import CommandError from click_toolbelt.info import ( Info, ) class InfoCommandTestCase(TestCase): def setUp(self): super(InfoCommandTestCase, self).setUp() app = None args = None self.command = Info(app, args) patcher = patch('click_toolbelt.info.Info.log') self.mock_log = patcher.start() self.addCleanup(patcher.stop) self.parsed_args = namedtuple('parsed_args', 'topic') self.args = self.parsed_args(None) def test_parser(self): parser = self.command.get_parser('prog_name') # only one argument -- the first item is the default help option self.assertEqual(len(parser._actions), 2) self.assertEqual(parser._actions[0].dest, 'help') # topic is optional self.assertEqual(parser._actions[1].dest, 'topic') self.assertFalse(parser._actions[1].required) def test_take_action(self): with patch('click_toolbelt.info.get_info') as mock_get_info: mock_get_info.return_value = { 'success': True, 'errors': [], 'data': {'version': 1}, } self.command.take_action(self.args) self.assertEqual(self.mock_log.info.call_args_list, [ call('API info:'), call(' %s: %s', 'version', 1), ]) def _test_take_action_with_topic(self, data, topic, expected): with patch('click_toolbelt.info.get_info') as mock_get_info: mock_get_info.return_value = { 'success': True, 'errors': [], 'data': data, } args = self.parsed_args(topic) self.command.take_action(args) self.mock_log.info.assert_any_call('API info:') self.mock_log.info.assert_any_call( ' %s: %s', topic, expected) def test_take_action_with_version(self): data = {'version': 1} self._test_take_action_with_topic(data, 'version', 1) def test_take_action_with_department(self): expected = ['foo'] data = {'department': expected} self._test_take_action_with_topic(data, 'department', expected) def test_take_action_with_license(self): expected = ['foo'] data = {'license': expected} self._test_take_action_with_topic(data, 'license', expected) def test_take_action_with_country(self): expected = [['FO', 'foo']] data = {'country': expected} self._test_take_action_with_topic(data, 'country', expected) def test_take_action_with_channel(self): expected = ['stable', 'beta'] data = {'channel': expected} self._test_take_action_with_topic(data, 'channel', expected) def test_take_action_with_error(self): with patch('click_toolbelt.info.get_info') as mock_get_info: mock_get_info.return_value = { 'success': False, 'errors': ['some error'], 'data': None, } with self.assertRaises(CommandError): self.command.take_action(self.args) self.mock_log.info.assert_called_once_with( 'Could not get information. An error ocurred:\n\n%s\n\n', 'some error') PKք"H 8%click_toolbelt/tests/test_toolbelt.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import sys from unittest import TestCase from mock import patch from click_toolbelt import __version__ from click_toolbelt.toolbelt import ClickToolbelt, main class ClickToolbeltTestCase(TestCase): def test_constructor(self): toolbelt = ClickToolbelt() self.assertEqual(toolbelt.parser.description, 'Tools for working with click packages') self.assertEqual(toolbelt.command_manager.namespace, 'click_toolbelt') with self.assertRaises(SystemExit): self.assertIn(__version__, toolbelt.run(['--version'])) class MainTestCase(TestCase): def test_with_args(self): with patch('click_toolbelt.toolbelt.ClickToolbelt.run') as mock_run: args = [] main(args) mock_run.assert_called_with(args) def test_no_args_uses_sys(self): with patch('click_toolbelt.toolbelt.ClickToolbelt.run') as mock_run: main() mock_run.assert_called_with(sys.argv[1:]) PKք"H=+xx#click_toolbelt/tests/test_config.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import os import tempfile from unittest import TestCase from mock import patch import click_toolbelt.config from click_toolbelt.compat import ConfigParser from click_toolbelt.config import clear_config, load_config, save_config class ConfigTestCase(TestCase): def setUp(self): super(ConfigTestCase, self).setUp() patcher = patch('click_toolbelt.config.load_first_config') self.mock_load_first_config = patcher.start() self.addCleanup(patcher.stop) patcher = patch('click_toolbelt.config.save_config_path') self.mock_save_config_path = patcher.start() self.addCleanup(patcher.stop) cfg_file = self.get_temporary_file() self.filename = cfg_file.name config_dir = os.path.dirname(self.filename) self.mock_load_first_config.return_value = config_dir self.mock_save_config_path.return_value = config_dir patcher = patch.object( click_toolbelt.config, '__namespace__', os.path.basename(os.path.splitext(self.filename)[0])) patcher.start() self.addCleanup(patcher.stop) # make sure env is not overwritten patcher = patch.object(os, 'environ', {}) patcher.start() self.addCleanup(patcher.stop) def get_temporary_file(self, suffix='.cfg'): return tempfile.NamedTemporaryFile(suffix=suffix) class LoadConfigTestCase(ConfigTestCase): def test_load_config_with_no_existing_file(self): data = load_config() self.assertEqual(data, {}) def test_load_config_with_no_existing_section(self): cfg = ConfigParser() cfg.add_section('some.domain') cfg.set('some.domain', 'foo', '1') with open(self.filename, 'w') as fd: cfg.write(fd) data = load_config() self.assertEqual(data, {}) def test_load_config(self): cfg = ConfigParser() cfg.add_section('login.ubuntu.com') cfg.set('login.ubuntu.com', 'foo', '1') with open(self.filename, 'w') as fd: cfg.write(fd) data = load_config() self.assertEqual(data, {'foo': '1'}) class SaveConfigTestCase(ConfigTestCase): def test_save_config_with_no_existing_file(self): data = {'key': 'value'} save_config(data) self.assertEqual(load_config(), data) def test_save_config_with_existing_file(self): cfg = ConfigParser() cfg.add_section('some.domain') cfg.set('some.domain', 'foo', '1') with open(self.filename, 'w') as fd: cfg.write(fd) data = {'key': 'value'} save_config(data) config = load_config() self.assertEqual(config, data) class ClearConfigTestCase(ConfigTestCase): def test_clear_config_with_no_existing_section(self): cfg = ConfigParser() cfg.add_section('some.domain') cfg.set('some.domain', 'foo', '1') with open(self.filename, 'w') as fd: cfg.write(fd) config = load_config() assert config == {} clear_config() config = load_config() self.assertEqual(config, {}) def test_clear_config_removes_existing_section(self): cfg = ConfigParser() cfg.add_section('login.ubuntu.com') cfg.set('login.ubuntu.com', 'foo', '1') with open(self.filename, 'w') as fd: cfg.write(fd) config = load_config() assert config != {} clear_config() config = load_config() self.assertEqual(config, {}) def test_clear_config_with_no_existing_file(self): config = load_config() assert config == {} clear_config() config = load_config() self.assertEqual(config, {}) PKք"Hzδ""%click_toolbelt/tests/test_channels.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json import tempfile from collections import namedtuple from mock import call, patch from click_toolbelt.common import CommandError from click_toolbelt.constants import MYAPPS_API_ROOT_URL from click_toolbelt.channels import ( Channels, ) from click_toolbelt.tests.test_common import CommandTestCase class ChannelsCommandTestCase(CommandTestCase): command_class = Channels def setUp(self): super(ChannelsCommandTestCase, self).setUp() patcher = patch('click_toolbelt.channels.Channels.log') self.mock_log = patcher.start() self.addCleanup(patcher.stop) self.mock_get = self.mock_get_oauth_session.return_value.get self.mock_post = self.mock_get_oauth_session.return_value.post self.parsed_args = namedtuple( 'parsed_args', 'package_name, config_filename, publish') self.args = self.parsed_args('package.name', None, False) self.channels_data = [ {'channel': 'stable', 'current': {'revision': 2, 'version': '1'}}, {'channel': 'beta', 'current': {'revision': 4, 'version': '1.5'}}, {'channel': 'edge', 'current': None}, ] def assert_show_channels_status(self, errors=None): expected_calls = [] if errors is None: errors = [] for error in errors: error_call = call('ERROR: %s', error) expected_calls.append(error_call) channels = self.channels_data for config in channels: expected = None upload = config['current'] if upload is not None: expected = 'Revision %d (version %s)' % ( upload['revision'], upload['version']) channel_call = call('%s: %s', config['channel'], expected) expected_calls.append(channel_call) self.assertEqual(self.mock_log.info.call_args_list, expected_calls) def assert_update(self, package_name, expected_data): self.mock_post.assert_called_once_with( '%spackage-channels/%s/' % (MYAPPS_API_ROOT_URL, package_name), data=json.dumps(expected_data), headers={'Content-Type': 'application/json'}) def set_channels_get_success_response(self): mock_response = self.mock_get.return_value mock_response.ok = True mock_response.json.return_value = self.channels_data def set_channels_get_error_response(self, error_msg): mock_response = self.mock_get.return_value mock_response.ok = False mock_response.text = error_msg def set_channels_post_success_response(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'errors': [], 'channels': self.channels_data } def set_channels_post_failed_response(self, error_msg): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'errors': [error_msg], 'channels': self.channels_data } def set_channels_post_error_response(self, error_msg): mock_response = self.mock_post.return_value mock_response.ok = False mock_response.text = error_msg def set_channels_config_file(self, data): mock_channels_file = tempfile.NamedTemporaryFile() with open(mock_channels_file.name, 'wb') as mock_file: data = json.dumps(data) mock_file.write(data.encode('utf-8')) mock_file.flush() return mock_channels_file def test_parser(self): parser = self.command.get_parser('prog_name') # only one argument -- the first item is the default help option self.assertEqual(len(parser._actions), 4) self.assertEqual(parser._actions[0].dest, 'help') # package_name is required self.assertEqual(parser._actions[1].dest, 'package_name') self.assertTrue(parser._actions[1].required) # config_filename is optional self.assertEqual(parser._actions[2].dest, 'config_filename') self.assertFalse(parser._actions[2].required) # publish is optional self.assertEqual(parser._actions[3].dest, 'publish') self.assertFalse(parser._actions[3].required) def test_get_channels_config_from_file(self): channels = {'stable': 2, 'beta': 1} mock_channels_file = self.set_channels_config_file(channels) config = self.command.get_channels_config( mock_channels_file.name, False) expected = {'channels': channels, 'publish': False} self.assertEqual(config, expected) def test_get_channels_config_no_file(self): config = self.command.get_channels_config(None, False) expected = {'channels': {}, 'publish': False} self.assertEqual(config, expected) def test_get_channels_config_do_publish(self): config = self.command.get_channels_config(None, True) expected = {'channels': {}, 'publish': True} self.assertEqual(config, expected) def test_get_channels_config_from_file_and_do_publish(self): channels = {'stable': 2, 'beta': 1} mock_channels_file = self.set_channels_config_file(channels) config = self.command.get_channels_config( mock_channels_file.name, True) expected = {'channels': channels, 'publish': True} self.assertEqual(config, expected) def test_show_channels(self): self.command.show_channels(self.channels_data) self.assert_show_channels_status() def test_take_action_invalid_credentials(self): self.mock_get_oauth_session.return_value = None with self.assertRaises(CommandError): self.command.take_action(self.args) self.mock_log.info.assert_called_once_with( 'No valid credentials found.') def test_take_action_no_update(self): self.set_channels_get_success_response() self.command.take_action(self.args) self.assert_show_channels_status() def test_take_action_with_error(self): error_msg = 'some error' self.set_channels_get_error_response(error_msg) with self.assertRaises(CommandError): self.command.take_action(self.args) self.mock_log.info.assert_called_once_with( 'Could not get information. An error ocurred:\n\n%s\n\n', 'some error') def test_take_action_update_with_error(self): error_msg = 'some error' self.set_channels_post_error_response(error_msg) args = self.parsed_args('package.name', None, True) with self.assertRaises(CommandError): self.command.take_action(args) self.mock_log.info.assert_called_once_with( 'Could not get information. An error ocurred:\n\n%s\n\n', 'some error') def test_take_action_update_publish(self): self.set_channels_post_success_response() args = self.parsed_args('package.name', None, True) self.command.take_action(args) expected = {'channels': {}, 'publish': True} self.assert_update('package.name', expected) self.assert_show_channels_status() def test_take_action_update_channels(self): self.set_channels_post_success_response() channels = {'stable': 2, 'beta': 1} mock_channels_file = self.set_channels_config_file(channels) args = self.parsed_args('package.name', mock_channels_file.name, False) self.command.take_action(args) expected = {'channels': channels, 'publish': False} self.assert_update('package.name', expected) self.assert_show_channels_status() def test_take_action_update_channels_and_publish(self): self.set_channels_post_success_response() channels = {'stable': 2, 'beta': 1} mock_channels_file = self.set_channels_config_file(channels) args = self.parsed_args('package.name', mock_channels_file.name, True) self.command.take_action(args) expected = {'channels': channels, 'publish': True} self.assert_update('package.name', expected) self.assert_show_channels_status() def test_take_action_update_channels_fails(self): self.set_channels_post_failed_response('some error') args = self.parsed_args('package.name', None, True) self.command.take_action(args) expected = {'channels': {}, 'publish': True} self.assert_update('package.name', expected) self.assert_show_channels_status(errors=['some error']) PKք"H(/ click_toolbelt/tests/__init__.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). PKGv(H k"click_toolbelt/tests/test_login.py# -*- coding: utf-8 -*- # Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import os import shutil import tempfile from collections import namedtuple from unittest import TestCase from xdg.BaseDirectory import save_config_path from mock import patch from click_toolbelt import __namespace__ from click_toolbelt.common import CommandError from click_toolbelt.compat import ConfigParser from click_toolbelt.constants import ( CLICK_TOOLBELT_PROJECT_NAME, UBUNTU_SSO_API_ROOT_URL, ) from click_toolbelt.login import Login class LoginCommandTestCase(TestCase): def setUp(self): super(LoginCommandTestCase, self).setUp() app = None args = None self.command = Login(app, args) self.email = 'user@domain.com' self.password = 'password' parsed_args = namedtuple('parsed_args', 'email, password, otp') self.parsed_args = parsed_args(self.email, self.password, '') # setup patches mock_environ = { 'UBUNTU_SSO_API_ROOT_URL': UBUNTU_SSO_API_ROOT_URL, } patcher = patch('storeapi._login.os.environ', mock_environ) patcher.start() self.addCleanup(patcher.stop) patcher = patch('click_toolbelt.login.Login.log') self.mock_log = patcher.start() self.addCleanup(patcher.stop) def test_parser(self): parser = self.command.get_parser(__namespace__) for i, (name, required) in enumerate( [('email', True), ('password', True), ('otp', False)]): # argument 0 is builtin --help # start comparing from first extra argument self.assertEqual(parser._actions[i + 1].dest, name) self.assertEqual(parser._actions[i + 1].required, required) def setup_config_dir(self, include_click_dir=True): config_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, config_dir) patcher = patch('xdg.BaseDirectory.xdg_config_home', config_dir) patcher.start() self.addCleanup(patcher.stop) if include_click_dir: save_config_path(CLICK_TOOLBELT_PROJECT_NAME) return config_dir def test_save_config(self): config_dir = self.setup_config_dir() data = {'foo': '1', 'bar': '2', 'baz': '3'} expected_filename = os.path.join(config_dir, CLICK_TOOLBELT_PROJECT_NAME, "{}.cfg".format(__namespace__)) self.command.save_config(data) cfg = ConfigParser() cfg.read(expected_filename) self.assertTrue(cfg.has_section('login.ubuntu.com')) self.assertEqual(dict(cfg.items('login.ubuntu.com')), data) def test_save_config_with_existing_data(self): config_dir = self.setup_config_dir() expected_filename = os.path.join(config_dir, CLICK_TOOLBELT_PROJECT_NAME, "{}.cfg".format(__namespace__)) cfg = ConfigParser() cfg.add_section('login.ubuntu.com') cfg.set('login.ubuntu.com', 'foo', 'foo') with open(expected_filename, 'w') as fd: cfg.write(fd) data = {'foo': '1', 'bar': '2', 'baz': '3'} self.command.save_config(data) cfg.read(expected_filename) self.assertTrue(cfg.has_section('login.ubuntu.com')) self.assertEqual(dict(cfg.items('login.ubuntu.com')), data) def test_config_dir_created_if_necessary(self): config_dir = self.setup_config_dir(include_click_dir=False) data = {'foo': '1', 'bar': '2', 'baz': '3'} expected_filename = os.path.join(config_dir, CLICK_TOOLBELT_PROJECT_NAME, "{}.cfg".format(__namespace__)) self.command.save_config(data) cfg = ConfigParser() cfg.read(expected_filename) self.assertTrue(cfg.has_section('login.ubuntu.com')) self.assertEqual(dict(cfg.items('login.ubuntu.com')), data) @patch('click_toolbelt.login.Login.save_config') @patch('click_toolbelt.login.login') def test_take_action_success(self, mock_login, mock_save_config): parsed_args = namedtuple('parsed_args', 'email, password, otp') args = parsed_args(self.email, self.password, '123456') login_data = { 'email': self.email, 'password': self.password, 'token_name': CLICK_TOOLBELT_PROJECT_NAME, 'otp': '123456', } token_data = { 'consumer_key': 'consumer-key', 'consumer_secret': 'consumer-secret', 'token_key': 'token-key', 'token_secret': 'token-secret', } mock_login.return_value = { 'success': True, 'body': token_data, } self.command.take_action(args) mock_login.assert_called_once_with(**login_data) mock_save_config.assert_called_once_with(token_data) # assert output self.mock_log.info.assert_called_once_with('Login successful.') @patch('click_toolbelt.login.Login.save_config') @patch('click_toolbelt.login.login') def test_take_action_failure(self, mock_login, mock_save_config): login_data = { 'email': self.email, 'password': self.password, 'token_name': CLICK_TOOLBELT_PROJECT_NAME, } error_data = { 'code': 'UNAUTHORISED', 'message': 'Provided email/password is not correct.', 'extra': {}, } mock_login.return_value = { 'success': False, 'body': error_data, } with self.assertRaises(CommandError): self.command.take_action(self.parsed_args) mock_login.assert_called_once_with(**login_data) self.assertFalse(mock_save_config.called) # assert output self.mock_log.info.assert_called_once_with( 'Login failed.\n' 'Reason: %s\n' 'Detail: %s', 'UNAUTHORISED', 'Provided email/password is not correct.') PKi(H\i(#click_toolbelt/tests/test_upload.py# Copyright 2013 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json import tempfile from collections import namedtuple from mock import patch from click_toolbelt import __namespace__ from click_toolbelt.common import CommandError from click_toolbelt.upload import ( Upload, ) from click_toolbelt.tests.test_common import ( CommandTestCase, ) class UploadCommandTestCase(CommandTestCase): command_class = Upload def setUp(self): super(UploadCommandTestCase, self).setUp() self.mock_get = self.mock_get_oauth_session.return_value.get self.mock_post = self.mock_get_oauth_session.return_value.post p = patch('storeapi._upload.logger') self.mock_logger = p.start() self.addCleanup(p.stop) p = patch('storeapi._upload.upload_files') self.mock_upload_files = p.start() self.addCleanup(p.stop) p = patch('storeapi._upload.upload_app') self.mock_upload_app = p.start() self.addCleanup(p.stop) self.package_name = 'namespace.binary' self.binary_filename = self.package_name + '_0.1_all.click' self.parsed_args = namedtuple( 'parsed_args', 'binary_filename, metadata_filename') self.args = self.parsed_args(self.binary_filename, None) def test_parser(self): parser = self.command.get_parser(__namespace__) # binary_filename is required self.assertEqual(parser._actions[1].dest, 'binary_filename') self.assertTrue(parser._actions[1].required) # metadata_filename is optional self.assertEqual(parser._actions[2].dest, 'metadata_filename') # this is a positional argument self.assertEqual(parser._actions[2].option_strings, []) self.assertFalse(parser._actions[2].required) self.assertEqual(parser._actions[3].dest, 'metadata_filename') # this is a named argument self.assertEqual(parser._actions[3].option_strings, ['--metadata']) self.assertFalse(parser._actions[3].required) def test_take_action(self): binary_filename = self.args.binary_filename self.command.take_action(self.args) self.mock_upload_files.assert_called_once_with( binary_filename, config=None) self.mock_upload_app.assert_called_once_with( self.package_name, self.mock_upload_files.return_value, metadata={}, config=None) self.mock_logger.info.assert_any_call( 'Application uploaded successfully.' ) def test_take_action_works_for_snaps(self): binary_filename = self.package_name + '_0.1_all.snap' args = self.parsed_args(binary_filename, None) self.command.take_action(args) self.mock_upload_files.assert_called_once_with( binary_filename, config=None) self.mock_upload_app.assert_called_once_with( self.package_name, self.mock_upload_files.return_value, metadata={}, config=None) self.mock_logger.info.assert_any_call( 'Application uploaded successfully.' ) def test_take_action_with_metadata_file(self): mock_metadata_file = tempfile.NamedTemporaryFile() with open(mock_metadata_file.name, 'wb') as mock_file: data = json.dumps({'changelog': 'some changes'}) mock_file.write(data.encode('utf-8')) mock_file.flush() args = self.parsed_args(self.binary_filename, mock_metadata_file.name) self.command.take_action(args) self.mock_upload_files.assert_called_once_with( self.binary_filename, config=None) self.mock_upload_app.assert_called_once_with( self.package_name, self.mock_upload_files.return_value, metadata={'changelog': 'some changes'}, config=None) self.mock_logger.info.assert_any_call( 'Application uploaded successfully.' ) def test_take_action_with_metadata_file_without_source_file(self): mock_metadata_file = tempfile.NamedTemporaryFile() with open(mock_metadata_file.name, 'wb') as mock_file: data = json.dumps({'changelog': 'some changes'}) mock_file.write(data.encode('utf-8')) mock_file.flush() args = self.parsed_args(self.binary_filename, mock_metadata_file.name) self.command.take_action(args) self.mock_upload_files.assert_called_once_with( self.binary_filename, config=None) self.mock_upload_app.assert_called_once_with( self.package_name, self.mock_upload_files.return_value, metadata={'changelog': 'some changes'}, config=None) self.mock_logger.info.assert_any_call( 'Application uploaded successfully.' ) def test_take_action_with_error_during_file_upload(self): binary_filename = self.args.binary_filename self.mock_upload_files.return_value = { 'success': False, 'errors': ['some error'] } with self.assertRaises(CommandError): self.command.take_action(self.args) self.mock_upload_files.assert_called_once_with( binary_filename, config=None) self.assertFalse(self.mock_upload_app.called) self.mock_logger.info.assert_any_call( 'Upload failed:\n\n%s\n', 'some error') def test_take_action_with_error_during_app_upload(self): binary_filename = self.args.binary_filename self.mock_upload_app.return_value = { 'success': False, 'errors': ['some error'], } with self.assertRaises(CommandError): self.command.take_action(self.args) self.mock_upload_files.assert_called_once_with( binary_filename, config=None) self.mock_upload_app.assert_called_once_with( self.package_name, self.mock_upload_files.return_value, metadata={}, config=None) self.mock_logger.info.assert_any_call( 'Upload did not complete.') self.mock_logger.info.assert_any_call( 'Some errors were detected:\n\n%s\n\n', 'some error') def test_take_action_with_invalid_package_name(self): args = self.parsed_args('binary', None) with self.assertRaises(CommandError): self.command.take_action(args) self.assertFalse(self.mock_upload_files.called) self.assertFalse(self.mock_upload_app.called) self.mock_logger.info.assert_any_call('Invalid package filename.') def test_take_action_shows_application_url(self): self.mock_upload_app.return_value = { 'success': True, 'errors': [], 'application_url': 'http://example.com/', } self.command.take_action(self.args) self.mock_logger.info.assert_any_call( 'Please check out the application at: %s.\n', 'http://example.com/') PKGv(H#click_toolbelt/tests/test_common.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals from mock import patch from click_toolbelt.common import ( Command, ) from click_toolbelt.tests.test_config import ConfigTestCase class CommandTestCase(ConfigTestCase): command_class = Command def setUp(self): super(CommandTestCase, self).setUp() app = None args = None self.command = self.command_class(app, args) patcher = patch('click_toolbelt.common.get_oauth_session') self.mock_get_oauth_session = patcher.start() self.addCleanup(patcher.stop) def test_proxy_get_oauth_session(self): session = self.command.get_oauth_session() self.assertEqual(session, self.mock_get_oauth_session.return_value) @patch('click_toolbelt.common.load_config') def test_proxy_load_config(self, mock_load_config): config = self.command.load_config() self.assertEqual(config, mock_load_config.return_value) @patch('click_toolbelt.common.save_config') def test_proxy_save_config(self, mock_save_config): data = {} self.command.save_config(data) mock_save_config.assert_called_once_with(data) @patch('click_toolbelt.common.clear_config') def test_proxy_clear_config(self, mock_clear_config): self.command.clear_config() mock_clear_config.assert_called_once_with() PKGv(H/storeapi/info.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals from storeapi.common import myapps_api_call def get_info(): """Return information about the MyApps API. Returned data contains information about: - version - department - license - country - channel """ return myapps_api_call('') PKGv(HKstoreapi/common.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). import json import os import time from functools import wraps import requests from requests_oauthlib import OAuth1Session from storeapi.compat import urljoin from storeapi.constants import MYAPPS_API_ROOT_URL def get_oauth_session(config): """Return a client configured to allow oauth signed requests.""" try: session = OAuth1Session( config['consumer_key'], client_secret=config['consumer_secret'], resource_owner_key=config['token_key'], resource_owner_secret=config['token_secret'], signature_method='PLAINTEXT', ) except KeyError: session = None return session def myapps_api_call(path, session=None, method='GET', data=None): """Issue a request for a particular endpoint of the MyApps API.""" result = {'success': False, 'errors': [], 'data': None} if session is not None: client = session else: client = requests root_url = os.environ.get('MYAPPS_API_ROOT_URL', MYAPPS_API_ROOT_URL) url = urljoin(root_url, path) if method == 'GET': response = client.get(url) elif method == 'POST': response = client.post(url, data=data and json.dumps(data) or None, headers={'Content-Type': 'application/json'}) else: raise ValueError('Method {} not supported'.format(method)) if response.ok: result['success'] = True result['data'] = response.json() else: result['errors'] = [response.text] return result def is_scan_completed(response): """Return True if the response indicates the scan process completed.""" if response.ok: return response.json().get('completed', False) return False def retry(terminator=None, retries=3, delay=3, backoff=2, logger=None): """Decorate a function to automatically retry calling it on failure. Arguments: - terminator: this should be a callable that returns a boolean; it is used to determine if the function call was successful and the retry loop should be stopped - retries: an integer specifying the maximum number of retries - delay: initial number of seconds to wait for the first retry - backoff: exponential factor to use to adapt the delay between subsequent retries - logger: logging.Logger instance to use for logging The decorated function will return as soon as any of the following conditions are met: 1. terminator evaluates function output as True 2. there are no more retries left If the terminator callable is not provided, the function will be called exactly once and will not be retried. """ def decorated(func): if retries != int(retries) or retries < 0: raise ValueError( 'retries value must be a positive integer or zero') if delay < 0: raise ValueError('delay value must be positive') if backoff != int(backoff) or backoff < 1: raise ValueError('backoff value must be a positive integer') @wraps(func) def wrapped(*args, **kwargs): retries_left, current_delay = retries, delay result = func(*args, **kwargs) if terminator is not None: while not terminator(result) and retries_left > 0: msg = "... retrying in %d seconds" % current_delay if logger: logger.warning(msg) # sleep time.sleep(current_delay) retries_left -= 1 current_delay *= backoff # retry result = func(*args, **kwargs) return result, retries_left == 0 return wrapped return decorated PK(H$7!!storeapi/_upload.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json import logging import os import re from storeapi.common import ( get_oauth_session, is_scan_completed, retry, ) from storeapi.compat import open, quote_plus, urljoin from storeapi.constants import ( CLICK_UPDOWN_UPLOAD_URL, MYAPPS_API_ROOT_URL, SCAN_STATUS_POLL_DELAY, SCAN_STATUS_POLL_RETRIES, ) logger = logging.getLogger(__name__) def upload(binary_filename, metadata_filename='', metadata=None, config=None): """Create a new upload based on a click/snap package.""" # validate package filename pattern = (r'(.*/)?(?P[\w\-_\.]+)_' '(?P[\d\.]+)_(?P\w+)\.(click|snap)') match = re.match(pattern, binary_filename) if not match: logger.info('Invalid package filename.') return name = match.groupdict()['name'] logger.info('Uploading files...') data = upload_files(binary_filename, config=config) success = data.get('success', False) errors = data.get('errors', []) if not success: logger.info('Upload failed:\n\n%s\n', '\n'.join(errors)) return False logger.info('Uploading new version...') meta = read_metadata(metadata_filename) meta.update(metadata or {}) result = upload_app(name, data, metadata=meta, config=config) success = result.get('success', False) errors = result.get('errors', []) app_url = result.get('application_url', '') revision = result.get('revision') if success: logger.info('Application uploaded successfully.') if revision: logger.info('Uploaded as revision %s.', revision) else: logger.info('Upload did not complete.') if errors: logger.info('Some errors were detected:\n\n%s\n\n', '\n'.join(errors)) if app_url: logger.info('Please check out the application at: %s.\n', app_url) return success def upload_files(binary_filename, config=None): """Upload a binary file to the Store. Submit a file to the click-updown service and return the corresponding upload_id. """ updown_url = os.environ.get('CLICK_UPDOWN_UPLOAD_URL', CLICK_UPDOWN_UPLOAD_URL) unscanned_upload_url = urljoin(updown_url, 'unscanned-upload/') files = {'binary': open(binary_filename, 'rb')} result = {'success': False, 'errors': []} session = get_oauth_session(config) if session is None: result['errors'] = ['No valid credentials found.'] return result try: response = session.post( unscanned_upload_url, files=files) if response.ok: response_data = response.json() result.update({ 'success': response_data.get('successful', True), 'upload_id': response_data['upload_id'], 'binary_filesize': os.path.getsize(binary_filename), 'source_uploaded': 'source' in files, }) else: logger.error( 'There was an error uploading the package.\n' 'Reason: %s\n' 'Text: %s', response.reason, response.text) result['errors'] = [response.text] except Exception as err: logger.exception( 'An unexpected error was found while uploading files.') result['errors'] = [str(err)] finally: # make sure to close any open files used for request for fd in files.values(): fd.close() return result def read_metadata(metadata_filename): """Return a dictionary of metadata as read from a json file.""" if metadata_filename: with open(metadata_filename, 'r') as metadata_file: # file is automatically closed by context manager metadata = json.load(metadata_file) else: metadata = {} return metadata def upload_app(name, upload_data, metadata=None, config=None): """Request a new upload to be created for a given upload_id.""" upload_url = get_upload_url(name) result = {'success': False, 'errors': [], 'application_url': '', 'revision': None} session = get_oauth_session(config) if session is None: result['errors'] = ['No valid credentials found.'] return result if metadata is None: metadata = {} try: data = get_post_data(upload_data, metadata=metadata) files = get_post_files(metadata=metadata) response = session.post(upload_url, data=data, files=files) if response.ok: response_data = response.json() status_url = response_data['status_url'] logger.info('Package submitted to %s', upload_url) logger.info('Checking package status...') completed, data = get_scan_data(session, status_url) if completed: logger.info('Package scan completed.') message = data.get('message', '') if not message: result['success'] = True result['revision'] = data.get('revision') else: result['errors'] = [message] else: result['errors'] = [ 'Package scan took too long.', ] status_web_url = response_data.get('web_status_url') if status_web_url: result['errors'].append( 'Please check the status later at: %s.' % ( status_web_url), ) result['application_url'] = data.get('application_url', '') else: logger.error( 'There was an error uploading the application.\n' 'Reason: %s\n' 'Text: %s', response.reason, response.text) result['errors'] = [response.text] except Exception as err: logger.exception( 'There was an error uploading the application.') result['errors'] = [str(err)] finally: # make sure to close any open files used for request for fname, fd in files: fd.close() return result def get_upload_url(name): """Return the url of the uploaded package.""" myapps_api_url = os.environ.get('MYAPPS_API_ROOT_URL', MYAPPS_API_ROOT_URL) upload_url = urljoin(myapps_api_url, 'click-package-upload/') upload_url += "%s/" % quote_plus(name) return upload_url def get_post_data(upload_data, metadata=None): """Return the data to be posted in order to create the upload.""" data = { 'updown_id': upload_data['upload_id'], 'binary_filesize': upload_data['binary_filesize'], 'source_uploaded': upload_data['source_uploaded'], } data.update({ key: value for (key, value) in metadata.items() if key not in ( # make sure not to override upload_id, binary_filesize and # source_uploaded 'upload_id', 'binary_filesize', 'source_uploaded', # skip files as they will be added to the files argument 'icon_256', 'icon', 'screenshots', ) }) return data def get_post_files(metadata=None): """Return data about files to upload during the package upload request.""" files = [] icon = metadata.get('icon', metadata.get('icon_256', '')) if icon: icon_file = open(icon, 'rb') files.append(('icon_256', icon_file)) screenshots = metadata.get('screenshots', []) for screenshot in screenshots: screenshot_file = open(screenshot, 'rb') files.append(('screenshots', screenshot_file)) return files def get_scan_data(session, status_url): """Return metadata about the state of the upload scan process.""" # initial retry after 5 seconds # linear backoff after that # abort after 5 retries @retry(terminator=is_scan_completed, retries=SCAN_STATUS_POLL_RETRIES, delay=SCAN_STATUS_POLL_DELAY, backoff=1, logger=logger) def get_status(): return session.get(status_url) response, aborted = get_status() completed = False data = {} if not aborted: completed = is_scan_completed(response) data = response.json() return completed, data PKGv(HAϬstoreapi/channels.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals from storeapi.common import myapps_api_call def get_channels(session, package_name): """Get current channels config for package through API.""" channels_endpoint = 'package-channels/%s/' % package_name return myapps_api_call(channels_endpoint, session=session) def update_channels(session, package_name, data): """Update current channels config for package through API.""" channels_endpoint = 'package-channels/%s/' % package_name result = myapps_api_call(channels_endpoint, method='POST', data=data, session=session) if result['success']: result['errors'] = result['data']['errors'] result['data'] = result['data']['channels'] return result PKu(Hr**storeapi/__init__.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from .channels import get_channels, update_channels # noqa from .info import get_info # noqa from ._login import login # noqa from ._upload import upload # noqa PKGv(H*2᧛storeapi/_login.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import os from ssoclient.v2 import ( ApiException, UnexpectedApiError, V2ApiClient, ) from storeapi.constants import ( UBUNTU_SSO_API_ROOT_URL, ) def login(email, password, token_name, otp=None): """Log in via the Ubuntu One SSO API. If successful, returns the oauth token data. """ result = { 'success': False, 'body': None, } api_endpoint = os.environ.get( 'UBUNTU_SSO_API_ROOT_URL', UBUNTU_SSO_API_ROOT_URL) client = V2ApiClient(endpoint=api_endpoint) data = { 'email': email, 'password': password, 'token_name': token_name, } if otp is not None: data['otp'] = otp try: response = client.login(data=data) result['body'] = response result['success'] = True except ApiException as err: result['body'] = err.body except UnexpectedApiError as err: result['body'] = err.json_body return result PKGv(H߆2storeapi/constants.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals CLICK_UPDOWN_UPLOAD_URL = 'https://upload.apps.ubuntu.com/' MYAPPS_API_ROOT_URL = 'https://myapps.developer.ubuntu.com/dev/api/' UBUNTU_SSO_API_ROOT_URL = 'https://login.ubuntu.com/api/v2/' SCAN_STATUS_POLL_DELAY = 5 SCAN_STATUS_POLL_RETRIES = 5 PKGv(Hvstoreapi/compat.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals try: # pragma: no cover from builtins import open # noqa from urllib.parse import quote_plus, urljoin except ImportError: # pragma: no cover from __builtin__ import open # noqa from urllib import quote_plus # noqa from urlparse import urljoin # noqa PKGv(H2storeapi/tests/test_info.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals from unittest import TestCase from mock import patch from storeapi.info import get_info class InfoAPITestCase(TestCase): def setUp(self): super(InfoAPITestCase, self).setUp() patcher = patch('storeapi.common.requests.get') self.mock_get = patcher.start() self.mock_response = self.mock_get.return_value self.addCleanup(patcher.stop) def test_get_info(self): expected = { 'success': True, 'errors': [], 'data': {'version': 1}, } self.mock_response.ok = True self.mock_response.json.return_value = {'version': 1} data = get_info() self.assertEqual(data, expected) def test_get_info_with_error_response(self): expected = { 'success': False, 'errors': ['some error'], 'data': None, } self.mock_response.ok = False self.mock_response.text = 'some error' data = get_info() self.assertEqual(data, expected) def test_get_info_uses_environment_variables(self): with patch('storeapi.common.os.environ', {'MYAPPS_API_ROOT_URL': 'http://example.com'}): get_info() self.mock_get.assert_called_once_with('http://example.com') PKGv(HFT;##storeapi/tests/test_channels.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json from unittest import TestCase from mock import patch from storeapi.channels import get_channels, update_channels class ChannelsAPITestCase(TestCase): def setUp(self): super(ChannelsAPITestCase, self).setUp() # setup patches oauth_session = 'storeapi.common.get_oauth_session' patcher = patch(oauth_session) self.mock_get_oauth_session = patcher.start() self.mock_session = self.mock_get_oauth_session.return_value self.addCleanup(patcher.stop) self.mock_get = self.mock_session.get self.mock_post = self.mock_session.post self.channels_data = [ {'channel': 'stable', 'current': {'revision': 2, 'version': '1'}}, {'channel': 'beta', 'current': {'revision': 4, 'version': '1.5'}}, {'channel': 'edge', 'current': None}, ] def set_channels_get_success_response(self): mock_response = self.mock_get.return_value mock_response.ok = True mock_response.json.return_value = self.channels_data def set_channels_get_error_response(self, error_msg): mock_response = self.mock_get.return_value mock_response.ok = False mock_response.text = error_msg def set_channels_post_success_response(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'errors': [], 'channels': self.channels_data } def set_channels_post_failed_response(self, error_msg): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'errors': [error_msg], 'channels': self.channels_data } def set_channels_post_error_response(self, error_msg): mock_response = self.mock_post.return_value mock_response.ok = False mock_response.text = error_msg def test_get_channels(self): self.set_channels_get_success_response() data = get_channels(self.mock_session, 'package.name') expected = { 'success': True, 'errors': [], 'data': self.channels_data, } self.assertEqual(data, expected) def test_get_channels_with_error_response(self): error_msg = 'some error' self.set_channels_get_error_response(error_msg) data = get_channels(self.mock_session, 'package.name') expected = { 'success': False, 'errors': [error_msg], 'data': None, } self.assertEqual(data, expected) def test_get_channels_uses_environment_variables(self): with patch('storeapi.common.os.environ', {'MYAPPS_API_ROOT_URL': 'http://example.com'}): get_channels(self.mock_session, 'package.name') self.mock_get.assert_called_once_with( 'http://example.com/package-channels/package.name/') def test_update_channels(self): self.set_channels_post_success_response() data = update_channels( self.mock_session, 'package.name', {'stable': 2}) expected = { 'success': True, 'errors': [], 'data': self.channels_data, } self.assertEqual(data, expected) def test_update_channels_with_error_response(self): error_msg = 'some error' self.set_channels_post_error_response(error_msg) data = update_channels( self.mock_session, 'package.name', {'stable': 2}) expected = { 'success': False, 'errors': [error_msg], 'data': None, } self.assertEqual(data, expected) def test_update_channels_with_failed_response(self): error_msg = 'some error' self.set_channels_post_failed_response(error_msg) data = update_channels( self.mock_session, 'package.name', {'stable': 2}) expected = { 'success': True, 'errors': [error_msg], 'data': self.channels_data, } self.assertEqual(data, expected) def test_update_channels_uses_environment_variables(self): with patch('storeapi.common.os.environ', {'MYAPPS_API_ROOT_URL': 'http://example.com'}): update_channels( self.mock_session, 'package.name', {'stable': 2}) self.mock_post.assert_called_once_with( 'http://example.com/package-channels/package.name/', data=json.dumps({'stable': 2}), headers={'Content-Type': 'application/json'}) PKu(Hkstoreapi/tests/__init__.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). PKGv(H3storeapi/tests/test_login.py# -*- coding: utf-8 -*- # Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json from unittest import TestCase from mock import patch from requests import Response from storeapi._login import login from storeapi.constants import ( UBUNTU_SSO_API_ROOT_URL, ) class LoginAPITestCase(TestCase): def setUp(self): super(LoginAPITestCase, self).setUp() self.email = 'user@domain.com' self.password = 'password' self.token_name = 'token-name' # setup patches mock_environ = { 'UBUNTU_SSO_API_ROOT_URL': UBUNTU_SSO_API_ROOT_URL, } patcher = patch('storeapi._login.os.environ', mock_environ) patcher.start() self.addCleanup(patcher.stop) patcher = patch('ssoclient.v2.http.requests.Session.request') self.mock_request = patcher.start() self.addCleanup(patcher.stop) self.token_data = { 'consumer_key': 'consumer-key', 'consumer_secret': 'consumer-secret', 'token_key': 'token-key', 'token_secret': 'token-secret', } response = self.make_response(status_code=201, reason='CREATED', data=self.token_data) self.mock_request.return_value = response def make_response(self, status_code=200, reason='OK', data=None): data = data or {} response = Response() response.status_code = status_code response.reason = reason response._content = json.dumps(data).encode('utf-8') return response def assert_login_request(self, otp=None, token_name=None): if token_name is None: token_name = self.token_name data = { 'email': self.email, 'password': self.password, 'token_name': token_name } if otp is not None: data['otp'] = otp self.mock_request.assert_called_once_with( 'POST', UBUNTU_SSO_API_ROOT_URL + 'tokens/oauth', data=json.dumps(data), json=None, headers={'Content-Type': 'application/json'} ) def test_login_successful(self): result = login(self.email, self.password, self.token_name) expected = {'success': True, 'body': self.token_data} self.assertEqual(result, expected) def test_default_token_name(self): result = login(self.email, self.password, self.token_name) expected = {'success': True, 'body': self.token_data} self.assertEqual(result, expected) self.assert_login_request() def test_custom_token_name(self): result = login(self.email, self.password, token_name='my-token') expected = {'success': True, 'body': self.token_data} self.assertEqual(result, expected) self.assert_login_request(token_name='my-token') def test_login_with_otp(self): result = login(self.email, self.password, self.token_name, otp='123456') expected = {'success': True, 'body': self.token_data} self.assertEqual(result, expected) self.assert_login_request(otp='123456') def test_login_unsuccessful_api_exception(self): error_data = { 'message': 'Error during login.', 'code': 'INVALID_CREDENTIALS', 'extra': {}, } response = self.make_response( status_code=401, reason='UNAUTHORISED', data=error_data) self.mock_request.return_value = response result = login(self.email, self.password, self.token_name) expected = {'success': False, 'body': error_data} self.assertEqual(result, expected) def test_login_unsuccessful_unexpected_error(self): error_data = { 'message': 'Error during login.', 'code': 'UNEXPECTED_ERROR_CODE', 'extra': {}, } response = self.make_response( status_code=401, reason='UNAUTHORISED', data=error_data) self.mock_request.return_value = response result = login(self.email, self.password, self.token_name) expected = {'success': False, 'body': error_data} self.assertEqual(result, expected) PKGv(H#g D Dstoreapi/tests/test_upload.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). from __future__ import absolute_import, unicode_literals import json import os import tempfile from unittest import TestCase from mock import ANY, patch from requests import Response from storeapi._upload import ( get_upload_url, upload_app, upload_files, upload, ) class UploadBaseTestCase(TestCase): def setUp(self): super(UploadBaseTestCase, self).setUp() # setup patches name = 'storeapi._upload.get_oauth_session' patcher = patch(name) self.mock_get_oauth_session = patcher.start() self.addCleanup(patcher.stop) self.mock_get = self.mock_get_oauth_session.return_value.get self.mock_post = self.mock_get_oauth_session.return_value.post self.suffix = '_0.1_all.click' self.binary_file = self.get_temporary_file(suffix=self.suffix) def get_temporary_file(self, suffix='.cfg'): return tempfile.NamedTemporaryFile(suffix=suffix) class UploadWithScanTestCase(UploadBaseTestCase): def test_default_metadata(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'successful': True, 'upload_id': 'some-valid-upload-id', } upload(self.binary_file.name) data = { 'updown_id': 'some-valid-upload-id', 'source_uploaded': False, 'binary_filesize': 0, } name = os.path.basename(self.binary_file.name).replace(self.suffix, '') self.mock_post.assert_called_with( get_upload_url(name), data=data, files=[]) def test_metadata_from_file(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'successful': True, 'upload_id': 'some-valid-upload-id', } with self.get_temporary_file() as metadata_file: data = json.dumps({'name': 'from_file'}) metadata_file.write(data.encode('utf-8')) metadata_file.flush() upload( self.binary_file.name, metadata_filename=metadata_file.name) data = { 'updown_id': 'some-valid-upload-id', 'source_uploaded': False, 'binary_filesize': 0, 'name': 'from_file', } name = os.path.basename(self.binary_file.name).replace(self.suffix, '') self.mock_post.assert_called_with( get_upload_url(name), data=data, files=[]) def test_override_metadata(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'successful': True, 'upload_id': 'some-valid-upload-id', } upload( self.binary_file.name, metadata={'name': 'overridden'}) data = { 'updown_id': 'some-valid-upload-id', 'source_uploaded': False, 'binary_filesize': 0, 'name': 'overridden', } name = os.path.basename(self.binary_file.name).replace(self.suffix, '') self.mock_post.assert_called_with( get_upload_url(name), data=data, files=[]) class UploadFilesTestCase(UploadBaseTestCase): def setUp(self): super(UploadFilesTestCase, self).setUp() self.binary_file = self.get_temporary_file(suffix='_0.1_all.click') def test_upload_files(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'successful': True, 'upload_id': 'some-valid-upload-id', } response = upload_files(self.binary_file.name) self.assertEqual(response, { 'success': True, 'errors': [], 'upload_id': 'some-valid-upload-id', 'binary_filesize': os.path.getsize(self.binary_file.name), 'source_uploaded': False, }) def test_upload_files_uses_environment_variables(self): with patch.dict(os.environ, CLICK_UPDOWN_UPLOAD_URL='http://example.com'): upload_url = 'http://example.com/unscanned-upload/' upload_files(self.binary_file.name) self.mock_post.assert_called_once_with( upload_url, files={'binary': ANY}) def test_upload_files_with_source_upload(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'successful': True, 'upload_id': 'some-valid-upload-id', } response = upload_files(self.binary_file.name) self.assertEqual(response, { 'success': True, 'errors': [], 'upload_id': 'some-valid-upload-id', 'binary_filesize': os.path.getsize(self.binary_file.name), 'source_uploaded': False, }) def test_upload_files_with_invalid_oauth_session(self): self.mock_get_oauth_session.return_value = None response = upload_files(self.binary_file.name) self.assertEqual(response, { 'success': False, 'errors': ['No valid credentials found.'], }) self.assertFalse(self.mock_post.called) def test_upload_files_error_response(self): mock_response = self.mock_post.return_value mock_response.ok = False mock_response.reason = '500 INTERNAL SERVER ERROR' mock_response.text = 'server failed' response = upload_files(self.binary_file.name) self.assertEqual(response, { 'success': False, 'errors': ['server failed'], }) def test_upload_files_handle_malformed_response(self): mock_response = self.mock_post.return_value mock_response.json.return_value = {'successful': False} response = upload_files(self.binary_file.name) err = KeyError('upload_id') self.assertEqual(response, { 'success': False, 'errors': [str(err)], }) class UploadAppTestCase(UploadBaseTestCase): def setUp(self): super(UploadAppTestCase, self).setUp() self.data = { 'upload_id': 'some-valid-upload-id', 'binary_filesize': 123456, 'source_uploaded': False, } self.package_name = 'namespace.binary' patcher = patch.multiple( 'storeapi._upload', SCAN_STATUS_POLL_DELAY=0.0001) patcher.start() self.addCleanup(patcher.stop) def test_upload_app_with_invalid_oauth_session(self): self.mock_get_oauth_session.return_value = None response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': False, 'errors': ['No valid credentials found.'], 'application_url': '', 'revision': None, }) def test_upload_app_uses_environment_variables(self): with patch.dict(os.environ, MYAPPS_API_ROOT_URL='http://example.com'): upload_url = ("http://example.com/click-package-upload/%s/" % self.package_name) data = { 'updown_id': self.data['upload_id'], 'binary_filesize': self.data['binary_filesize'], 'source_uploaded': self.data['source_uploaded'], } upload_app(self.package_name, self.data) self.mock_post.assert_called_once_with( upload_url, data=data, files=[]) def test_upload_app(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'status_url': 'http://example.com/status/' } mock_status_response = self.mock_get.return_value mock_status_response.ok = True mock_status_response.json.return_value = { 'completed': True, 'revision': 15, } response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': True, 'errors': [], 'application_url': '', 'revision': 15, }) def test_upload_app_error_response(self): mock_response = self.mock_post.return_value mock_response.ok = False mock_response.reason = '500 INTERNAL SERVER ERROR' mock_response.text = 'server failure' response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': False, 'errors': ['server failure'], 'application_url': '', 'revision': None, }) def test_upload_app_handle_malformed_response(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = {} response = upload_app(self.package_name, self.data) err = KeyError('status_url') self.assertEqual(response, { 'success': False, 'errors': [str(err)], 'application_url': '', 'revision': None, }) def test_upload_app_with_errors_during_scan(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'status_url': 'http://example.com/status/' } mock_status_response = self.mock_get.return_value mock_status_response.ok = True mock_status_response.json.return_value = { 'completed': True, 'message': 'some error', 'application_url': 'http://example.com/myapp', } response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': False, 'errors': ['some error'], 'application_url': 'http://example.com/myapp', 'revision': None, }) def test_upload_app_poll_status(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.return_value = { 'success': True, 'status_url': 'http://example.com/status/' } response_not_completed = Response() response_not_completed.status_code = 200 response_not_completed.encoding = 'utf-8' response_not_completed._content = json.dumps( {'completed': False, 'application_url': ''}).encode('utf-8') response_completed = Response() response_completed.status_code = 200 response_completed.encoding = 'utf-8' response_completed._content = json.dumps( {'completed': True, 'revision': 14, 'application_url': 'http://example.org'}).encode('utf-8') self.mock_get.side_effect = [ response_not_completed, response_not_completed, response_completed, ] response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': True, 'errors': [], 'application_url': 'http://example.org', 'revision': 14, }) self.assertEqual(self.mock_get.call_count, 3) def test_upload_app_ignore_non_ok_responses(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.return_value = { 'success': True, 'status_url': 'http://example.com/status/', } ok_response = Response() ok_response.status_code = 200 ok_response.encoding = 'utf-8' ok_response._content = json.dumps( {'completed': True, 'revision': 14}).encode('utf-8') nok_response = Response() nok_response.status_code = 503 self.mock_get.side_effect = [nok_response, nok_response, ok_response] response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': True, 'errors': [], 'application_url': '', 'revision': 14, }) self.assertEqual(self.mock_get.call_count, 3) def test_upload_app_abort_polling(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'status_url': 'http://example.com/status/', 'web_status_url': 'http://example.com/status-web/', } mock_status_response = self.mock_get.return_value mock_status_response.ok = True mock_status_response.json.return_value = { 'completed': False } response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': False, 'errors': [ 'Package scan took too long.', 'Please check the status later at: ' 'http://example.com/status-web/.', ], 'application_url': '', 'revision': None, }) def test_upload_app_abort_polling_without_web_status_url(self): mock_response = self.mock_post.return_value mock_response.ok = True mock_response.json.return_value = { 'success': True, 'status_url': 'http://example.com/status/', } mock_status_response = self.mock_get.return_value mock_status_response.ok = True mock_status_response.json.return_value = { 'completed': False } response = upload_app(self.package_name, self.data) self.assertEqual(response, { 'success': False, 'errors': [ 'Package scan took too long.', ], 'application_url': '', 'revision': None, }) def test_upload_app_with_metadata(self): upload_app(self.package_name, self.data, metadata={ 'changelog': 'some changes', 'tagline': 'a tagline'}) self.mock_post.assert_called_once_with( ANY, data={ 'updown_id': self.data['upload_id'], 'binary_filesize': self.data['binary_filesize'], 'source_uploaded': self.data['source_uploaded'], 'changelog': 'some changes', 'tagline': 'a tagline', }, files=[], ) def test_upload_app_ignore_special_attributes_in_metadata(self): upload_app( self.package_name, self.data, metadata={ 'changelog': 'some changes', 'tagline': 'a tagline', 'upload_id': 'my-own-id', 'binary_filesize': 0, 'source_uploaded': False, }) self.mock_post.assert_called_once_with( ANY, data={ 'updown_id': self.data['upload_id'], 'binary_filesize': self.data['binary_filesize'], 'source_uploaded': self.data['source_uploaded'], 'changelog': 'some changes', 'tagline': 'a tagline', }, files=[], ) @patch('storeapi._upload.open') def test_upload_app_with_icon(self, mock_open): with tempfile.NamedTemporaryFile() as icon: mock_open.return_value = icon upload_app( self.package_name, self.data, metadata={ 'icon_256': icon.name, } ) self.mock_post.assert_called_once_with( ANY, data={ 'updown_id': self.data['upload_id'], 'binary_filesize': self.data['binary_filesize'], 'source_uploaded': self.data['source_uploaded'], }, files=[ ('icon_256', icon), ], ) @patch('storeapi._upload.open') def test_upload_app_with_screenshots(self, mock_open): screenshot1 = tempfile.NamedTemporaryFile() screenshot2 = tempfile.NamedTemporaryFile() mock_open.side_effect = [screenshot1, screenshot2] upload_app( self.package_name, self.data, metadata={ 'screenshots': [screenshot1.name, screenshot2.name], } ) self.mock_post.assert_called_once_with( ANY, data={ 'updown_id': self.data['upload_id'], 'binary_filesize': self.data['binary_filesize'], 'source_uploaded': self.data['source_uploaded'], }, files=[ ('screenshots', screenshot1), ('screenshots', screenshot2), ], ) def test_get_upload_url(self): with patch.dict(os.environ, MYAPPS_API_ROOT_URL='http://example.com'): upload_url = "http://example.com/click-package-upload/app.dev/" url = get_upload_url('app.dev') self.assertEqual(url, upload_url) PK(HG@storeapi/tests/test_common.py# Copyright 2015 Canonical Ltd. This software is licensed under the # GNU General Public License version 3 (see the file LICENSE). import json from unittest import TestCase import responses from mock import Mock, call, patch from requests_oauthlib import OAuth1Session from storeapi.common import ( get_oauth_session, myapps_api_call, retry, ) class GetOAuthSessionTestCase(TestCase): def test_get_oauth_session_when_no_config(self): config = {} session = get_oauth_session(config) self.assertIsNone(session) def test_get_oauth_session_when_partial_config(self): config = { 'consumer_key': 'consumer-key', 'consumer_secret': 'consumer-secret', } session = get_oauth_session(config) self.assertIsNone(session) def test_get_oauth_session(self): config = { 'consumer_key': 'consumer-key', 'consumer_secret': 'consumer-secret', 'token_key': 'token-key', 'token_secret': 'token-secret', } session = get_oauth_session(config) self.assertIsInstance(session, OAuth1Session) self.assertEqual(session.auth.client.client_key, 'consumer-key') self.assertEqual(session.auth.client.client_secret, 'consumer-secret') self.assertEqual(session.auth.client.resource_owner_key, 'token-key') self.assertEqual(session.auth.client.resource_owner_secret, 'token-secret') class ApiCallTestCase(TestCase): def setUp(self): super(ApiCallTestCase, self).setUp() p = patch('storeapi.common.os') mock_os = p.start() self.addCleanup(p.stop) mock_os.environ = {'MYAPPS_API_ROOT_URL': 'http://example.com'} @responses.activate def test_get_success(self): response_data = {'response': 'value'} responses.add(responses.GET, 'http://example.com/path', body=json.dumps(response_data)) result = myapps_api_call('/path') self.assertEqual(result, { 'success': True, 'data': response_data, 'errors': [], }) @responses.activate def test_get_error(self): response_data = {'response': 'error'} responses.add(responses.GET, 'http://example.com/path', body=json.dumps(response_data), status=500) result = myapps_api_call('/path') self.assertEqual(result, { 'success': False, 'data': None, 'errors': [json.dumps(response_data)], }) @responses.activate def test_post_success(self): response_data = {'response': 'value'} responses.add(responses.POST, 'http://example.com/path', body=json.dumps(response_data)) result = myapps_api_call('/path', method='POST') self.assertEqual(result, { 'success': True, 'data': response_data, 'errors': [], }) @responses.activate def test_post_error(self): response_data = {'response': 'value'} responses.add(responses.POST, 'http://example.com/path', body=json.dumps(response_data), status=500) result = myapps_api_call('/path', method='POST') self.assertEqual(result, { 'success': False, 'data': None, 'errors': [json.dumps(response_data)], }) def test_unsupported_method(self): self.assertRaises(ValueError, myapps_api_call, '/path', method='FOO') def test_get_with_session(self): session = Mock() myapps_api_call('/path', session=session) session.get.assert_called_once_with('http://example.com/path') def test_post_with_session(self): session = Mock() myapps_api_call('/path', method='POST', session=session) session.post.assert_called_once_with( 'http://example.com/path', data=None, headers={'Content-Type': 'application/json'}) @responses.activate def test_post_with_data(self): response_data = {'response': 'value'} responses.add(responses.POST, 'http://example.com/path', body=json.dumps(response_data)) result = myapps_api_call( '/path', method='POST', data={'request': 'value'}) self.assertEqual(result, { 'success': True, 'data': response_data, 'errors': [], }) self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.headers['Content-Type'], 'application/json') self.assertEqual(responses.calls[0].request.body, json.dumps({'request': 'value'})) class RetryDecoratorTestCase(TestCase): def target(self, *args, **kwargs): return dict(args=args, kwargs=kwargs) def test_retry(self): result, aborted = retry()(self.target)() self.assertEqual(result, dict(args=(), kwargs={})) self.assertEqual(aborted, False) @patch('storeapi.common.time.sleep') def test_retry_small_backoff(self, mock_sleep): mock_terminator = Mock() mock_terminator.return_value = False delay = 0.001 result, aborted = retry(mock_terminator, retries=2, delay=delay)(self.target)() self.assertEqual(result, dict(args=(), kwargs={})) self.assertEqual(aborted, True) self.assertEqual(mock_terminator.call_count, 3) self.assertEqual(mock_sleep.mock_calls, [ call(delay), call(delay * 2), ]) def test_retry_abort(self): mock_terminator = Mock() mock_terminator.return_value = False mock_logger = Mock() result, aborted = retry(mock_terminator, delay=0.001, backoff=1, logger=mock_logger)(self.target)() self.assertEqual(result, dict(args=(), kwargs={})) self.assertEqual(aborted, True) self.assertEqual(mock_terminator.call_count, 4) self.assertEqual(mock_logger.warning.call_count, 3) def test_retry_with_invalid_retries(self): for value in (0.1, -1): with self.assertRaises(ValueError) as ctx: retry(retries=value)(self.target) self.assertEqual( str(ctx.exception), 'retries value must be a positive integer or zero') def test_retry_with_negative_delay(self): with self.assertRaises(ValueError) as ctx: retry(delay=-1)(self.target) self.assertEqual(str(ctx.exception), 'delay value must be positive') def test_retry_with_invalid_backoff(self): for value in (-1, 0, 0.1): with self.assertRaises(ValueError) as ctx: retry(backoff=value)(self.target) self.assertEqual(str(ctx.exception), 'backoff value must be a positive integer') PKB(H^- .click_toolbelt-0.5.1.dist-info/DESCRIPTION.rstUNKNOWN PKB(H%/click_toolbelt-0.5.1.dist-info/entry_points.txt[click_toolbelt] channels = click_toolbelt.channels:Channels info = click_toolbelt.info:Info login = click_toolbelt.login:Login upload = click_toolbelt.upload:Upload [console_scripts] click-toolbelt = click_toolbelt.toolbelt:main PKB(HXshh,click_toolbelt-0.5.1.dist-info/metadata.json{"classifiers": ["Development Status :: 4 - Beta", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Programming Language :: Python", "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.3", "Intended Audience :: Developers", "Environment :: Console"], "download_url": "https://launchpad.net/click-toolbelt", "extensions": {"python.commands": {"wrap_console": {"click-toolbelt": "click_toolbelt.toolbelt:main"}}, "python.details": {"contacts": [{"email": "ricardo.kirkner@canonical.com", "name": "Ricardo Kirkner", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://launchpad.net/click-toolbelt"}}, "python.exports": {"click_toolbelt": {"channels": "click_toolbelt.channels:Channels", "info": "click_toolbelt.info:Info", "login": "click_toolbelt.login:Login", "upload": "click_toolbelt.upload:Upload"}, "console_scripts": {"click-toolbelt": "click_toolbelt.toolbelt:main"}}}, "extras": [], "generator": "bdist_wheel (0.26.0)", "metadata_version": "2.0", "name": "click-toolbelt", "platform": "Any", "run_requires": [{"requires": ["cliff", "pyxdg", "requests", "requests-oauthlib", "ssoclient"]}], "summary": "Click App Toolbelt", "test_requires": [{"requires": ["mock", "responses"]}], "version": "0.5.1"}PKB(H25click_toolbelt-0.5.1.dist-info/namespace_packages.txt PKB(Hk,click_toolbelt-0.5.1.dist-info/top_level.txtclick_toolbelt storeapi PKB(Hndnn$click_toolbelt-0.5.1.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py2-none-any Tag: py3-none-any PKB(H%YY'click_toolbelt-0.5.1.dist-info/METADATAMetadata-Version: 2.0 Name: click-toolbelt Version: 0.5.1 Summary: Click App Toolbelt Home-page: https://launchpad.net/click-toolbelt Author: Ricardo Kirkner Author-email: ricardo.kirkner@canonical.com License: UNKNOWN Download-URL: https://launchpad.net/click-toolbelt Platform: Any Classifier: Development Status :: 4 - Beta Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Programming Language :: Python Classifier: Programming Language :: Python :: 2 Classifier: Programming Language :: Python :: 2.7 Classifier: Programming Language :: Python :: 3 Classifier: Programming Language :: Python :: 3.3 Classifier: Intended Audience :: Developers Classifier: Environment :: Console Requires-Dist: cliff Requires-Dist: pyxdg Requires-Dist: requests Requires-Dist: requests-oauthlib Requires-Dist: ssoclient UNKNOWN PKB(Hm l l %click_toolbelt-0.5.1.dist-info/RECORDclick_toolbelt/__init__.py,sha256=7TPTymbIazGv0d0gmuUYbKSAcYc97i-5BxiNx__FgLA,245 click_toolbelt/channels.py,sha256=JYvaEpD8I1guXPMkPxq_X3fEq72N6f1ZKzm-apSw_iM,2742 click_toolbelt/common.py,sha256=v_o7pMxmYhfL6rSG_ArestpzfZ5JBploZ0HQ6CQzADo,1051 click_toolbelt/compat.py,sha256=4rgsmycCXAurSvVYDCovbzqfC7dd30j3AFrEL32hiuo,587 click_toolbelt/config.py,sha256=CZmZQrX5zcUDdt9VkK5zXZ6wAivVIz7NNSJLBMmu_Wg,2224 click_toolbelt/constants.py,sha256=WvhYDjrYV5uL-OflXtTaJy9CU0RSp5B8NHSJVLED5wA,490 click_toolbelt/info.py,sha256=A_cosj12J38pfS_iw8yEu1e0of3Scvg-oTDSuLL2rL8,1894 click_toolbelt/login.py,sha256=q-g80P_9J-PHpH_j6Q_p6LjUKRqOx2IafznoJREBc2c,1483 click_toolbelt/toolbelt.py,sha256=qjZohCtHePtdXSSUk8XuCpUrlJ7lJeH8wHpQtvYP6-U,838 click_toolbelt/upload.py,sha256=QEXBaDxSaK73ULOq1xRW6jhgDwwwB-UaVi8G93mEAjU,1192 click_toolbelt/tests/__init__.py,sha256=SYXq667H3S6zJjkzShaBvkD1C7WGGxL6TLGqFRyrpvk,132 click_toolbelt/tests/test_channels.py,sha256=lpLfhHk2UGKxfJeeEBLyuJrUbRMSH5rdQC8lg5u05bk,8896 click_toolbelt/tests/test_common.py,sha256=y6mb9FQ9TAHTbPJyjJdcjf4lvB3jHE937hUzwMbUFdU,1511 click_toolbelt/tests/test_config.py,sha256=cjN5UrGxCnikFDN9P-HwmVgi4EtHwkULr4jAQVSneI0,3960 click_toolbelt/tests/test_info.py,sha256=Sa5UYF3BDT_W0dp3yGn4bPdlNtZmSTVFPKGmxdr-im4,3519 click_toolbelt/tests/test_login.py,sha256=BHn3ovreBylJLbos3L34KFSBf9f8kXeQ88lhsxOr1tA,6334 click_toolbelt/tests/test_toolbelt.py,sha256=DFmDC_3R9maPTf8AZUhsNA8NrulbpSvvmr9VnbMB3wQ,1187 click_toolbelt/tests/test_upload.py,sha256=iF6QiLB3XosU3K4h-7YaVJd3qgIh_6SOoZs7aM4gyfY,7175 click_toolbelt-0.5.1.dist-info/DESCRIPTION.rst,sha256=OCTuuN6LcWulhHS3d5rfjdsQtW22n7HENFRh6jC6ego,10 click_toolbelt-0.5.1.dist-info/METADATA,sha256=j-OM7D1poYA3DFU0vaaE6OF01quyT9Ai-_7-I1A7gG4,857 click_toolbelt-0.5.1.dist-info/RECORD,, click_toolbelt-0.5.1.dist-info/WHEEL,sha256=GrqQvamwgBV4nLoJe0vhYRSWzWsx7xjlt74FT0SWYfE,110 click_toolbelt-0.5.1.dist-info/entry_points.txt,sha256=9w2gP_JmPDQjJT0dlWuWdjvm9aix-KqYs2V5nIKDTDY,232 click_toolbelt-0.5.1.dist-info/metadata.json,sha256=qLirCcz40UXWFz-XBz512q3UoMroq2Mzyq32A3mvrCI,1384 click_toolbelt-0.5.1.dist-info/namespace_packages.txt,sha256=AbpHGcgLb-kRsJGnwFEktk7uzpZOCcBY74-YBdrKVGs,1 click_toolbelt-0.5.1.dist-info/top_level.txt,sha256=oLKu6Nzwp4o3bJjyoegYn1ZDN4zoUcVKCeATZ4rC9O4,24 storeapi/__init__.py,sha256=8TyM52Nq3a3cr6YmBxb4FbA7tuTKSnwhO6tBCBXdcTg,298 storeapi/_login.py,sha256=RwUCiblcphJN9uEZn24w1r8mtYcuFEXgoOZ_bx2B1A8,1179 storeapi/_upload.py,sha256=qwwg73qS5vDWX7t6UV7M1cUTFicb8o8i57oh07hOhH0,8640 storeapi/channels.py,sha256=pgvUN-fCAzF6yjUDQ1g01nI_Ys5DIkPLeFrPLZTlZ70,937 storeapi/common.py,sha256=YmlVGOmfnYOkwiRPfukZStpLIaWWO8aFqZHGTMJT79w,3985 storeapi/compat.py,sha256=Ry7-oCIaZKLU049CyF3vc0vEJ-6vsxnlYJQQ45DHwLQ,467 storeapi/constants.py,sha256=7zO72MpJZGBVQUtnZkmFZvTQhCHufDtJ5Ve3v_uJO4Q,437 storeapi/info.py,sha256=APyY9xZr3doXd6sjizircOVflUWL6dzkEaWNyhdRgtY,483 storeapi/tests/__init__.py,sha256=rE9sU8aELJK5OrOhrClBbEXQ3G3enGkNpoQD8TstUP8,132 storeapi/tests/test_channels.py,sha256=MYNf8Syu3KZuCWhvgkqkthITUUFaLV8NzFBmIXkkmdg,4899 storeapi/tests/test_common.py,sha256=WAYqbchSxhgQnxczx3Z2lgAjcW_AcKnDsePDg6l2q50,7049 storeapi/tests/test_info.py,sha256=qbVpEDJTZ6VBZdw2moF0EiuneAAAKZlB1ryGLn8rc4Q,1505 storeapi/tests/test_login.py,sha256=F4yRor-FcmuVlZI9lcnvgCWg04BEQB01OJXZG5DgOWI,4359 storeapi/tests/test_upload.py,sha256=wswTlqgduVUAFhOZOCFuauBlCmrFU3_cp2lCrkISiMo,17421 PKք"H click_toolbelt/config.pyPKGv(HaFFclick_toolbelt/toolbelt.pyPKGv(HFвffd click_toolbelt/info.pyPKGv(HT<#click_toolbelt/common.pyPKGv(H= Oclick_toolbelt/channels.pyPK(HVvD=#click_toolbelt/__init__.pyPKք"HB9(j$click_toolbelt/constants.pyPKք"H4)cKK&click_toolbelt/compat.pyPKGv(HV#)click_toolbelt/upload.pyPKGv(H?j-click_toolbelt/login.pyPKք"H[7C !3click_toolbelt/tests/test_info.pyPKք"H 8%Aclick_toolbelt/tests/test_toolbelt.pyPKք"H=+xx#Fclick_toolbelt/tests/test_config.pyPKք"Hzδ""%Vclick_toolbelt/tests/test_channels.pyPKք"H(/ yclick_toolbelt/tests/__init__.pyPKGv(H k"Nzclick_toolbelt/tests/test_login.pyPKi(H\i(#Lclick_toolbelt/tests/test_upload.pyPKGv(H#click_toolbelt/tests/test_common.pyPKGv(H/storeapi/info.pyPKGv(HKͷstoreapi/common.pyPK(H$7!!storeapi/_upload.pyPKGv(HAϬstoreapi/channels.pyPKu(Hr**Zstoreapi/__init__.pyPKGv(H*2᧛storeapi/_login.pyPKGv(H߆2storeapi/constants.pyPKGv(Hvistoreapi/compat.pyPKGv(H2lstoreapi/tests/test_info.pyPKGv(HFT;##storeapi/tests/test_channels.pyPKu(Hkstoreapi/tests/__init__.pyPKGv(H3storeapi/tests/test_login.pyPKGv(H#g D D"storeapi/tests/test_upload.pyPK(HG@+gstoreapi/tests/test_common.pyPKB(H^- .click_toolbelt-0.5.1.dist-info/DESCRIPTION.rstPKB(H%/Eclick_toolbelt-0.5.1.dist-info/entry_points.txtPKB(HXshh,zclick_toolbelt-0.5.1.dist-info/metadata.jsonPKB(H25,click_toolbelt-0.5.1.dist-info/namespace_packages.txtPKB(Hk,click_toolbelt-0.5.1.dist-info/top_level.txtPKB(Hndnn$click_toolbelt-0.5.1.dist-info/WHEELPKB(H%YY'click_toolbelt-0.5.1.dist-info/METADATAPKB(Hm l l %0click_toolbelt-0.5.1.dist-info/RECORDPK(( ߜ