PKIfGOBgo_optouts/main.py""" Command for launching the Vumi Go Opt Out API. """ import click from .server import ApiSite @click.command("go-optouts") @click.version_option() @click.option( '--config', '-c', required=True, help='YAML config file') @click.option( '--host', '-h', default='localhost', help='Host to listen on') @click.option( '--port', '-p', type=int, default=8080, help='Port to listen on') def run(config, host, port): """ Vumi Go Opt Out API. """ site = ApiSite(config) site.run(host, port) PK8`jGF go_optouts/api.pyimport json from klein import Klein from twisted.internet.defer import inlineCallbacks, returnValue class OwnerIdNotValid(Exception): """ Raised when no valid owner is found. """ class OptOutNotFound(Exception): """ Raised when no opt out is found. """ class OptOutAlreadyExists(Exception): """ Raised when opt out already exists. """ class OptOutNotDeleted(Exception): """ Raised when opt out not deleted. """ class API(object): app = Klein() def __init__(self, backend, auth): self._backend = backend self._auth = auth def response(self, request, status_code=200, status_reason="OK", **data): request.setResponseCode(status_code) request.setHeader('Content-Type', 'application/json') data.update({ "status": { "code": status_code, "reason": status_reason, }, }) return json.dumps(data) @inlineCallbacks def collection(self, request): owner_id = yield self._auth.owner_id(request) if owner_id is None: raise OwnerIdNotValid() returnValue(self._backend.get_opt_out_collection(owner_id)) # Error Handling @app.handle_errors(OwnerIdNotValid) def owner_id_not_valid(self, request, failure): return self.response( request, status_code=401, status_reason="Owner ID not valid.") @app.handle_errors(OptOutNotFound) def opt_out_not_found(self, request, failure): return self.response( request, status_code=404, status_reason="Opt out not found.") @app.handle_errors(OptOutAlreadyExists) def opt_out_already_exists(self, request, failure): return self.response( request, status_code=409, status_reason="Opt out already exists.") @app.handle_errors(OptOutNotDeleted) def opt_out_not_deleted(self, request, failure): return self.response( request, status_code=404, status_reason="There\'s nothing to delete.") # Methods @app.route('//', methods=['GET']) @inlineCallbacks def get_address(self, request, addresstype, address): collection = yield self.collection(request) opt_out = collection.get(addresstype, address) if opt_out is None: raise OptOutNotFound() returnValue(self.response(request, opt_out=opt_out)) @app.route('//', methods=['PUT']) @inlineCallbacks def save_address(self, request, addresstype, address): collection = yield self.collection(request) opt_out = collection.get(addresstype, address) if opt_out is not None: raise OptOutAlreadyExists() opt_out = collection.put(addresstype, address) returnValue(self.response(request, opt_out=opt_out)) @app.route('//', methods=['DELETE']) @inlineCallbacks def delete_address(self, request, addresstype, address): collection = yield self.collection(request) opt_out = collection.delete(addresstype, address) if opt_out is None: raise OptOutNotDeleted() returnValue(self.response(request, opt_out=opt_out)) @app.route('/count', methods=['GET']) @inlineCallbacks def get_opt_out_count(self, request): collection = yield self.collection(request) count = collection.count() returnValue(self.response(request, opt_out_count=count)) PK8`jGgo_optouts/auth.py""" Utilities for authenticating requests. """ import treq from twisted.internet.defer import succeed, inlineCallbacks, returnValue from zope.interface import implements, Interface class IAuthenticator(Interface): """ Authenticator interface. """ def owner_id(request): """ Retrieve an owner_id for a request. :type request: A Klein request object. :param request: The request to authenticate. :return: A deferred that fires with either an owner_id or None if the request could not be authenticated. """ class RequestHeaderAuth(object): implements(IAuthenticator) def owner_id(self, request): return succeed(request.getHeader('X-Owner-ID')) class BouncerAuth(object): implements(IAuthenticator) def __init__(self, auth_bouncer_url): self._auth_bouncer_url = auth_bouncer_url.rstrip('/') @inlineCallbacks def owner_id(self, request): auth_headers = {} auth = request.getHeader('Authorization') if auth: auth_headers['Authorization'] = auth uri = "".join([self._auth_bouncer_url, request.path]) resp = yield treq.get(uri, headers=auth_headers, persistent=False) yield resp.content() if resp.code >= 400: returnValue(None) x_owner_id = resp.headers.getRawHeaders('X-Owner-Id') if x_owner_id is None or len(x_owner_id) != 1: returnValue(None) returnValue(x_owner_id[0]) PK8`jG@QQgo_optouts/__init__.py"""An API for managing Vumi Go opt outs.""" __version__ = "0.1.1" __all__ = [] PK8`jG;Zt go_optouts/server.pyimport sys from twisted.internet import reactor from twisted.python import log from twisted.web import http from twisted.web.resource import Resource from confmodel import Config from confmodel.fields import ConfigText, ConfigDict from vumi.utils import build_web_site import yaml from .api import API from .auth import RequestHeaderAuth, BouncerAuth from .store.memory import MemoryOptOutBackend from .store.riak import RiakOptOutBackend class HealthResource(Resource): isLeaf = True def render_GET(self, request): request.setResponseCode(http.OK) request.do_not_log = True return 'OK' def read_yaml_config(config_file): """Parse a YAML config file.""" if config_file is None: return {} with file(config_file, 'r') as stream: # Assume we get a dict out of this. return yaml.safe_load(stream) class ApiSiteConfig(Config): BACKENDS = { "memory": MemoryOptOutBackend, "riak": RiakOptOutBackend, } backend = ConfigText( "Optout backend to use. One of 'memory' or 'riak'", required=True) backend_config = ConfigDict( "Configuration for backend.", default={}) auth_bouncer_url = ConfigText( "URL to bounce requests to for authentication", default=None) url_path_prefix = ConfigText( "URL path prefix for the optout API.", default="optouts") def post_validate(self): if self.backend not in self.BACKENDS: self.raise_config_error( "Backend must be one of: %s" % ", ".join(self.BACKENDS.keys())) def create_backend(self): return self.BACKENDS[self.backend].from_config(self.backend_config) def create_auth(self): if self.auth_bouncer_url: return BouncerAuth(self.auth_bouncer_url) return RequestHeaderAuth() class ApiSite(object): """ Site for serving the opt out API. """ def __init__(self, config_file=None): self.config = ApiSiteConfig(read_yaml_config(config_file)) self.api = API( self.config.create_backend(), self.config.create_auth()) self.site = build_web_site({ 'health': HealthResource(), self.config.url_path_prefix: self.api.app.resource(), }) def run(self, host, port): log.startLogging(sys.stdout) reactor.listenTCP(port, self.site, interface=host) reactor.run() PKIfG~G G go_optouts/store/riak.py""" Riak opt out backend. """ from twisted.internet.defer import inlineCallbacks, returnValue from zope.interface import implements from go.vumitools.opt_out import OptOutStore, OptOut from vumi.persist.txriak_manager import TxRiakManager from .interface import IOptOutBackend, IOptOutCollection class RiakOptOutBackend(object): """ Riak opt out backend. :type riak_manager: `vumi.persist.txriak_manager.TxRiakManager` :param riak_manager: A Riak manager for the opt out stores. """ implements(IOptOutBackend) def __init__(self, riak_manager): self.riak_manager = riak_manager @classmethod def from_config(cls, config): riak_manager = TxRiakManager.from_config(config) return cls(riak_manager) def get_opt_out_collection(self, owner_id): """ Return the opt out collection for the specified owner. :param str owner_id: The id of the owner of the opt out store. """ opt_out_store = OptOutStore(self.riak_manager, owner_id) return RiakOptOutCollection(opt_out_store) class RiakOptOutCollection(object): """ Riak opt out collection for a particular opt out store. :type opt_out_store: `go.vumitools.opt_out.models.OptOutStore` :param opt_out_store: The opt out store to provide access to. """ implements(IOptOutCollection) def __init__(self, opt_out_store): self.store = opt_out_store @classmethod def _pick_fields(cls, data, keys): """ Return a sub-dictionary of all the items from ``data`` whose keys are listed in ``keys``. """ return dict((k, data[k]) for k in keys if k in data) @classmethod def _opt_out_to_dict(cls, opt_out): """ Return a sub-dictionary of the items from ``data`` that are valid contact fields. """ return cls._pick_fields( opt_out.get_data(), OptOut.field_descriptors.keys()) @inlineCallbacks def get(self, addresstype, address): opt_out = yield self.store.get_opt_out(addresstype, address) if opt_out is None: returnValue(None) returnValue(self._opt_out_to_dict(opt_out)) @inlineCallbacks def put(self, addresstype, address): opt_out = yield self.store.new_opt_out( addresstype, address, message={ # TODO: Fix the Vumi Go opt out store to allow descriptions # of why an address was opted out. Currently the only # description allowed is a Vumi message id. :| 'message_id': None, }) returnValue(self._opt_out_to_dict(opt_out)) @inlineCallbacks def delete(self, addresstype, address): opt_out = yield self.store.get_opt_out(addresstype, address) if opt_out is None: returnValue(None) opt_out_dict = self._opt_out_to_dict(opt_out) yield opt_out.delete() returnValue(opt_out_dict) @inlineCallbacks def count(self): count = yield self.store.count() returnValue(count) PKIfGgo_optouts/store/interface.pyimport zope.interface class IOptOutBackend(zope.interface.Interface): def get_opt_out_collection(owner_id): """ Return the opt out collection for the specified owner. :param str owner_id: The id of the owner of the opt out store. """ class IOptOutCollection(zope.interface.Interface): def get(address_type, address): """ Retrieve the opt out for an address. """ def put(address_type, address): """ Store a record of an opt out for an address. """ def delete(address_type, address): """ Remove an opt out for an address. """ def count(): """ Return the number of opt outs. """ PKIfGem88go_optouts/store/memory.pyimport uuid from zope.interface import implements from .interface import IOptOutBackend, IOptOutCollection class MemoryOptOutBackend(object): """ Memory opt out backend. """ implements(IOptOutBackend) def __init__(self): self._collections = {} @classmethod def from_config(cls, _config): return cls() def get_opt_out_collection(self, owner_id): """ Return the opt out collection for the specified owner. :param str owner_id: The id of the owner of the opt out store. """ collection = self._collections.get(owner_id) if collection is None: collection = self._collections[owner_id] = MemoryOptOutCollection() return collection class MemoryOptOutCollection(object): """ This implements the IOptOutStore interface. It stores the opt out in a dictionary using the address type and address as the key. The values are opt out objects, for example:: { "id": "2468", "address_type": "msisdn", "address": "+273121100", } """ implements(IOptOutCollection) def __init__(self): # _store maps (address_type, address) pairs to opt outs self._store = {} def get(self, address_type, address): key = (address_type, address) return self._store.get(key) def put(self, address_type, address): key = (address_type, address) opt_id = str(uuid.uuid4()) self._store[key] = { 'id': opt_id, 'address_type': address_type, 'address': address } return self._store.get(key) def delete(self, address_type, address): key = (address_type, address) return self._store.pop(key, None) def count(self): return len(self._store) PKIfGgo_optouts/store/__init__.pyPKIfG"go_optouts/store/tests/__init__.pyPKIfG2sEE%go_optouts/store/tests/test_memory.py""" Tests for opt_out_http_api.store.memory. """ from zope.interface.verify import verifyClass, verifyObject from go_optouts.store.interface import ( IOptOutBackend, IOptOutCollection) from go_optouts.store.memory import ( MemoryOptOutBackend, MemoryOptOutCollection) from twisted.trial.unittest import TestCase class TestMemoryOptOutBackend(TestCase): def mk_backend(self): return MemoryOptOutBackend() def test_class_iface(self): self.assertTrue(verifyClass(IOptOutBackend, MemoryOptOutBackend)) def test_instance_iface(self): backend = self.mk_backend() self.assertTrue(verifyObject(IOptOutBackend, backend)) def test_from_config(self): backend = MemoryOptOutBackend.from_config({}) self.assertTrue(isinstance(backend, MemoryOptOutBackend)) def test_get_opt_out_collection(self): backend = self.mk_backend() collection = backend.get_opt_out_collection("owner-1") self.assertTrue(isinstance(collection, MemoryOptOutCollection)) class TestMemoryOptOutCollection(TestCase): def test_setup_class_iface(self): self.assertTrue(verifyClass(IOptOutCollection, MemoryOptOutCollection)) def test_setup_instance_iface(self): collection = MemoryOptOutCollection() self.assertTrue(verifyObject(IOptOutCollection, collection)) def test_put_and_get(self): store = MemoryOptOutCollection() opt1 = store.put("twitter_handle", "@trevor") self.assertEqual(len(opt1["id"]), 36) # length of uuid-4 string self.assertEqual(opt1, { "id": opt1["id"], "address_type": "twitter_handle", "address": "@trevor" }) opt2 = store.get("twitter_handle", "@trevor") self.assertEqual(opt2, { "id": opt1["id"], "address_type": "twitter_handle", "address": "@trevor" }) def test_get_missing(self): store = MemoryOptOutCollection() opt3 = store.get("mxit", "praekelt_mxit") self.assertEqual(None, opt3) def test_delete_missing(self): store = MemoryOptOutCollection() opt_out_delete = store.delete("twitter_handle", "@trevor") self.assertEqual(None, opt_out_delete) def test_put_and_delete(self): store = MemoryOptOutCollection() opt_put = store.put("facebook", "trevor_fb") self.assertEqual(len(opt_put["id"]), 36) self.assertEqual(opt_put, { "id": opt_put["id"], "address_type": "facebook", "address": "trevor_fb" }) opt_out_del = store.delete("facebook", "trevor_fb") self.assertEqual(opt_out_del, { "id": opt_put["id"], "address_type": "facebook", "address": "trevor_fb" }) opt_out_get = store.get("facebook", "trevor_fb") self.assertEqual(opt_out_get, None) def test_count_zero(self): store = MemoryOptOutCollection() opt_count_zero = store.count() self.assertEqual(opt_count_zero, 0) def test_count_one(self): store = MemoryOptOutCollection() opt_count_one = store.count() self.assertEqual(opt_count_one, 0) store.put("FB", "fb_PRP") opt_count_one = store.count() self.assertEqual(opt_count_one, 1) def test_count_many(self): store = MemoryOptOutCollection() opt_count = store.count() self.assertEqual(opt_count, 0) store.put("facebook", "trevor_fb") store.put("mxit", "trevor_mxit") opt_count = store.count() self.assertEqual(opt_count, 2) PKIfGy։??#go_optouts/store/tests/test_riak.py""" Test for the Riak opt out backend. """ from twisted.internet.defer import inlineCallbacks, returnValue from zope.interface.verify import verifyClass, verifyObject from vumi.tests.helpers import VumiTestCase from vumi.tests.helpers import PersistenceHelper from go.vumitools.opt_out.models import OptOutStore from go_optouts.store.interface import ( IOptOutBackend, IOptOutCollection) from go_optouts.store.riak import ( RiakOptOutBackend, RiakOptOutCollection) class TestRiakOptOutBackend(VumiTestCase): def setUp(self): self.persistence_helper = self.add_helper( PersistenceHelper(use_riak=True, is_sync=False)) @inlineCallbacks def mk_backend(self): manager = yield self.persistence_helper.get_riak_manager() backend = RiakOptOutBackend(manager) returnValue(backend) def test_class_iface(self): self.assertTrue(verifyClass(IOptOutBackend, RiakOptOutBackend)) @inlineCallbacks def test_instance_iface(self): backend = yield self.mk_backend() self.assertTrue(verifyObject(IOptOutBackend, backend)) def test_from_config(self): config = self.persistence_helper.mk_config({})["riak_manager"] backend = RiakOptOutBackend.from_config(config) self.assertTrue(isinstance(backend, RiakOptOutBackend)) self.assertEqual( backend.riak_manager.bucket_prefix, config["bucket_prefix"]) @inlineCallbacks def test_get_opt_out_collection(self): backend = yield self.mk_backend() collection = backend.get_opt_out_collection("owner-1") self.assertEqual(collection.store.user_account_key, "owner-1") self.assertTrue(isinstance(collection, RiakOptOutCollection)) class TestRiakOptOutCollection(VumiTestCase): def setUp(self): self.persistence_helper = self.add_helper( PersistenceHelper(use_riak=True, is_sync=False)) @inlineCallbacks def mk_collection(self, owner_id): manager = yield self.persistence_helper.get_riak_manager() store = OptOutStore(manager, owner_id) collection = RiakOptOutCollection(store) returnValue((store, collection)) def assertStoreAndCollectionEqual( self, opt_store, opt_collection, message, user_account): store_data = opt_store.get_data() self.assertEqual(store_data["message"], message) self.assertEqual(store_data["user_account"], user_account) self.assertEqual(opt_collection, { 'created_at': store_data.get('created_at'), 'message': message, 'user_account': user_account, }) def test_class_iface(self): self.assertTrue(verifyClass(IOptOutCollection, RiakOptOutCollection)) @inlineCallbacks def test_instance_iface(self): _store, collection = yield self.mk_collection("owner-1") self.assertTrue(verifyObject(IOptOutCollection, collection)) @inlineCallbacks def test_get_opt_out_exists(self): store, collection = yield self.mk_collection("owner-1") opt_out_store = yield store.new_opt_out( "msisdn", "+12345", {"message_id": "dummy-id"}) opt_out_coll = yield collection.get("msisdn", "+12345") self.assertStoreAndCollectionEqual( opt_out_store, opt_out_coll, message=u'dummy-id', user_account=u'owner-1') @inlineCallbacks def test_get_opt_out_absent(self): store, collection = yield self.mk_collection("owner-1") opt_out = yield collection.get("msisdn", "+12345") self.assertEqual(opt_out, None) @inlineCallbacks def test_put_opt_out_new(self): store, collection = yield self.mk_collection("owner-1") opt_out_coll = yield collection.put("msisdn", "+12345") opt_out_store = yield store.get_opt_out("msisdn", "+12345") self.assertStoreAndCollectionEqual( opt_out_store, opt_out_coll, message=None, user_account=u'owner-1') @inlineCallbacks def test_delete_opt_out_exists(self): store, collection = yield self.mk_collection("owner-1") opt_out_store = yield store.new_opt_out( "msisdn", "+12345", {"message_id": "dummy-id"}) opt_out_coll = yield collection.delete("msisdn", "+12345") self.assertStoreAndCollectionEqual( opt_out_store, opt_out_coll, message=u'dummy-id', user_account=u'owner-1') @inlineCallbacks def test_delete_opt_out_absent(self): store, collection = yield self.mk_collection("owner-1") opt_out_coll = yield collection.delete("msisdn", "+12345") self.assertEqual(opt_out_coll, None) @inlineCallbacks def test_count_zero(self): _store, collection = yield self.mk_collection("owner-1") self.assertEqual((yield collection.count()), 0) @inlineCallbacks def test_count_one(self): store, collection = yield self.mk_collection("owner-1") yield store.new_opt_out( "msisdn", "+12345", {"message_id": "dummy-id"}) self.assertEqual((yield collection.count()), 1) @inlineCallbacks def test_count_many(self): store, collection = yield self.mk_collection("owner-1") for i in range(4): yield store.new_opt_out( "msisdn", "+1234%d" % i, {"message_id": "dummy-id"}) self.assertEqual((yield collection.count()), 4) PKIfGgo_optouts/backends/__init__.pyPKIfG%go_optouts/backends/tests/__init__.pyPK8`jG?qgo_optouts/tests/test_api.pyfrom twisted.web.server import Site from twisted.internet.defer import inlineCallbacks from vumi.tests.helpers import VumiTestCase from go_optouts.api import API from go_optouts.auth import RequestHeaderAuth from go_optouts.store.memory import MemoryOptOutBackend from go_optouts.tests.utils import SiteHelper class TestApi(VumiTestCase): @inlineCallbacks def setUp(self): self.owner_id = "owner-1" self.backend = MemoryOptOutBackend() self.auth = RequestHeaderAuth() self.collection = self.backend.get_opt_out_collection(self.owner_id) self.app = API(self.backend, self.auth) self.site = Site(self.app.app.resource()) self.site_helper = yield self.add_helper( SiteHelper(self.site, self.owner_header)) def owner_header(self, owner=True, **kw): if owner: kw.setdefault("headers", {}) kw["headers"]["X-Owner-ID"] = self.owner_id return kw # Tests @inlineCallbacks def test_no_owner(self): resp = yield self.site_helper.get("/count", owner=False) self.assertEqual(resp.code, 401) data = yield resp.json() self.assertEqual(data, { "status": { "code": 401, "reason": "Owner ID not valid.", }, }) @inlineCallbacks def test_opt_out_found(self): existing_opt_out = self.collection.put("msisdn", "+273121100") resp = yield self.site_helper.get("/msisdn/+273121100") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "status": { "code": 200, "reason": "OK", }, "opt_out": { "id": existing_opt_out["id"], "address_type": "msisdn", "address": "+273121100", }, }) @inlineCallbacks def test_opt_out_not_found(self): resp = yield self.site_helper.get("/mxit/+369963") self.assertEqual(resp.code, 404) data = yield resp.json() self.assertEqual(data, { "status": { "code": 404, "reason": "Opt out not found.", }, }) @inlineCallbacks def test_opt_out_created(self): resp = yield self.site_helper.put("/msisdn/+273121100") created_opt_out = self.collection.get("msisdn", "+273121100") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "status": { "code": 200, "reason": "OK", }, "opt_out": { "id": created_opt_out["id"], "address_type": "msisdn", "address": "+273121100" }, }) @inlineCallbacks def test_opt_out_conflict(self): self.collection.put("msisdn", "+273121100") response = yield self.site_helper.put("/msisdn/+273121100") self.assertEqual(response.code, 409) data = yield response.json() self.assertEqual(data, { "status": { "code": 409, "reason": "Opt out already exists." }, }) @inlineCallbacks def test_opt_out_deleted(self): delete_opt_out = self.collection.put("whatsapp", "@whatsup") resp = yield self.site_helper.delete("/whatsapp/@whatsup") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "status": { "code": 200, "reason": "OK", }, "opt_out": { "id": delete_opt_out["id"], "address_type": "whatsapp", "address": "@whatsup" }, }) @inlineCallbacks def test_opt_out_nothing_to_delete(self): response = yield self.site_helper.delete("/whatsapp/+2716230199") self.assertEqual(response.code, 404) data = yield response.json() self.assertEqual(data, { "status": { "code": 404, "reason": "There\'s nothing to delete." }, }) @inlineCallbacks def test_opt_out_count_zero_opt_out(self): resp = yield self.site_helper.get("/count") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "opt_out_count": 0, "status": { "code": 200, "reason": "OK" }, }) @inlineCallbacks def test_opt_out_count_two_opt_outs(self): self.collection.put("slack", "@slack") self.collection.put("twitter_handle", "@trevor_october") resp = yield self.site_helper.get("/count") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "opt_out_count": 2, "status": { "code": 200, "reason": "OK" }, }) @inlineCallbacks def test_opt_out_count_three_opt_outs(self): self.collection.put("whatsapp", "+27782635432") self.collection.put("mxit", "@trevor_mxit") self.collection.put("facebook", "fb") resp = yield self.site_helper.get("/count") self.assertEqual(resp.code, 200) data = yield resp.json() self.assertEqual(data, { "opt_out_count": 3, "status": { "code": 200, "reason": "OK" }, }) PKIfG[[go_optouts/tests/utils.pyfrom zope.interface import implements import treq from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks from vumi.tests.helpers import IHelper class SiteHelper(object): """ Helper for testing HTTP Sites. :type site: twisted.web.server.Site :param site: Site to server. :type treq_kw: Function :param treq_kw: Callback function for generating treq request arguments. Any keyword arguments passed to the request helper methods are passed to this callback and the returned dictionary is passed to the underlying treq request function. The default function simple returns the keyword arguments as given. """ implements(IHelper) def __init__(self, site, treq_kw=None): self.site = site self.server = None self.url = None self.treq_kw = treq_kw if self.treq_kw is None: self.treq_kw = lambda **kw: kw @inlineCallbacks def setup(self): self.server = yield reactor.listenTCP(0, self.site) addr = self.server.getHost() self.url = "http://%s:%s" % (addr.host, addr.port) @inlineCallbacks def cleanup(self): if self.server is not None: yield self.server.loseConnection() def _call(self, handler, path, **kw): url = "%s%s" % (self.url, path) kw = self.treq_kw(**kw) return handler(url, persistent=False, **kw) def get(self, path, **kw): return self._call(treq.get, path, **kw) def post(self, path, **kw): return self._call(treq.post, path, **kw) def put(self, path, **kw): return self._call(treq.put, path, **kw) def delete(self, path, **kw): return self._call(treq.delete, path, **kw) PK8`jGKGHHgo_optouts/tests/test_server.pyimport yaml from twisted.internet.defer import inlineCallbacks from twisted.web.server import Site from confmodel.errors import ConfigError from vumi.tests.helpers import VumiTestCase, PersistenceHelper from go_optouts.server import ( HealthResource, read_yaml_config, ApiSiteConfig, ApiSite) from go_optouts.auth import RequestHeaderAuth, BouncerAuth from go_optouts.store.memory import MemoryOptOutBackend from go_optouts.store.riak import RiakOptOutBackend from go_optouts.tests.utils import SiteHelper class TestHealthResource(VumiTestCase): @inlineCallbacks def setUp(self): self.site = Site(HealthResource()) self.site_helper = yield self.add_helper(SiteHelper(self.site)) @inlineCallbacks def test_health_resource(self): result = yield self.site_helper.get('/') self.assertEqual(result.code, 200) body = yield result.text() self.assertEqual(body, "OK") class TestReadYamlConfig(VumiTestCase): def mk_config(self, data): path = self.mktemp() with open(path, "wb") as f: f.write(yaml.safe_dump(data)) return path def test_read_config(self): path = self.mk_config({ "foo": "bar", }) data = read_yaml_config(path) self.assertEqual(data, { "foo": "bar", }) def test_optional_config(self): data = read_yaml_config(None) self.assertEqual(data, {}) class TestApiSiteConfig(VumiTestCase): def setUp(self): self.persistence_helper = self.add_helper( PersistenceHelper(use_riak=True, is_sync=False)) def test_backend_memory(self): cfg = ApiSiteConfig({"backend": "memory"}) self.assertEqual(cfg.backend, "memory") def test_backend_riak(self): cfg = ApiSiteConfig({"backend": "riak"}) self.assertEqual(cfg.backend, "riak") def test_backend_required(self): err = self.assertRaises(ConfigError, ApiSiteConfig, {}) self.assertEqual(str(err), "Missing required config field 'backend'") def test_unknown_backend(self): err = self.assertRaises(ConfigError, ApiSiteConfig, {"backend": "bad"}) self.assertEqual(str(err), "Backend must be one of: riak, memory") def test_backend_config(self): cfg = ApiSiteConfig({"backend": "memory", "backend_config": { "camelot": "ham", }}) self.assertEqual(cfg.backend_config, {"camelot": "ham"}) def test_backend_config_optional(self): cfg = ApiSiteConfig({"backend": "memory"}) self.assertEqual(cfg.backend_config, {}) def test_url_path_prefix(self): cfg = ApiSiteConfig({ "backend": "memory", "url_path_prefix": "flashing/red/light"}) self.assertEqual(cfg.url_path_prefix, "flashing/red/light") def test_url_path_prefix_optional(self): cfg = ApiSiteConfig({"backend": "memory"}) self.assertEqual(cfg.url_path_prefix, "optouts") def test_auth_bouncer_url(self): cfg = ApiSiteConfig({ "backend": "memory", "auth_bouncer_url": "http://example.com/"}) self.assertEqual(cfg.auth_bouncer_url, "http://example.com/") def test_auth_bouncer_url_optional(self): cfg = ApiSiteConfig({"backend": "memory"}) self.assertEqual(cfg.auth_bouncer_url, None) def test_create_backend_memory(self): cfg = ApiSiteConfig({"backend": "memory"}) backend = cfg.create_backend() self.assertTrue(isinstance(backend, MemoryOptOutBackend)) def test_create_backend_riak(self): backend_config = self.persistence_helper.mk_config({})['riak_manager'] cfg = ApiSiteConfig({ "backend": "riak", "backend_config": backend_config, }) backend = cfg.create_backend() self.assertTrue(isinstance(backend, RiakOptOutBackend)) def test_create_auth_request_headers(self): cfg = ApiSiteConfig({"backend": "memory"}) auth = cfg.create_auth() self.assertTrue(isinstance(auth, RequestHeaderAuth)) def test_create_auth_bouncer(self): cfg = ApiSiteConfig({ "backend": "memory", "auth_bouncer_url": "http://example.com/"}) auth = cfg.create_auth() self.assertTrue(isinstance(auth, BouncerAuth)) self.assertEqual(auth._auth_bouncer_url, "http://example.com") class TestApiSite(VumiTestCase): def setUp(self): self.persistence_helper = self.add_helper( PersistenceHelper(use_riak=True, is_sync=False)) def mk_config(self, data): path = self.mktemp() with open(path, "wb") as f: f.write(yaml.safe_dump(data)) return path def mk_api_site(self, config=None): if config is None: config = {} if "backend" not in config: config["backend"] = "memory" return ApiSite(self.mk_config(config)) def mk_server(self, config=None): api_site = self.mk_api_site(config) return self.add_helper( SiteHelper(api_site.site)) @inlineCallbacks def test_health(self): site_helper = yield self.mk_server() result = yield site_helper.get('/health') self.assertEqual(result.code, 200) body = yield result.text() self.assertEqual(body, "OK") @inlineCallbacks def test_opt_out(self): site_helper = yield self.mk_server() result = yield site_helper.get('/optouts/count', headers={ "X-Owner-ID": "owner-1", }) self.assertEqual(result.code, 200) data = yield result.json() self.assertEqual(data, { 'opt_out_count': 0, 'status': { 'code': 200, 'reason': 'OK', }, }) @inlineCallbacks def test_url_path_prefix(self): site_helper = yield self.mk_server({ "url_path_prefix": "wombats" }) result = yield site_helper.get('/wombats/count', headers={ "X-Owner-ID": "owner-1", }) self.assertEqual(result.code, 200) data = yield result.json() self.assertEqual(data, { 'opt_out_count': 0, 'status': { 'code': 200, 'reason': 'OK', }, }) def test_memory_backend(self): api_site = self.mk_api_site({"backend": "memory"}) backend = api_site.api._backend self.assertTrue(isinstance(backend, MemoryOptOutBackend)) def test_riak_backend(self): config = self.persistence_helper.mk_config({})['riak_manager'] api_site = self.mk_api_site({ "backend": "riak", "backend_config": config, }) backend = api_site.api._backend self.assertTrue(isinstance(backend, RiakOptOutBackend)) self.assertEqual( backend.riak_manager.bucket_prefix, config["bucket_prefix"]) def test_auth_request_headers(self): api_site = self.mk_api_site({"backend": "memory"}) auth = api_site.api._auth self.assertTrue(isinstance(auth, RequestHeaderAuth)) def test_auth_bouncer(self): api_site = self.mk_api_site({ "backend": "memory", "auth_bouncer_url": "http://example.com/", }) auth = api_site.api._auth self.assertTrue(isinstance(auth, BouncerAuth)) self.assertEqual(auth._auth_bouncer_url, "http://example.com") PKIfGb, go_optouts/tests/test_main.pyfrom vumi.tests.helpers import VumiTestCase from click.testing import CliRunner from go_optouts.server import ApiSite from go_optouts.store.memory import MemoryOptOutBackend from go_optouts.main import run class TestCli(VumiTestCase): def setUp(self): self.run_calls = [] def record_site_run(*args): self.run_calls.append(args) self.patch(ApiSite, 'run', record_site_run) def test_help(self): runner = CliRunner() result = runner.invoke(run, ['--help']) self.assertEqual(result.exit_code, 0) self.assertTrue("Vumi Go Opt Out API." in result.output) self.assertTrue( "-c, --config TEXT YAML config file" in result.output) self.assertTrue( "-h, --host TEXT Host to listen on" in result.output) self.assertTrue( "-p, --port INTEGER Port to listen on" in result.output) def test_version(self): runner = CliRunner() result = runner.invoke(run, ['--version']) self.assertEqual(result.exit_code, 0) self.assertTrue("go-optouts, version " in result.output) def test_config_required(self): runner = CliRunner() result = runner.invoke(run, []) self.assertEqual(result.exit_code, 2) self.assertTrue('Missing option "--config" / "-c".' in result.output) def test_run(self): cfg_file = self.mktemp() with open(cfg_file, "wb") as f: f.write("backend: memory") runner = CliRunner() result = runner.invoke(run, [ '--config', cfg_file, '--host', '127.0.0.1', '--port', '8000', ]) self.assertEqual(result.exit_code, 0) [run_call] = self.run_calls [site, host, port] = run_call self.assertTrue(isinstance(site.api._backend, MemoryOptOutBackend)) self.assertEqual(host, '127.0.0.1') self.assertEqual(port, 8000) def test_run_default_parameters(self): cfg_file = self.mktemp() with open(cfg_file, "wb") as f: f.write("backend: memory") runner = CliRunner() result = runner.invoke(run, [ '--config', cfg_file, ]) self.assertEqual(result.exit_code, 0) [run_call] = self.run_calls [site, host, port] = run_call self.assertTrue(isinstance(site.api._backend, MemoryOptOutBackend)) self.assertEqual(host, 'localhost') self.assertEqual(port, 8080) PKIfGgo_optouts/tests/__init__.pyPK8`jG)dSSgo_optouts/tests/test_auth.py""" Test for go_optouts.auth. """ from zope.interface.verify import verifyClass, verifyObject from twisted.internet.defer import inlineCallbacks, Deferred from twisted.web.resource import Resource from twisted.web.server import Request, Site from vumi.tests.helpers import VumiTestCase from go_optouts.auth import IAuthenticator, RequestHeaderAuth, BouncerAuth from go_optouts.tests.utils import SiteHelper def mk_request(path=None, headers=None): request = Request(channel=None, queued=True) if path: request.path = path if headers: for k, v in headers.items(): request.requestHeaders.addRawHeader(k, v) return request class TestRequestHeaderAuth(VumiTestCase): def test_class_interface(self): self.assertTrue(verifyClass(IAuthenticator, RequestHeaderAuth)) def test_instance_iface(self): auth = RequestHeaderAuth() self.assertTrue(verifyObject(IAuthenticator, auth)) @inlineCallbacks def test_owner_id_present(self): auth = RequestHeaderAuth() request = mk_request(headers={"X-Owner-ID": "owner-1"}) owner_id_d = auth.owner_id(request) self.assertTrue(isinstance(owner_id_d, Deferred)) self.assertEqual((yield owner_id_d), "owner-1") @inlineCallbacks def test_owner_id_absent(self): auth = RequestHeaderAuth() request = mk_request() owner_id_d = auth.owner_id(request) self.assertTrue(isinstance(owner_id_d, Deferred)) self.assertEqual((yield owner_id_d), None) class DummyAuthResource(Resource): isLeaf = True def __init__(self): self.requests = [] self.responses = [] def add_response(self, code=401, body="Unauthorized", owner_id=None): self.responses.append({ 'code': code, 'body': body, 'owner_id': owner_id }) def pop_response(self): if not self.responses: self.add_response() return self.responses.pop(0) def render(self, request): self.requests.append(request) response = self.pop_response() request.setResponseCode(response['code']) if response['owner_id']: request.setHeader('X-Owner-ID', response['owner_id']) return response['body'] class TestBouncerAuth(VumiTestCase): @inlineCallbacks def setUp(self): self.auth_resource = DummyAuthResource() self.site = Site(self.auth_resource) self.site_helper = yield self.add_helper(SiteHelper(self.site)) @property def auth_url(self): return self.site_helper.url def test_class_interface(self): self.assertTrue(verifyClass(IAuthenticator, BouncerAuth)) def test_instance_iface(self): auth = BouncerAuth(self.auth_url) self.assertTrue(verifyObject(IAuthenticator, auth)) @inlineCallbacks def test_auth_success(self): self.auth_resource.add_response( code=200, body='Authorized', owner_id='owner-1') auth = BouncerAuth(self.auth_url) request = mk_request(path="/foo") owner_id_d = auth.owner_id(request) self.assertTrue(isinstance(owner_id_d, Deferred)) self.assertEqual((yield owner_id_d), "owner-1") [auth_request] = self.auth_resource.requests self.assertEqual(auth_request.uri, '/foo') @inlineCallbacks def test_auth_failed(self): auth = BouncerAuth(self.auth_url) request = mk_request(path="/foo") owner_id_d = auth.owner_id(request) self.assertTrue(isinstance(owner_id_d, Deferred)) self.assertEqual((yield owner_id_d), None) [auth_request] = self.auth_resource.requests self.assertEqual(auth_request.uri, '/foo') @inlineCallbacks def test_auth_headers_proxied(self): auth = BouncerAuth(self.auth_url) request = mk_request( path="/foo", headers={'Authorization': 'token'}) yield auth.owner_id(request) [auth_request] = self.auth_resource.requests self.assertEqual(auth_request.getHeader('Authorization'), 'token') @inlineCallbacks def test_auth_headers_absent(self): auth = BouncerAuth(self.auth_url) request = mk_request(path="/foo") yield auth.owner_id(request) [auth_request] = self.auth_resource.requests self.assertEqual(auth_request.getHeader('Authorization'), None) PKC`jGu*go_optouts-0.1.1.dist-info/DESCRIPTION.rstGo Opt Outs API =============== An API for managing `Vumi Go`_ opt outs. .. _Vumi Go: http://github.com/praekelt/vumi-go |gooptouts-ci|_ |gooptouts-cover|_ .. |gooptouts-ci| image:: https://travis-ci.org/praekelt/go-optouts-api.png?branch=develop .. _gooptouts-ci: https://travis-ci.org/praekelt/go-optouts-api .. |gooptouts-cover| image:: https://coveralls.io/repos/praekelt/go-optouts-api/badge.png?branch=develop .. _gooptouts-cover: https://coveralls.io/r/praekelt/go-optouts-api You can contact the Vumi development team in the following ways: * via *email* by joining the the `vumi-dev@googlegroups.com`_ mailing list * on *irc* in *#vumi* on the `Freenode IRC network`_ .. _vumi-dev@googlegroups.com: https://groups.google.com/forum/?fromgroups#!forum/vumi-dev .. _Freenode IRC network: https://webchat.freenode.net/?channels=#vumi Issues can be filed in the GitHub issue tracker. Please don't use the issue tracker for general support queries. PKC`jGhYFF+go_optouts-0.1.1.dist-info/entry_points.txt [console_scripts] go-optouts=go_optouts.main:run PKC`jGբ(go_optouts-0.1.1.dist-info/metadata.json{"classifiers": ["Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: BSD License", "Operating System :: POSIX", "Programming Language :: Python", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Internet :: WWW/HTTP"], "extensions": {"python.commands": {"wrap_console": {"go-optouts": "go_optouts.main:run"}}, "python.details": {"contacts": [{"email": "dev@praekeltfoundation.org", "name": "Praekelt Foundation", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "http://github.com/praekelt/go-optouts-api"}}, "python.exports": {"console_scripts": {"go-optouts": "go_optouts.main:run"}}}, "extras": [], "generator": "bdist_wheel (0.26.0)", "license": "BSD", "metadata_version": "2.0", "name": "go-optouts", "run_requires": [{"requires": ["PyYAML", "Twisted (>=13.1.0)", "click", "klein", "treq", "vumi (>=0.5.4)", "vumi-go"]}], "summary": "An API for managing Vumi Go opt outs.", "version": "0.1.1"}PKC`jGs} (go_optouts-0.1.1.dist-info/top_level.txtgo_optouts PKC`jG''\\ go_optouts-0.1.1.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py2-none-any PKC`jGHf#go_optouts-0.1.1.dist-info/METADATAMetadata-Version: 2.0 Name: go-optouts Version: 0.1.1 Summary: An API for managing Vumi Go opt outs. Home-page: http://github.com/praekelt/go-optouts-api Author: Praekelt Foundation Author-email: dev@praekeltfoundation.org License: BSD Platform: UNKNOWN Classifier: Development Status :: 4 - Beta Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: BSD License Classifier: Operating System :: POSIX Classifier: Programming Language :: Python Classifier: Topic :: Software Development :: Libraries :: Python Modules Classifier: Topic :: Internet :: WWW/HTTP Requires-Dist: PyYAML Requires-Dist: Twisted (>=13.1.0) Requires-Dist: click Requires-Dist: klein Requires-Dist: treq Requires-Dist: vumi (>=0.5.4) Requires-Dist: vumi-go Go Opt Outs API =============== An API for managing `Vumi Go`_ opt outs. .. _Vumi Go: http://github.com/praekelt/vumi-go |gooptouts-ci|_ |gooptouts-cover|_ .. |gooptouts-ci| image:: https://travis-ci.org/praekelt/go-optouts-api.png?branch=develop .. _gooptouts-ci: https://travis-ci.org/praekelt/go-optouts-api .. |gooptouts-cover| image:: https://coveralls.io/repos/praekelt/go-optouts-api/badge.png?branch=develop .. _gooptouts-cover: https://coveralls.io/r/praekelt/go-optouts-api You can contact the Vumi development team in the following ways: * via *email* by joining the the `vumi-dev@googlegroups.com`_ mailing list * on *irc* in *#vumi* on the `Freenode IRC network`_ .. _vumi-dev@googlegroups.com: https://groups.google.com/forum/?fromgroups#!forum/vumi-dev .. _Freenode IRC network: https://webchat.freenode.net/?channels=#vumi Issues can be filed in the GitHub issue tracker. Please don't use the issue tracker for general support queries. PKC`jGb!go_optouts-0.1.1.dist-info/RECORDgo_optouts/__init__.py,sha256=xSFGoB7OWnL_pOJqdimW5YLtc-STPC52lSzcJzZ54Jo,81 go_optouts/api.py,sha256=Gv--FB3xfRm9rHkbiav60HYAkG-_fa7TCU0WiW3BVWU,3571 go_optouts/auth.py,sha256=wugxup8YML7R9lohHxQ6u9QDooG0JsaMU2o3ttxqzKI,1533 go_optouts/main.py,sha256=qhgXRTtn-87C8EGXuOHrKEpyIZ6FUVExM2iL1yGQmeA,535 go_optouts/server.py,sha256=wYlyJvL9x9tvdClMuhwu0JG49YdhplA4WwU2ituMLiQ,2466 go_optouts/backends/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 go_optouts/backends/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 go_optouts/store/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 go_optouts/store/interface.py,sha256=YPNi9YCfcZ9gs1ROQz-9VlSThl10aymEy4hJ0wZThUA,705 go_optouts/store/memory.py,sha256=HAzfx94pjUW0MSvRoXUzYTgAi8r2rz14OEuUZp21VXY,1848 go_optouts/store/riak.py,sha256=hUldF3FzUL5T-_q1Ubzpc0pbqXiYntg9LnrkGsBgAx8,3143 go_optouts/store/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 go_optouts/store/tests/test_memory.py,sha256=whPhI23NNZdKfmLOV4JAWxzQ-8ytA8cK08CUhJvsH0c,3653 go_optouts/store/tests/test_riak.py,sha256=tjW7RAvVF35lz_hURTl7NxtPop6G_Gqfv2YgPYvGlSo,5439 go_optouts/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 go_optouts/tests/test_api.py,sha256=ZFRB35okEdHTGYBZVRu6XS2-pFuJClg_TQ39-lQGGqU,5605 go_optouts/tests/test_auth.py,sha256=2ntl7-di05_gnrNVoucAsOXpwnCBxbmL2SpPwc_3OJc,4435 go_optouts/tests/test_main.py,sha256=xm1eH0J_7tswvRICyLxgalEY_6y2TgwvdBJzD4N9qS8,2502 go_optouts/tests/test_server.py,sha256=mKBjFTjx0Dt86E22sV9gvcPyreCrSmCNOY3HNGP7bZg,7496 go_optouts/tests/utils.py,sha256=SJIl1knhPDmFIA7alvISroVbBbNjMRqEVxmRtwz6kLE,1806 go_optouts-0.1.1.dist-info/DESCRIPTION.rst,sha256=4ubVnIXy4o1c8lQp_iMnIJKBDOdx0ZKoe8KEU6GJk9k,963 go_optouts-0.1.1.dist-info/METADATA,sha256=Trz79FePIlTbNFIj5pJy_7k5A8DLSXR6HORJyZTIftg,1723 go_optouts-0.1.1.dist-info/RECORD,, go_optouts-0.1.1.dist-info/WHEEL,sha256=JTb7YztR8fkPg6aSjc571Q4eiVHCwmUDlX8PhuuqIIE,92 go_optouts-0.1.1.dist-info/entry_points.txt,sha256=0zNAZ1aO5plcmSlrbOVCHGCjAEEPNzi6cLCaLm0JFU0,70 go_optouts-0.1.1.dist-info/metadata.json,sha256=_R72FA6F3Cy-pJfCwvDDyWrsPEJj_XPys96n0J53USA,1018 go_optouts-0.1.1.dist-info/top_level.txt,sha256=AzqO-Z2rJZknwEPeLaXd_jXVzwLzynNKgsVS4N_Zgbo,11 PKIfGOBgo_optouts/main.pyPK8`jGF Ggo_optouts/api.pyPK8`jGigo_optouts/auth.pyPK8`jG@QQgo_optouts/__init__.pyPK8`jG;Zt go_optouts/server.pyPKIfG~G G  go_optouts/store/riak.pyPKIfGl-go_optouts/store/interface.pyPKIfGem88h0go_optouts/store/memory.pyPKIfG7go_optouts/store/__init__.pyPKIfG"8go_optouts/store/tests/__init__.pyPKIfG2sEE%R8go_optouts/store/tests/test_memory.pyPKIfGy։??#Fgo_optouts/store/tests/test_riak.pyPKIfGZ\go_optouts/backends/__init__.pyPKIfG%\go_optouts/backends/tests/__init__.pyPK8`jG?q\go_optouts/tests/test_api.pyPKIfG[[rgo_optouts/tests/utils.pyPK8`jGKGHH>zgo_optouts/tests/test_server.pyPKIfGb, ×go_optouts/tests/test_main.pyPKIfGġgo_optouts/tests/__init__.pyPK8`jG)dSSgo_optouts/tests/test_auth.pyPKC`jGu*go_optouts-0.1.1.dist-info/DESCRIPTION.rstPKC`jGhYFF+go_optouts-0.1.1.dist-info/entry_points.txtPKC`jGբ(&go_optouts-0.1.1.dist-info/metadata.jsonPKC`jGs} (fgo_optouts-0.1.1.dist-info/top_level.txtPKC`jG''\\ go_optouts-0.1.1.dist-info/WHEELPKC`jGHf#Qgo_optouts-0.1.1.dist-info/METADATAPKC`jGb!Mgo_optouts-0.1.1.dist-info/RECORDPK