PKY]ŒHYí++junebug/api.pyfrom klein import Klein import logging from werkzeug.exceptions import HTTPException from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web import http from vumi.persist.txredis_manager import TxRedisManager from vumi.utils import load_class_by_string from junebug.amqp import MessageSender from junebug.channel import Channel from junebug.error import JunebugError from junebug.utils import api_from_event, json_body, response from junebug.validate import body_schema, validate from junebug.stores import ( InboundMessageStore, MessageRateStore, OutboundMessageStore) logging = logging.getLogger(__name__) class ApiUsageError(JunebugError): '''Exception that is raised whenever the API is used incorrectly. Used for incorrect requests and invalid data.''' name = 'ApiUsageError' description = 'api usage error' code = http.BAD_REQUEST class JunebugApi(object): app = Klein() def __init__(self, service, config): self.service = service self.redis_config = config.redis self.amqp_config = config.amqp self.config = config @inlineCallbacks def setup(self, redis=None, message_sender=None): if redis is None: redis = yield TxRedisManager.from_config(self.redis_config) if message_sender is None: message_sender = MessageSender( 'amqp-spec-0-8.xml', self.amqp_config) self.redis = redis self.message_sender = message_sender self.message_sender.setServiceParent(self.service) self.inbounds = InboundMessageStore( self.redis, self.config.inbound_message_ttl) self.outbounds = OutboundMessageStore( self.redis, self.config.outbound_message_ttl) self.message_rate = MessageRateStore(self.redis) self.plugins = [] for plugin_config in self.config.plugins: cls = load_class_by_string(plugin_config['type']) plugin = cls() yield plugin.start_plugin(plugin_config, self.config) self.plugins.append(plugin) yield Channel.start_all_channels( self.redis, self.config, self.service, self.plugins) @inlineCallbacks def teardown(self): yield self.redis.close_manager() for plugin in self.plugins: yield plugin.stop_plugin() @app.handle_errors(JunebugError) def generic_junebug_error(self, request, failure): return response(request, failure.value.description, { 'errors': [{ 'type': failure.value.name, 'message': failure.getErrorMessage(), }] }, code=failure.value.code) @app.handle_errors(HTTPException) def http_error(self, request, failure): return response(request, failure.value.description, { 'errors': [{ 'type': failure.value.name, 'message': failure.getErrorMessage(), }] }, code=failure.value.code) @app.handle_errors def generic_error(self, request, failure): logging.exception(failure) return response(request, 'generic error', { 'errors': [{ 'type': failure.type.__name__, 'message': failure.getErrorMessage(), }] }, code=http.INTERNAL_SERVER_ERROR) @app.route('/channels/', methods=['GET']) @inlineCallbacks def get_channel_list(self, request): '''List all channels''' ids = yield Channel.get_all(self.redis) returnValue(response(request, 'channels listed', sorted(ids))) @app.route('/channels/', methods=['POST']) @json_body @validate( body_schema({ 'type': 'object', 'properties': { 'type': {'type': 'string'}, 'label': {'type': 'string'}, 'config': {'type': 'object'}, 'metadata': {'type': 'object'}, 'status_url': {'type': 'string'}, 'mo_url': {'type': 'string'}, 'amqp_queue': {'type': 'string'}, 'rate_limit_count': { 'type': 'integer', 'minimum': 0, }, 'rate_limit_window': { 'type': 'integer', 'minimum': 0, }, 'character_limit': { 'type': 'integer', 'minimum': 0, }, }, 'required': ['type', 
'config'], })) @inlineCallbacks def create_channel(self, request, body): '''Create a channel''' if not (body.get('mo_url') or body.get('amqp_queue')): raise ApiUsageError( 'One or both of "mo_url" and "amqp_queue" must be specified') channel = Channel( self.redis, self.config, body, self.plugins) yield channel.start(self.service) yield channel.save() returnValue(response( request, 'channel created', (yield channel.status()))) @app.route('/channels/', methods=['GET']) @inlineCallbacks def get_channel(self, request, channel_id): '''Return the channel configuration and a nested status object''' channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) resp = yield channel.status() returnValue(response( request, 'channel found', resp)) @app.route('/channels/', methods=['POST']) @json_body @validate( body_schema({ 'type': 'object', 'properties': { 'type': {'type': 'string'}, 'label': {'type': 'string'}, 'config': {'type': 'object'}, 'metadata': {'type': 'object'}, 'status_url': {'type': 'string'}, 'mo_url': {'type': 'string'}, 'rate_limit_count': { 'type': 'integer', 'minimum': 0, }, 'rate_limit_window': { 'type': 'integer', 'minimum': 0, }, 'character_limit': { 'type': 'integer', 'minimum': 0, }, }, })) @inlineCallbacks def modify_channel(self, request, body, channel_id): '''Mondify the channel configuration''' channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) resp = yield channel.update(body) returnValue(response( request, 'channel updated', resp)) @app.route('/channels/', methods=['DELETE']) @inlineCallbacks def delete_channel(self, request, channel_id): '''Delete the channel''' channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) yield channel.stop() yield channel.delete() returnValue(response( request, 'channel deleted', {})) @app.route('/channels//restart', methods=['POST']) @inlineCallbacks def restart_channel(self, request, channel_id): '''Restart a channel.''' channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) yield channel.stop() yield channel.start(self.service) returnValue(response(request, 'channel restarted', {})) @app.route('/channels//logs', methods=['GET']) @inlineCallbacks def get_logs(self, request, channel_id): '''Get the last N logs for a channel, sorted reverse chronologically.''' n = request.args.get('n', None) if n is not None: n = int(n[0]) channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) logs = yield channel.get_logs(n) returnValue(response(request, 'logs retrieved', logs)) @app.route('/channels//messages/', methods=['POST']) @json_body @validate( body_schema({ 'type': 'object', 'properties': { 'to': {'type': 'string'}, 'from': {'type': ['string', 'null']}, 'reply_to': {'type': 'string'}, 'content': {'type': ['string', 'null']}, 'event_url': {'type': 'string'}, 'priority': {'type': 'string'}, 'channel_data': {'type': 'object'}, }, 'required': ['content'], 'additionalProperties': False, })) @inlineCallbacks def send_message(self, request, body, channel_id): '''Send an outbound (mobile terminated) message''' if 'to' not in body and 'reply_to' not in body: raise ApiUsageError( 'Either "to" or "reply_to" must be specified') if 'to' in body and 'reply_to' in body: raise ApiUsageError( 'Only one of "to" and "reply_to" may be specified') if 'from' in body and 'reply_to' in body: raise ApiUsageError( 'Only one of "from" and "reply_to" may be 
specified') channel = yield Channel.from_id( self.redis, self.config, channel_id, self.service, self.plugins) if 'to' in body: msg = yield channel.send_message( self.message_sender, self.outbounds, body) else: msg = yield channel.send_reply_message( self.message_sender, self.outbounds, self.inbounds, body) yield self.message_rate.increment( channel_id, 'outbound', self.config.metric_window) returnValue(response(request, 'message sent', msg)) @app.route( '/channels//messages/', methods=['GET']) @inlineCallbacks def get_message_status(self, request, channel_id, message_id): '''Retrieve the status of a message''' events = yield self.outbounds.load_all_events(channel_id, message_id) events = sorted( (api_from_event(channel_id, e) for e in events), key=lambda e: e['timestamp']) last_event = events[-1] if events else None last_event_type = last_event['event_type'] if last_event else None last_event_timestamp = last_event['timestamp'] if last_event else None returnValue(response(request, 'message status', { 'id': message_id, 'last_event_type': last_event_type, 'last_event_timestamp': last_event_timestamp, 'events': events, })) @app.route('/health', methods=['GET']) def health_status(self, request): return response(request, 'health ok', {}) PKwPGÇŠ’Š¡¡junebug/amqp.pyfrom twisted.application.internet import TCPClient from twisted.application.service import MultiService from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.protocol import ReconnectingClientFactory from twisted.python import log from twisted.web import http from txamqp.client import TwistedDelegate from txamqp.content import Content from txamqp.protocol import AMQClient from vumi.utils import vumi_resource_path from vumi.service import get_spec from junebug.error import JunebugError class AmqpConnectionError(JunebugError): '''Exception that is raised whenever a message is attempted to be send, but no amqp connection is available to send it on''' name = 'AmqpConnectionError' description = 'amqp connection error' code = http.INTERNAL_SERVER_ERROR class MessageSender(MultiService): '''Keeps track of the amqp connection and can send messages. Raises an exception if a message is sent when there is no amqp connection''' def __init__(self, specfile, amqp_config): super(MessageSender, self).__init__() self.amqp_config = amqp_config self.factory = AmqpFactory( specfile, amqp_config, self._connected_callback, self._disconnected_callback) def startService(self): super(MessageSender, self).startService() self.amqp_service = TCPClient( self.amqp_config['hostname'], self.amqp_config['port'], self.factory) self.amqp_service.setServiceParent(self) def _connected_callback(self, client): self.client = client def _disconnected_callback(self): self.client = None def send_message(self, message, **kwargs): if not hasattr(self, 'client') or self.client is None: raise AmqpConnectionError( 'Message not sent, AMQP connection error.') return self.client.publish_message(message, **kwargs) class AmqpFactory(ReconnectingClientFactory, object): def __init__( self, specfile, amqp_config, connected_callback, disconnected_callback): '''Factory that creates JunebugAMQClients. 
specfile - string of specfile name amqp_config - connection details for amqp server ''' self.connected_callback, self.disconnected_callback = ( connected_callback, disconnected_callback) self.amqp_config = amqp_config self.spec = get_spec(vumi_resource_path(specfile)) self.delegate = TwistedDelegate() super(AmqpFactory, self).__init__() def buildProtocol(self, addr): amqp_client = JunebugAMQClient( self.delegate, self.amqp_config['vhost'], self.spec, self.amqp_config.get('heartbeat', 0)) amqp_client.factory = self self.resetDelay() return amqp_client def clientConnectionFailed(self, connector, reason): log.err("AmqpFactory connection failed (%s)" % ( reason.getErrorMessage(),)) super(AmqpFactory, self).clientConnectionFailed(connector, reason) def clientConnectionLost(self, connector, reason): log.err("AmqpFactory client connection lost (%s)" % ( reason.getErrorMessage(),)) self.disconnected_callback() super(AmqpFactory, self).clientConnectionLost(connector, reason) class RoutingKeyError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class JunebugAMQClient(AMQClient, object): exchange_name = "vumi" routing_key = "routing_key" delivery_mode = 2 # save to disk @inlineCallbacks def connectionMade(self): super(JunebugAMQClient, self).connectionMade() yield self.authenticate(self.factory.amqp_config['username'], self.factory.amqp_config['password']) # authentication was successful log.msg("Got an authenticated AMQP connection") self.factory.connected_callback(self) @inlineCallbacks def get_channel(self): """If channel is None a new channel is created""" if not hasattr(self, 'cached_channel'): channel_id = self.get_new_channel_id() channel = yield self.channel(channel_id) yield channel.channel_open() self.cached_channel = channel else: channel = self.cached_channel returnValue(channel) def get_new_channel_id(self): """ AMQClient keeps track of channels in a dictionary. The channel ids are the keys, get the highest number and up it or just return zero for the first channel """ return (max(self.channels) + 1) if self.channels else 0 def check_routing_key(self, routing_key): if(routing_key != routing_key.lower()): raise RoutingKeyError("The routing_key: %s is not all lower case!" 
% (routing_key)) def publish_message(self, message, **kwargs): d = self.publish_raw(message.to_json(), **kwargs) d.addCallback(lambda r: message) return d def publish_raw(self, data, **kwargs): amq_message = Content(data) amq_message['delivery mode'] = kwargs.pop( 'delivery_mode', self.delivery_mode) return self.publish(amq_message, **kwargs) @inlineCallbacks def publish(self, message, **kwargs): exchange_name = kwargs.get('exchange_name') or self.exchange_name routing_key = kwargs.get('routing_key') or self.routing_key self.check_routing_key(routing_key) channel = yield self.get_channel() yield channel.basic_publish( exchange=exchange_name, content=message, routing_key=routing_key) PKY]ŒHçð:¬¬junebug/utils.pyimport json from twisted.web import http from functools import wraps from vumi.message import JSONMessageEncoder def response(req, description, data, code=http.OK): req.setHeader('Content-Type', 'application/json') req.setResponseCode(code) return json.dumps({ 'status': code, 'code': http.RESPONSES[code], 'description': description, 'result': data, }, cls=JSONMessageEncoder) def json_body(fn): @wraps(fn) def wrapper(api, req, *a, **kw): body = json.loads(req.content.read()) return fn(api, req, body, *a, **kw) return wrapper def conjoin(a, b): result = {} result.update(a) result.update(b) return result def omit(collection, *fields): return dict((k, v) for k, v in collection.iteritems() if k not in fields) def api_from_message(msg): ret = {} ret['to'] = msg['to_addr'] ret['from'] = msg['from_addr'] ret['message_id'] = msg['message_id'] ret['channel_id'] = msg['transport_name'] ret['timestamp'] = msg['timestamp'] ret['reply_to'] = msg['in_reply_to'] ret['content'] = msg['content'] ret['channel_data'] = msg['helper_metadata'] if msg.get('continue_session') is not None: ret['channel_data']['continue_session'] = msg['continue_session'] if msg.get('session_event') is not None: ret['channel_data']['session_event'] = msg['session_event'] return ret def message_from_api(channel_id, msg): ret = {} if 'reply_to' not in msg: ret['to_addr'] = msg.get('to') ret['from_addr'] = msg.get('from') ret['content'] = msg['content'] ret['transport_name'] = channel_id channel_data = msg.get('channel_data', {}) if channel_data.get('continue_session') is not None: ret['continue_session'] = channel_data.pop('continue_session') if channel_data.get('session_event') is not None: ret['session_event'] = channel_data.pop('session_event') ret['helper_metadata'] = channel_data ret['transport_name'] = channel_id return ret def api_from_event(channel_id, event): parser = { 'ack': _api_from_event_ack, 'nack': _api_from_event_nack, 'delivery_report': _api_from_event_dr, }.get(event['event_type'], lambda *a, **kw: {}) return conjoin({ 'channel_id': channel_id, 'timestamp': event['timestamp'], 'message_id': event['user_message_id'], 'event_details': {}, 'event_type': None, }, parser(channel_id, event)) def api_from_status(channel_id, status): return { 'channel_id': channel_id, 'component': status['component'], 'status': status['status'], 'type': status['type'], 'message': status['message'], 'details': status['details'], } def _api_from_event_ack(channel_id, event): return { 'event_type': 'submitted', 'event_details': {} } def _api_from_event_nack(channel_id, event): return { 'event_type': 'rejected', 'event_details': {'reason': event['nack_reason']} } def _api_from_event_dr(channel_id, event): return { 'event_type': { 'pending': 'delivery_pending', 'failed': 'delivery_failed', 'delivered': 'delivery_succeeded', 
}.get(event['delivery_status']), } def channel_public_http_properties(properties): config = properties.get('config', {}) results = conjoin({ 'enabled': True, 'web_path': config.get('web_path'), 'web_port': config.get('web_port'), }, properties.get('public_http', {})) if results['web_path'] is None or results['web_port'] is None: return None else: return results PKùh…HeILSb b junebug/workers.pyimport json import logging import treq from twisted.internet.defer import inlineCallbacks from vumi.application.base import ApplicationConfig, ApplicationWorker from vumi.config import ConfigDict, ConfigInt, ConfigText, ConfigFloat from vumi.message import JSONMessageEncoder from vumi.persist.txredis_manager import TxRedisManager from vumi.worker import BaseConfig, BaseWorker from junebug.utils import api_from_message, api_from_event, api_from_status from junebug.stores import ( InboundMessageStore, OutboundMessageStore, StatusStore, MessageRateStore) class MessageForwardingConfig(ApplicationConfig): '''Config for MessageForwardingWorker application worker''' mo_message_url = ConfigText( "The URL to send HTTP POST requests to for MO messages", default=None, static=True) message_queue = ConfigText( "The AMQP queue to forward messages on", default=None, static=True) redis_manager = ConfigDict( "Redis config.", required=True, static=True) inbound_ttl = ConfigInt( "Maximum time (in seconds) allowed to reply to messages", required=True, static=True) outbound_ttl = ConfigInt( "Maximum time (in seconds) allowed for events to arrive for messages", required=True, static=True) metric_window = ConfigFloat( "Size of the buckets to use (in seconds) for metrics", required=True, static=True) class MessageForwardingWorker(ApplicationWorker): '''This application worker consumes vumi messages placed on a configured amqp queue, and sends them as HTTP requests with a JSON body to a configured URL''' CONFIG_CLASS = MessageForwardingConfig @inlineCallbacks def setup_application(self): self.redis = yield TxRedisManager.from_config( self.config['redis_manager']) self.inbounds = InboundMessageStore( self.redis, self.config['inbound_ttl']) self.outbounds = OutboundMessageStore( self.redis, self.config['outbound_ttl']) self.message_rate = MessageRateStore(self.redis) if self.config.get('message_queue') is not None: self.ro_connector = yield self.setup_ro_connector( self.config['message_queue']) self.ro_connector.set_outbound_handler( self._publish_message) @inlineCallbacks def teardown_application(self): yield self.redis.close_manager() @property def channel_id(self): return self.config['transport_name'] @inlineCallbacks def consume_user_message(self, message): '''Sends the vumi message as an HTTP request to the configured URL''' yield self.inbounds.store_vumi_message(self.channel_id, message) msg = api_from_message(message) if self.config.get('mo_message_url') is not None: resp = yield post(self.config['mo_message_url'], msg) if request_failed(resp): logging.exception( 'Error sending message, received HTTP code %r with body %r' '. 
Message: %r' % (resp.code, (yield resp.content()), msg)) if self.config.get('message_queue') is not None: yield self.ro_connector.publish_inbound(message) yield self._increment_metric('inbound') @inlineCallbacks def store_and_forward_event(self, event): '''Store the event in the message store, POST it to the correct URL.''' yield self._store_event(event) yield self._forward_event(event) yield self._count_event(event) def _increment_metric(self, label): return self.message_rate.increment( self.channel_id, label, self.config['metric_window']) def _count_event(self, event): if event['event_type'] == 'ack': return self._increment_metric('submitted') if event['event_type'] == 'nack': return self._increment_metric('rejected') if event['event_type'] == 'delivery_report': if event['delivery_status'] == 'pending': return self._increment_metric('delivery_pending') if event['delivery_status'] == 'failed': return self._increment_metric('delivery_failed') if event['delivery_status'] == 'delivered': return self._increment_metric('delivery_succeeded') def _store_event(self, event): '''Stores the event in the message store''' message_id = event['user_message_id'] return self.outbounds.store_event(self.channel_id, message_id, event) @inlineCallbacks def _forward_event(self, event): '''Forward the event to the correct places.''' yield self._forward_event_http(event) yield self._forward_event_amqp(event) @inlineCallbacks def _forward_event_http(self, event): '''POST the event to the correct URL''' url = yield self._get_event_url(event) if url is None: return msg = api_from_event(self.channel_id, event) if msg['event_type'] is None: logging.exception("Discarding unrecognised event %r" % (event,)) return resp = yield post(url, msg) if request_failed(resp): logging.exception( 'Error sending event, received HTTP code %r with body %r. ' 'Event: %r' % (resp.code, (yield resp.content()), event)) def _forward_event_amqp(self, event): '''Put the event on the correct queue.''' if self.config.get('message_queue') is not None: return self.ro_connector.publish_event(event) def consume_ack(self, event): return self.store_and_forward_event(event) def consume_nack(self, event): return self.store_and_forward_event(event) def consume_delivery_report(self, event): return self.store_and_forward_event(event) def _get_event_url(self, event): msg_id = event['user_message_id'] return self.outbounds.load_event_url(self.channel_id, msg_id) class ChannelStatusConfig(BaseConfig): '''Config for the ChannelStatusWorker''' redis_manager = ConfigDict( "Redis config.", required=True, static=True) channel_id = ConfigText( "The channel id which this worker is consuming statuses for", required=True, static=True) status_url = ConfigText( "Optional url to POST status events to", default=None, static=True) class ChannelStatusWorker(BaseWorker): '''This worker consumes status messages for the transport, and stores them in redis. Statuses with the same component are overwritten. 
It can also optionally forward the statuses to a URL''' CONFIG_CLASS = ChannelStatusConfig @inlineCallbacks def setup_connectors(self): connector = yield self.setup_receive_status_connector( "%s.status" % (self.config['channel_id'],)) connector.set_status_handler(self.consume_status) @inlineCallbacks def setup_worker(self): redis = yield TxRedisManager.from_config(self.config['redis_manager']) self.store = StatusStore(redis, ttl=None) yield self.unpause_connectors() def teardown_worker(self): pass @inlineCallbacks def consume_status(self, status): '''Store the status in redis under the correct component''' yield self.store.store_status(self.config['channel_id'], status) if self.config.get('status_url') is not None: yield self.send_status(status) @inlineCallbacks def send_status(self, status): data = api_from_status(self.config['channel_id'], status) resp = yield post(self.config['status_url'], data) if request_failed(resp): logging.exception( 'Error sending status event, received HTTP code %r with ' 'body %r. Status event: %r' % (resp.code, (yield resp.content()), status)) def request_failed(resp): return resp.code < 200 or resp.code >= 300 def post(url, data): return treq.post( url.encode('utf-8'), data=json.dumps(data, cls=JSONMessageEncoder), headers={'Content-Type': 'application/json'}) PKùh…HÐPŠ˜——junebug/validate.pyfrom functools import wraps from twisted.web import http from jsonschema import Draft4Validator from junebug.utils import response def validate(*validators): def validator(fn): @wraps(fn) def wrapper(api, req, *a, **kw): errors = [] for v in validators: errors.extend(v(req, *a, **kw) or []) if not errors: return fn(api, req, *a, **kw) else: return response( req, 'api usage error', {'errors': sorted(errors)}, code=http.BAD_REQUEST) return wrapper return validator def body_schema(schema): json_validator = Draft4Validator(schema) def validator(req, body, *a, **kw): return [{ 'type': 'invalid_body', 'message': e.message } for e in json_validator.iter_errors(body)] return validator PKûh…H…îg^] ] junebug/command_line.pyfrom copy import deepcopy import argparse import json import logging import logging.handlers import os import sys import yaml from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue from twisted.python import log from junebug.service import JunebugService from junebug.config import JunebugConfig def create_parser(): parser = argparse.ArgumentParser( description=( 'Junebug. A system for managing text messaging transports via a ' 'RESTful HTTP interface')) parser.add_argument( '--config', '-c', dest='config_filename', type=str, help='Path to config file. Optional. Command line options override ' 'config options') parser.add_argument( '--interface', '-i', dest='interface', type=str, help='The interface to expose the API on. Defaults to "localhost"') parser.add_argument( '--port', '-p', dest='port', type=int, help='The port to expose the API on, defaults to "8080"') parser.add_argument( '--log-file', '-l', dest='logfile', type=str, help='The file to log to. Defaults to not logging to a file') parser.add_argument( '--redis-host', '-redish', dest='redis_host', type=str, help='The hostname of the redis instance. Defaults to "localhost"') parser.add_argument( '--redis-port', '-redisp', dest='redis_port', type=int, help='The port of the redis instance. Defaults to "6379"') parser.add_argument( '--redis-db', '-redisdb', dest='redis_db', type=int, help='The database to use for the redis instance. 
Defaults to "0"') parser.add_argument( '--redis-password', '-redispass', dest='redis_pass', type=str, help='The password to use for the redis instance. Defaults to "None"') parser.add_argument( '--amqp-host', '-amqph', dest='amqp_host', type=str, help='The hostname of the amqp endpoint. Defaults to "127.0.0.1"') parser.add_argument( '--amqp-vhost', '-amqpvh', dest='amqp_vhost', type=str, help='The amqp vhost. Defaults to "/"') parser.add_argument( '--amqp-port', '-amqpp', dest='amqp_port', type=int, help='The port of the amqp endpoint. Defaults to "5672"') parser.add_argument( '--amqp-user', '-amqpu', dest='amqp_user', type=str, help='The username to use for the amqp auth. Defaults to "guest"') parser.add_argument( '--amqp-password', '-amqppass', dest='amqp_pass', type=str, help='The password to use for the amqp auth. Defaults to "guest"') parser.add_argument( '--inbound-message-ttl', '-ittl', dest='inbound_message_ttl', type=int, help='The maximum time allowed to reply to a message (in seconds).' 'Defaults to 600 seconds (10 minutes).') parser.add_argument( '--outbound-message-ttl', '-ottl', dest='outbound_message_ttl', type=int, help='The maximum time allowed for events to arrive for ' 'messages (in seconds). Defaults to 172800 seconds (2 days)') parser.add_argument( '--channels', '-ch', dest='channels', type=str, action='append', help='Add a mapping to the list of channels, in the format ' '"channel_type:python_class".') parser.add_argument( '--replace-channels', '-rch', dest='replace_channels', type=bool, help='If True, replaces the default channels with `channels`. ' 'If False, adds `channels` to the list of default channels. Defaults' ' to False.') parser.add_argument( '--plugin', '-pl', dest='plugins', type=str, action='append', help='Add a plugins to the list of plugins, as a json blob of the ' 'plugin config. Must contain a `type` key, with the full python class ' 'path of the plugin') parser.add_argument( '--metric-window', '-mw', type=float, dest='metric_window', help='The size of each bucket ' '(in seconds) to use for metrics. Defaults to 10 seconds.') parser.add_argument( '--logging-path', '-lp', type=str, dest='logging_path', help='The path to place log files for each ' 'channel. Defaults to `logs/`') parser.add_argument( '--log-rotate-size', '-lrs', type=int, dest='log_rotate_size', help='The maximum size (in bytes) for each ' 'log file before it gets rotated. Defaults to 1000000.') parser.add_argument( '--max-log-files', '-mlf', type=int, dest='max_log_files', help='the maximum number of log files to ' 'keep before deleting old files. defaults to 5. 0 is unlimited.') parser.add_argument( '--max-logs', '-ml', type=int, dest='max_logs', help='the maximum number of log entries to ' 'to allow to be fetched through the API. 
Defaults to 100.') return parser def parse_arguments(args): '''Parse and return the command line arguments''' parser = create_parser() return config_from_args(vars(parser.parse_args(args))) def logging_setup(filename): '''Sets up the logging system to output to stdout and filename, if filename is not None''' LOGGING_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s' if not os.environ.get('JUNEBUG_DISABLE_LOGGING'): # Send Twisted Logs to python logger log.PythonLoggingObserver().start() # Set up stdout logger logging.basicConfig( level=logging.INFO, format=LOGGING_FORMAT, stream=sys.stdout) # Set up file logger if filename: handler = logging.handlers.RotatingFileHandler( filename, maxBytes=1024 * 1024, backupCount=5) handler.setFormatter(logging.Formatter(LOGGING_FORMAT)) logging.getLogger().addHandler(handler) @inlineCallbacks def start_server(config): '''Starts a new Junebug HTTP API server on the specified resource and port''' service = JunebugService(config) yield service.startService() returnValue(service) def main(): config = parse_arguments(sys.argv[1:]) logging_setup(config.logfile) start_server(config) reactor.run() def config_from_args(args): args = omit_nones(args) config = load_config(args.pop('config_filename', None)) config['redis'] = parse_redis(config.get('redis', {}), args) config['amqp'] = parse_amqp(config.get('amqp', {}), args) parse_channels(args) args['plugins'] = parse_plugins(config.get('plugins', []), args) combined = conjoin(config, args) # max_log_files == 0 means that no limit should be set, so we need to set # it to `None` for that case combined['max_log_files'] = combined.get('max_log_files') or None return JunebugConfig(combined) def parse_redis(config, args): config = conjoin(deepcopy(JunebugConfig.redis.default), config) overrides(config, args, { 'host': 'redis_host', 'port': 'redis_port', 'db': 'redis_db', 'password': 'redis_pass', }) return config def parse_amqp(config, args): config = conjoin(deepcopy(JunebugConfig.amqp.default), config) overrides(config, args, { 'hostname': 'amqp_host', 'vhost': 'amqp_vhost', 'port': 'amqp_port', 'username': 'amqp_user', 'password': 'amqp_pass', }) return config def parse_channels(args): channels = {} for ch in args.get('channels', {}): key, value = ch.split(':') channels[key] = value if len(channels) > 0: args['channels'] = channels def parse_plugins(config, args): for plugin in args.get('plugins', []): plugin = json.loads(plugin) config.append(plugin) return config def omit_nones(d): return dict((k, v) for k, v in d.iteritems() if v is not None) def conjoin(a, b): result = {} result.update(a) result.update(b) return result def overrides(target, source, mappings): for to_key, from_key in mappings.iteritems(): if from_key in source: target[to_key] = source[from_key] def load_config(filename): if filename is None: return {} with open(filename) as f: config = yaml.safe_load(f) return config if __name__ == '__main__': main() PKY]ŒH» ¥Ô‰‰junebug/__init__.py''' Junebug is a system for managing text messaging transports via a RESTful HTTP interface. ''' import os # Allows us to select which Twisted reactor to use. Must be done before any # Twisted import calls. 
r = os.environ.get('JUNEBUG_REACTOR', 'DEFAULT') if r == "SELECT": from twisted.internet import selectreactor as r elif r == "POLL": from twisted.internet import pollreactor as r elif r == "KQUEUE": from twisted.internet import kqreactor as r elif r == "WFMO": from twisted.internet import win32eventreactor as r elif r == "IOCP": from twisted.internet import iocpreactor as r elif r == "EPOLL": from twisted.internet import epollreactor as r elif r == "DEFAULT": r = None else: raise RuntimeError("Unsupported JUNEBUG_REACTOR setting %r" % (r,)) if r is not None: r.install() from junebug.api import JunebugApi __all__ = ['JunebugApi'] __version__ = '0.1.4' PKùh…HáŸó8¢¢junebug/stores.pyfrom math import ceil import time from twisted.internet.defer import inlineCallbacks, returnValue from vumi.message import TransportEvent, TransportUserMessage, TransportStatus class BaseStore(object): ''' Base class for store classes. Stores data in redis as a hash. :param redis: Redis manager :type redis: :class:`vumi.persist.redis_manager.RedisManager` :param ttl: Expiry time for keys in the store :type ttl: integer ''' USE_DEFAULT_TTL = object() def __init__(self, redis, ttl=None): self.redis = redis self.ttl = ttl @inlineCallbacks def _redis_op(self, func, id, *args, **kwargs): ttl = kwargs.pop('ttl') if ttl is self.USE_DEFAULT_TTL: ttl = self.ttl val = yield func(id, *args, **kwargs) if ttl is not None: yield self.redis.expire(id, ttl) returnValue(val) def get_key(self, *args): '''Returns a key given strings''' return ':'.join(args) def store_all(self, id, properties, ttl=USE_DEFAULT_TTL): '''Stores all of the keys and values given in the dict `properties` as a hash at the key `id`''' return self._redis_op(self.redis.hmset, id, properties, ttl=ttl) def store_property(self, id, key, value, ttl=USE_DEFAULT_TTL): '''Stores a single key with a value as a hash at the key `id`''' return self._redis_op(self.redis.hset, id, key, value, ttl=ttl) @inlineCallbacks def load_all(self, id, ttl=USE_DEFAULT_TTL): '''Retrieves all the keys and values stored as a hash at the key `id`''' returnValue(( yield self._redis_op(self.redis.hgetall, id, ttl=ttl)) or {}) def load_property(self, id, key, ttl=USE_DEFAULT_TTL): return self._redis_op(self.redis.hget, id, key, ttl=ttl) def increment_id(self, id, ttl=USE_DEFAULT_TTL): '''Increments the value stored at `id` by 1.''' return self._redis_op(self.redis.incr, id, 1, ttl=ttl) def get_id(self, id, ttl=USE_DEFAULT_TTL): '''Returns the value stored at `id`.''' return self._redis_op(self.redis.get, id, ttl=ttl) class InboundMessageStore(BaseStore): '''Stores the entire inbound message, in order to later construct replies''' def get_key(self, channel_id, message_id): return super(InboundMessageStore, self).get_key( channel_id, 'inbound_messages', message_id) def store_vumi_message(self, channel_id, message): '''Stores the given vumi message''' key = self.get_key(channel_id, message.get('message_id')) return self.store_property(key, 'message', message.to_json()) @inlineCallbacks def load_vumi_message(self, channel_id, message_id): '''Retrieves the stored vumi message, given its unique id''' key = self.get_key(channel_id, message_id) msg_json = yield self.load_property(key, 'message') if msg_json is None: returnValue(None) returnValue(TransportUserMessage.from_json(msg_json)) class OutboundMessageStore(BaseStore): '''Stores the event url, in order to look it up when deciding where events should go''' PROPERTY_KEYS = ['event_url'] def get_key(self, channel_id, message_id): return 
super(OutboundMessageStore, self).get_key( channel_id, 'outbound_messages', message_id) def store_event_url(self, channel_id, message_id, event_url): '''Stores the event_url''' key = self.get_key(channel_id, message_id) return self.store_property(key, 'event_url', event_url) def load_event_url(self, channel_id, message_id): '''Retrieves a stored event url, given the channel and message ids''' key = self.get_key(channel_id, message_id) return self.load_property(key, 'event_url') def store_event(self, channel_id, message_id, event): '''Stores an event for a message''' key = self.get_key(channel_id, message_id) event_id = event['event_id'] return self.store_property(key, event_id, event.to_json()) @inlineCallbacks def load_event(self, channel_id, message_id, event_id): '''Loads the event with id event_id''' key = self.get_key(channel_id, message_id) event_json = yield self.load_property(key, event_id) if event_json is None: returnValue(None) returnValue(TransportEvent.from_json(event_json)) @inlineCallbacks def load_all_events(self, channel_id, message_id): '''Returns a list of all the stored events''' key = self.get_key(channel_id, message_id) events_json = yield self.load_all(key) self._remove_property_keys(events_json) returnValue([ TransportEvent.from_json(e) for e in events_json.values()]) def _remove_property_keys(self, dct): '''If we remove all other property keys, we will be left with just the events.''' for k in self.PROPERTY_KEYS: dct.pop(k, None) class StatusStore(BaseStore): '''Stores the most recent status message for each status component.''' def get_key(self, channel_id): return '%s:status' % channel_id def store_status(self, channel_id, status): '''Stores a single status. Overrides any previous status with the same component.''' key = self.get_key(channel_id) return self.store_property(key, status['component'], status.to_json()) @inlineCallbacks def get_statuses(self, channel_id): '''Returns the latest status message for each component in a dictionary''' key = self.get_key(channel_id) statuses = yield self.load_all(key) returnValue(dict( (k, TransportStatus.from_json(v)) for k, v in statuses.iteritems() )) class MessageRateStore(BaseStore): '''Gets called everytime a message should be counted, and can return the current messages per second.''' def get_seconds(self): return time.time() def get_key(self, channel_id, label, bucket): return super(MessageRateStore, self).get_key( channel_id, label, str(bucket)) def _get_current_key(self, channel_id, label, bucket_size): bucket = int(self.get_seconds() / bucket_size) return self.get_key(channel_id, label, bucket) def _get_last_key(self, channel_id, label, bucket_size): bucket = int(self.get_seconds() / bucket_size) - 1 return self.get_key(channel_id, label, bucket) def increment(self, channel_id, label, bucket_size): '''Increments the correct counter. Should be called whenever a message that should be counted is received. Note: bucket_size should be kept constant for each channel_id and label combination. Changing bucket sizes results in undefined behaviour.''' key = self._get_current_key(channel_id, label, bucket_size) return self.increment_id(key, ttl=int(ceil(bucket_size * 2))) @inlineCallbacks def get_messages_per_second(self, channel_id, label, bucket_size): '''Gets the current message rate in messages per second. Note: bucket_size should be kept constant for each channel_id and label combination. 
Changing bucket sizes results in undefined behaviour.''' key = self._get_last_key(channel_id, label, bucket_size) rate = yield self.get_id(key, ttl=None) if rate is None: returnValue(0) returnValue(float(rate) / bucket_size) PKwPGÂ!ƒoáájunebug/service.pyfrom twisted.application.service import MultiService from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks from twisted.python import log from twisted.web.server import Site from junebug import JunebugApi class JunebugService(MultiService, object): '''Base service that runs the HTTP API, and contains transports as child services''' def __init__(self, config): super(JunebugService, self).__init__() self.config = config @inlineCallbacks def startService(self): '''Starts the HTTP server, and returns the port object that the server is listening on''' super(JunebugService, self).startService() self.api = JunebugApi(self, self.config) yield self.api.setup() self._port = reactor.listenTCP( self.config.port, Site(self.api.app.resource()), interface=self.config.interface) log.msg( 'Junebug is listening on %s:%s' % (self.config.interface, self.config.port)) @inlineCallbacks def stopService(self): '''Stops the HTTP server.''' yield self.api.teardown() yield self._port.stopListening() super(JunebugService, self).stopService() PKùh…H;Éï£{={=junebug/channel.pyimport collections from copy import deepcopy import json import uuid from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web import http from vumi.message import TransportUserMessage from vumi.service import WorkerCreator from vumi.servicemaker import VumiOptions from junebug.logging_service import JunebugLoggerService, read_logs from junebug.stores import StatusStore, MessageRateStore from junebug.utils import api_from_message, message_from_api, api_from_status from junebug.error import JunebugError class MessageNotFound(JunebugError): '''Raised when a message is not found.''' name = 'MessageNotFound' description = 'message not found' code = http.BAD_REQUEST class ChannelNotFound(JunebugError): '''Raised when a channel's data cannot be found.''' name = 'ChannelNotFound' description = 'channel not found' code = http.NOT_FOUND class InvalidChannelType(JunebugError): '''Raised when an invalid channel type is specified''' name = 'InvalidChannelType', description = 'invalid channel type' code = http.BAD_REQUEST class MessageTooLong(JunebugError): '''Raised when a message exceeds the configured character limit''' name = 'MessageTooLong' description = 'message too long' code = http.BAD_REQUEST transports = { 'telnet': 'vumi.transports.telnet.TelnetServerTransport', 'xmpp': 'vumi.transports.xmpp.XMPPTransport', 'smpp': 'vumi.transports.smpp.SmppTransport', 'dmark': 'vumi.transports.dmark.DmarkUssdTransport', } allowed_message_fields = [ 'transport_name', 'timestamp', 'in_reply_to', 'to_addr', 'from_addr', 'content', 'session_event', 'helper_metadata', 'message_id'] # excluded fields: from_addr_type, group, provider, routing_metadata, # to_addr_type, from_addr_type, message_version, transport_metadata, # message_type, transport_type class Channel(object): OUTBOUND_QUEUE = '%s.outbound' APPLICATION_ID = 'application:%s' STATUS_APPLICATION_ID = 'status:%s' APPLICATION_CLS_NAME = 'junebug.workers.MessageForwardingWorker' STATUS_APPLICATION_CLS_NAME = 'junebug.workers.ChannelStatusWorker' JUNEBUG_LOGGING_SERVICE_CLS = JunebugLoggerService def __init__(self, redis_manager, config, properties, plugins=[], id=None): '''Creates a new channel. 
``redis_manager`` is the redis manager, from which a sub manager is created using the channel id. If the channel id is not supplied, a UUID one is generated. Call ``save`` to save the channel data. It can be started using the ``start`` function.''' self._properties = properties self.redis = redis_manager self.id = id self.config = config if self.id is None: self.id = str(uuid.uuid4()) self.options = deepcopy(VumiOptions.default_vumi_options) self.options.update(self.config.amqp) self.transport_worker = None self.application_worker = None self.status_application_worker = None self.sstore = StatusStore(self.redis) self.plugins = plugins self.message_rates = MessageRateStore(self.redis) @property def application_id(self): return self.APPLICATION_ID % (self.id,) @property def status_application_id(self): return self.STATUS_APPLICATION_ID % (self.id,) @property def character_limit(self): return self._properties.get('character_limit') @inlineCallbacks def start(self, service, transport_worker=None): '''Starts the relevant workers for the channel. ``service`` is the parent of under which the workers should be started.''' self._start_transport(service, transport_worker) self._start_application(service) self._start_status_application(service) for plugin in self.plugins: yield plugin.channel_started(self) @inlineCallbacks def stop(self): '''Stops the relevant workers for the channel''' yield self._stop_application() yield self._stop_status_application() yield self._stop_transport() for plugin in self.plugins: yield plugin.channel_stopped(self) @inlineCallbacks def save(self): '''Saves the channel data into redis.''' properties = json.dumps(self._properties) channel_redis = yield self.redis.sub_manager(self.id) yield channel_redis.set('properties', properties) yield self.redis.sadd('channels', self.id) @inlineCallbacks def update(self, properties): '''Updates the channel configuration, saves the updated configuration, and (if needed) restarts the channel with the new configuration. 
Returns the updated configuration and status.''' self._properties.update(properties) yield self.save() service = self.transport_worker.parent # Only restart if the channel config has changed if 'config' in properties: yield self._stop_transport() yield self._start_transport(service) if 'mo_url' in properties or 'amqp_queue' in properties: yield self._stop_application() yield self._start_application(service) returnValue((yield self.status())) @inlineCallbacks def delete(self): '''Removes the channel data from redis''' channel_redis = yield self.redis.sub_manager(self.id) yield channel_redis.delete('properties') yield self.redis.srem('channels', self.id) @inlineCallbacks def status(self): '''Returns a dict with the configuration and status of the channel''' status = deepcopy(self._properties) status['id'] = self.id status['status'] = yield self._get_status() returnValue(status) def _get_message_rate(self, label): return self.message_rates.get_messages_per_second( self.id, label, self.config.metric_window) @inlineCallbacks def _get_status(self): components = yield self.sstore.get_statuses(self.id) components = dict( (k, api_from_status(self.id, v)) for k, v in components.iteritems() ) status_values = { 'down': 0, 'degraded': 1, 'ok': 2, } try: status = min( (c['status'] for c in components.values()), key=status_values.get) except ValueError: # No statuses status = None returnValue({ 'components': components, 'status': status, 'inbound_message_rate': ( yield self._get_message_rate('inbound')), 'outbound_message_rate': ( yield self._get_message_rate('outbound')), 'submitted_event_rate': ( yield self._get_message_rate('submitted')), 'rejected_event_rate': ( yield self._get_message_rate('rejected')), 'delivery_succeeded_rate': ( yield self._get_message_rate('delivery_succeeded')), 'delivery_failed_rate': ( yield self._get_message_rate('delivery_failed')), 'delivery_pending_rate': ( yield self._get_message_rate('delivery_pending')), }) @classmethod @inlineCallbacks def from_id(cls, redis, config, id, parent, plugins=[]): '''Creates a channel by loading the data from redis, given the channel's id, and the parent service of the channel''' channel_redis = yield redis.sub_manager(id) properties = yield channel_redis.get('properties') if properties is None: raise ChannelNotFound() properties = json.loads(properties) obj = cls(redis, config, properties, plugins, id=id) obj._restore(parent) returnValue(obj) @classmethod @inlineCallbacks def get_all(cls, redis): '''Returns a set of keys of all of the channels''' channels = yield redis.smembers('channels') returnValue(channels) @classmethod @inlineCallbacks def start_all_channels(cls, redis, config, parent, plugins=[]): '''Ensures that all of the stored channels are running''' for id in (yield cls.get_all(redis)): if id not in parent.namedServices: properties = json.loads(( yield redis.get('%s:properties' % id))) channel = cls(redis, config, properties, plugins, id=id) yield channel.start(parent) @inlineCallbacks def send_message(self, sender, outbounds, msg): '''Sends a message.''' event_url = msg.get('event_url') msg = message_from_api(self.id, msg) msg = TransportUserMessage.send(**msg) msg = yield self._send_message(sender, outbounds, event_url, msg) returnValue(api_from_message(msg)) @inlineCallbacks def send_reply_message(self, sender, outbounds, inbounds, msg): '''Sends a reply message.''' in_msg = yield inbounds.load_vumi_message(self.id, msg['reply_to']) if in_msg is None: raise MessageNotFound( "Inbound message with id %s not found" % 
(msg['reply_to'],)) event_url = msg.get('event_url') msg = message_from_api(self.id, msg) msg = in_msg.reply(**msg) msg = yield self._send_message(sender, outbounds, event_url, msg) returnValue(api_from_message(msg)) def get_logs(self, n): '''Returns the last `n` logs. If `n` is greater than the configured limit, only returns the configured limit amount of logs. If `n` is None, returns the configured limit amount of logs.''' if n is None: n = self.config.max_logs n = min(n, self.config.max_logs) logfile = self.transport_worker.getServiceNamed( 'Junebug Worker Logger').logfile return read_logs(logfile, n) @property def _transport_config(self): config = self._properties['config'] config = self._convert_unicode(config) config['transport_name'] = self.id config['worker_name'] = self.id config['publish_status'] = True return config @property def _application_config(self): return { 'transport_name': self.id, 'mo_message_url': self._properties.get('mo_url'), 'message_queue': self._properties.get('amqp_queue'), 'redis_manager': self.config.redis, 'inbound_ttl': self.config.inbound_message_ttl, 'outbound_ttl': self.config.outbound_message_ttl, 'metric_window': self.config.metric_window, } @property def _status_application_config(self): return { 'redis_manager': self.config.redis, 'channel_id': self.id, 'status_url': self._properties.get('status_url'), } @property def _available_transports(self): if self.config.replace_channels: return self.config.channels else: channels = {} channels.update(transports) channels.update(self.config.channels) return channels @property def _transport_cls_name(self): cls_name = self._available_transports.get(self._properties.get('type')) if cls_name is None: raise InvalidChannelType( 'Invalid channel type %r, must be one of: %s' % ( self._properties.get('type'), ', '.join(self._available_transports.keys()))) return cls_name def _start_transport(self, service, transport_worker=None): # transport_worker parameter is for testing, if it is None, # create the transport worker if transport_worker is None: transport_worker = self._create_transport() transport_worker.setName(self.id) logging_service = self._create_junebug_logger_service() transport_worker.addService(logging_service) transport_worker.setServiceParent(service) self.transport_worker = transport_worker def _start_application(self, service): worker = self._create_application() worker.setName(self.application_id) worker.setServiceParent(service) self.application_worker = worker def _start_status_application(self, service): worker = self._create_status_application() worker.setName(self.status_application_id) worker.setServiceParent(service) self.status_application_worker = worker def _create_transport(self): return self._create_worker( self._transport_cls_name, self._transport_config) def _create_application(self): return self._create_worker( self.APPLICATION_CLS_NAME, self._application_config) def _create_status_application(self): return self._create_worker( self.STATUS_APPLICATION_CLS_NAME, self._status_application_config) def _create_junebug_logger_service(self): return self.JUNEBUG_LOGGING_SERVICE_CLS( self.id, self.config.logging_path, self.config.log_rotate_size, self.config.max_log_files) def _create_worker(self, cls_name, config): creator = WorkerCreator(self.options) worker = creator.create_worker(cls_name, config) return worker @inlineCallbacks def _stop_transport(self): if self.transport_worker is not None: yield self.transport_worker.disownServiceParent() self.transport_worker = None @inlineCallbacks def 
_stop_application(self): if self.application_worker is not None: yield self.application_worker.disownServiceParent() self.application_worker = None @inlineCallbacks def _stop_status_application(self): if self.status_application_worker is not None: yield self.status_application_worker.disownServiceParent() self.status_application_worker = None def _restore(self, service): self.transport_worker = service.getServiceNamed(self.id) self.application_worker = service.getServiceNamed(self.application_id) self.status_application_worker = service.getServiceNamed( self.status_application_id) def _convert_unicode(self, data): # Twisted doesn't like it when we give unicode in for config things if isinstance(data, basestring): return str(data) elif isinstance(data, collections.Mapping): return dict(map(self._convert_unicode, data.iteritems())) elif isinstance(data, collections.Iterable): return type(data)(map(self._convert_unicode, data)) else: return data def _check_character_limit(self, content): count = len(content) if (self.character_limit is not None and count > self.character_limit): raise MessageTooLong( 'Message content %r is of length %d, which is greater than the' ' character limit of %d' % ( content, count, self.character_limit) ) @inlineCallbacks def _send_message(self, sender, outbounds, event_url, msg): self._check_character_limit(msg['content']) if event_url is not None: yield outbounds.store_event_url( self.id, msg['message_id'], event_url) queue = self.OUTBOUND_QUEUE % (self.id,) msg = yield sender.send_message(msg, routing_key=queue) returnValue(msg) PKwPGOÓÜhññjunebug/error.pyfrom twisted.web import http class JunebugError(Exception): '''Generic error from which all other junebug errors inherit from''' name = 'JunebugError' description = 'Generic Junebug Error' code = http.INTERNAL_SERVER_ERROR PKùh…H¹¬::junebug/logging_service.pyfrom itertools import chain import json import logging import os import time from twisted.python import log from twisted.python.log import ILogObserver from twisted.python.logfile import LogFile from twisted.application.service import Service from zope.interface import implements DEFAULT_LOG_CONTEXT_SENTINEL = "_JUNEBUG_CONTEXT_" class JunebugLogObserver(object): """Twisted log observer that logs to a rotated log file.""" implements(ILogObserver) DEFAULT_ERROR_LEVEL = logging.ERROR DEFAULT_LOG_LEVEL = logging.INFO LOG_LEVEL_THRESHOLD = logging.INFO LOG_ENTRY = '%[(timestamp)s] ' def __init__(self, logfile, worker_id, log_context_sentinel=None): ''' Create a new JunebugLogObserver. :param logfile: File to write logs to. :type logfile: :class:`twisted.python.logfile.LogFile` :param str worker_id: ID of the worker that the log is for. 
''' if log_context_sentinel is None: log_context_sentinel = DEFAULT_LOG_CONTEXT_SENTINEL self.worker_id = worker_id self.log_context_sentinel = log_context_sentinel self.log_context = {self.log_context_sentinel: True} self.logfile = logfile def level_for_event(self, event): '''Get the associated log level for an event.''' level = event.get('logLevel') if level is not None: return level if event.get('isError'): return self.DEFAULT_ERROR_LEVEL return self.DEFAULT_LOG_LEVEL def logger_for_event(self, event): '''Get the name of the logger for an event.''' system = event.get('system') logger = ".".join(system.split(',')) return logger.lower() def _log_to_file(self, event): '''Logs the specified event to the log file.''' level = self.level_for_event(event) if level < self.LOG_LEVEL_THRESHOLD: return data = { "logger": self.logger_for_event(event), "level": level, "timestamp": time.time(), "message": log.textFromEventDict(event), } failure = event.get('failure') if failure: data['exception'] = { 'class': repr(failure.type), 'instance': repr(failure.value), 'stack': failure.stack, } self.logfile.write(json.dumps(data) + '\n') def __call__(self, event): if self.log_context_sentinel in event: return if self.worker_id not in (event.get('system') or '').split(','): return log.callWithContext(self.log_context, self._log_to_file, event) class JunebugLoggerService(Service): '''Service for :class:`junebug.logging.JunebugLogObserver`''' log_observer = None def __init__(self, worker_id, path, rotate, max_files, logger=None): ''' Create the service for the Junebug Log Observer. :param str worker_id: ID of the worker to observe logs for. :param str path: Path to place the log files. :param int rotate: Size (in bytes) before rotating log file. :param int max_files: Maximum amount of log files before old log files start to get deleted. :param logger: logger to add observer to. Defaults to twisted.python.log.theLogPublisher :type logger: :class:`twisted.python.log.LogPublisher` ''' self.setName('Junebug Worker Logger') self.logger = logger if logger is not None else log.theLogPublisher self.worker_id = worker_id self.path = path self.rotate = rotate self.max_files = max_files def startService(self): self.logfile = LogFile( self.worker_id, self.path, rotateLength=self.rotate, maxRotatedFiles=self.max_files) self.log_observer = JunebugLogObserver(self.logfile, self.worker_id) self.logger.addObserver(self.log_observer) return super(JunebugLoggerService, self).startService() def stopService(self): if self.running: self.logger.removeObserver(self.log_observer) self.logfile.close() return super(JunebugLoggerService, self).stopService() def registered(self): return self.log_observer in self.logger.observers def reverse_read(filename, buf): ''' Read the non-blank lines from a file in reverse order. :param str filename: The path of the file to be read. :param int buf: The size of the read buffer. :returns: A generator that yields each line from the file in reverse order as a string. ''' with open(filename) as f: f.seek(0, os.SEEK_END) remaining_size = f.tell() incomplete_line = None while remaining_size > 0: f.seek(max(0, remaining_size - buf), os.SEEK_SET) data = f.read(min(remaining_size, buf)) # If we read no data, we should exit the loop if len(data) == 0: break remaining_size -= len(data) lines = data.split('\n') # If the last line of the file doesn't end in a new line, it is # possible that the line hasn't been finished yet, so we should # ignore that line. 
            if incomplete_line is None and data[-1] != '\n':
                lines.pop(-1)
            # If there is an incomplete line from the last read, we should
            # append it to the end of the current read.
            if incomplete_line:
                lines[-1] += incomplete_line
            # If we still have any lines left, the first one could be
            # incomplete, so store it for the next iteration.
            if lines:
                incomplete_line = lines.pop(0)
            for l in lines[::-1]:
                if l != '':
                    yield l

        if incomplete_line:
            yield incomplete_line


def read_logs(logfile, lines, buf=4096):
    '''
    Reads log files to fetch the last N logs.

    :param logfile: The LogFile to read from.
    :type logfile: :class:`twisted.python.logfile.LogFile`
    :param int lines: Maximum number of lines to read.
    :param int buf: The size of the read buffer.

    :returns: list -- Each item is a dictionary representing a log entry.
    '''
    logs = []
    for filepath in chain(
            (logfile.path,),
            ('%s.%d' % (logfile.path, n) for n in logfile.listLogs())):
        for line in reverse_read(filepath, buf):
            logs.append(json.loads(line))
            if len(logs) == lines:
                return logs
    return logs


junebug/plugin.py
class JunebugPlugin(object):
    '''Base class for all Junebug plugins'''

    def start_plugin(self, config, junebug_config):
        '''
        Can be overridden with any required startup code for the plugin.
        Can return a deferred.

        :param config: The config specific to the plugin.
        :type config: dictionary
        :param junebug_config: The config that Junebug was started with.
        :type junebug_config: :class:`JunebugConfig`
        '''
        pass

    def stop_plugin(self):
        '''
        Can be overridden with any required shutdown code for the plugin.
        Can return a deferred.
        '''
        pass

    def channel_started(self, channel):
        '''
        Called whenever a channel is started. Should be implemented by the
        plugin. Can return a deferred.

        :param channel: The channel that has been started.
        :type channel: :class:`Channel`
        '''
        pass

    def channel_stopped(self, channel):
        '''
        Called whenever a channel is stopped. Should be implemented by the
        plugin. Can return a deferred.

        :param channel: The channel that has been stopped.
        :type channel: :class:`Channel`
        '''
        pass
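Editor's illustration (not a file in this archive): a minimal sketch of how the
JunebugPlugin hooks above are typically filled in. The plugin name
(ChannelAuditPlugin) and its in-memory bookkeeping are hypothetical; the only
assumptions taken from the source are the hook signatures, that hooks may
return deferreds, and that channels expose an `id` attribute.

# Hypothetical example plugin; sketch only, not part of junebug itself.
from twisted.internet.defer import succeed

from junebug.plugin import JunebugPlugin


class ChannelAuditPlugin(JunebugPlugin):
    '''Records which channels are currently running (illustrative only).'''

    def start_plugin(self, config, junebug_config):
        # `config` is this plugin's own dict from the `plugins` list in
        # JunebugConfig; `junebug_config` is the full JunebugConfig.
        self.running_channels = set()
        # Hooks may return a deferred; returning one here shows that shape.
        return succeed(None)

    def stop_plugin(self):
        self.running_channels = set()

    def channel_started(self, channel):
        self.running_channels.add(channel.id)

    def channel_stopped(self, channel):
        self.running_channels.discard(channel.id)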
junebug/config.py
from confmodel import Config
from confmodel.fields import (
    ConfigBool, ConfigText, ConfigInt, ConfigDict, ConfigList, ConfigFloat)


class JunebugConfig(Config):
    interface = ConfigText(
        "Interface to expose the API on",
        default='localhost')

    port = ConfigInt(
        "Port to expose the API on",
        default=8080)

    logfile = ConfigText(
        "File to log to or `None` for no logging",
        default=None)

    redis = ConfigDict(
        "Config to use for redis connection",
        default={
            'host': 'localhost',
            'port': 6379,
            'db': 0,
            'password': None
        })

    amqp = ConfigDict(
        "Config to use for amqp connection",
        default={
            'hostname': '127.0.0.1',
            'vhost': '/',
            'port': 5672,
            'db': 0,
            'username': 'guest',
            'password': 'guest'
        })

    inbound_message_ttl = ConfigInt(
        "Maximum time (in seconds) allowed to reply to messages",
        default=60 * 10)

    outbound_message_ttl = ConfigInt(
        "Maximum time (in seconds) allowed for events to arrive for messages",
        default=60 * 60 * 24 * 2)

    channels = ConfigDict(
        "Mapping between channel types and python classes.",
        default={})

    replace_channels = ConfigBool(
        "If `True`, replaces the default channels with `channels`. If `False`,"
        " `channels` is added to the default channels.",
        default=False)

    plugins = ConfigList(
        "A list of dictionaries describing all of the enabled plugins. Each "
        "item should have a `type` key, with the full python class name of "
        "the plugin.",
        default=[])

    metric_window = ConfigFloat(
        "The size of the buckets (in seconds) used for metrics.",
        default=10.0)

    logging_path = ConfigText(
        "The path to place log files in.",
        default="logs/")

    log_rotate_size = ConfigInt(
        "The maximum size (in bytes) of a log file before it gets rotated.",
        default=1000000)

    max_log_files = ConfigInt(
        "The maximum amount of log files allowed before old files start to "
        "get deleted. 0 is unlimited.",
        default=5)

    max_logs = ConfigInt(
        "The maximum amount of logs that is allowed to be retrieved via the "
        "API.",
        default=100)


junebug/plugins/__init__.py

junebug/plugins/nginx/__init__.py
from junebug.plugins.nginx.plugin import NginxPluginConfig, NginxPlugin

__all__ = ['NginxPluginConfig', 'NginxPlugin']


junebug/plugins/nginx/vhost.template
server {
    listen 80;
    server_name %(server_name)s;
    include %(includes)s;
}

junebug/plugins/nginx/plugin.py
import logging
import subprocess
from os import path, remove
from distutils.dir_util import mkpath
from urlparse import urljoin

from pkg_resources import resource_filename

from confmodel import Config
from confmodel.fields import ConfigText

from junebug.plugin import JunebugPlugin
from junebug.utils import channel_public_http_properties

log = logging.getLogger(__name__)


def resource_path(filename):
    return resource_filename('junebug.plugins.nginx', filename)


class NginxPluginConfig(Config):
    '''Config for :class:`NginxJunebugPlugin`'''

    vhost_file = ConfigText(
        "The file to write the junebug nginx vhost file to",
        default='/etc/nginx/sites-enabled/junebug.conf', static=True)

    locations_dir = ConfigText(
        "The directory to write location block config files to",
        default='/etc/nginx/includes/junebug/', static=True)

    server_name = ConfigText(
        "Server name to use for nginx vhost",
        required=True, static=True)

    vhost_template = ConfigText(
        "Path to the template file to use for the vhost config",
        default=resource_path('vhost.template'), static=True)

    location_template = ConfigText(
        "Path to the template file to use for each channel's location config",
        default=resource_path('location.template'), static=True)


class NginxPlugin(JunebugPlugin):
    '''
    Manages an nginx virtual host that proxies to the Junebug instance's
    http-based channels.
''' def start_plugin(self, config, junebug_config): self.configured_channels = set() self.config = NginxPluginConfig(config) self.vhost_template = read(self.config.vhost_template) self.location_template = read(self.config.location_template) write(self.config.vhost_file, self.get_vhost_config()) reload_nginx() def stop_plugin(self): ensure_removed(self.config.vhost_file) for channel_id in self.configured_channels: ensure_removed(self.get_location_path(channel_id)) self.configured_channels = set() reload_nginx() def channel_started(self, channel): properties = channel_public_http_properties(channel._properties) if properties is not None and properties['enabled']: mkpath(self.config.locations_dir) write( self.get_location_path(channel.id), self.get_location_config(properties)) reload_nginx() self.configured_channels.add(channel.id) def channel_stopped(self, channel): if channel.id in self.configured_channels: ensure_removed(self.get_location_path(channel.id)) self.configured_channels.remove(channel.id) reload_nginx() def get_vhost_config(self): return self.vhost_template % self.get_vhost_context() def get_vhost_context(self): return { 'server_name': self.config.server_name, 'includes': path.join(self.config.locations_dir, '*.conf') } def get_location_config(self, properties): return self.location_template % self.get_location_context(properties) def get_location_context(self, properties): web_path = properties['web_path'] base_url = 'http://localhost:%s' % (properties['web_port'],) return { 'external_path': web_path, 'internal_url': urljoin(base_url, web_path) } def get_location_path(self, id): return path.join(self.config.locations_dir, "%s.conf" % (id,)) def reload_nginx(): if in_path('nginx'): subprocess.check_call(['nginx', '-s', 'reload']) else: log.error('Cannot reload nginx, nginx not found in path') def in_path(name): return True if subprocess.call(['which', name]) == 0 else False def read(filename): with open(filename, 'r') as file: return file.read() def write(filename, content): with open(filename, 'w') as file: file.write(content) def ensure_removed(filename): if path.exists(filename): remove(filename) PKùh…HÏê‡>>'junebug/plugins/nginx/location.templatelocation %(external_path)s { proxy_pass %(internal_url)s; } PKùh…H'junebug/plugins/nginx/tests/__init__.pyPKY]ŒH&qæ ; ;*junebug/plugins/nginx/tests/test_plugin.pyimport subprocess from os import path from shutil import rmtree from tempfile import mkdtemp, mkstemp from twisted.internet.defer import inlineCallbacks from junebug.config import JunebugConfig from junebug.tests.helpers import JunebugTestBase from junebug.plugins.nginx.plugin import ( NginxPlugin, read, write, ensure_removed) class TestNginxPlugin(JunebugTestBase): @inlineCallbacks def setUp(self): yield self.start_server() self.nginx_reloads = self.patch_nginx_reloads() def patch_subprocess_call(self, fixtures): calls = [] def call(call_args): calls.append(call_args) matches = [res for args, res in fixtures if args == call_args] return matches[0] if matches else None self.patch(subprocess, 'call', call) return calls def patch_nginx_reloads(self): calls = self.patch_subprocess_call(( (['which', 'nginx'], 0), (['nginx', '-s', 'reload'], 0), )) def nginx_reloads(): reloads = calls.count(['nginx', '-s', 'reload']) del calls[:] return reloads return nginx_reloads def make_temp_dir(self): dirname = mkdtemp() self.addCleanup(lambda: rmtree(dirname)) return dirname def make_temp_file(self): _, filename = mkstemp() self.addCleanup(lambda: ensure_removed(filename)) return filename 
def test_start_plugin_create_vhost_config(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() vhost_filename = self.make_temp_file() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': vhost_filename, 'locations_dir': locations_dirname }, JunebugConfig({})) self.assertEqual( read(vhost_filename), read(plugin.config.vhost_template) % { 'server_name': 'http//www.example.org', 'includes': path.join(locations_dirname, '*.conf') }) def test_start_plugin_create_vhost_config_custom_template(self): plugin = NginxPlugin() vhost_filename = self.make_temp_file() vhost_template_filename = self.make_temp_file() write(vhost_template_filename, '%(server_name)s') plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': vhost_filename, 'locations_dir': self.make_temp_dir(), 'vhost_template': vhost_template_filename, }, JunebugConfig({})) self.assertEqual(read(vhost_filename), 'http//www.example.org') def test_start_plugin_nginx_reload(self): plugin = NginxPlugin() self.assertEqual(self.nginx_reloads(), 0) plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) self.assertEqual(self.nginx_reloads(), 1) def test_stop_plugin_remove_vhost_config(self): plugin = NginxPlugin() vhost_filename = self.make_temp_file() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': vhost_filename, 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) self.assertTrue(path.exists(vhost_filename)) plugin.stop_plugin() self.assertFalse(path.exists(vhost_filename)) @inlineCallbacks def test_stop_plugin_remove_location_configs(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) chan4 = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) chan5 = yield self.create_channel( self.service, self.redis, id='chan5', properties=properties) plugin.channel_started(chan4) plugin.channel_started(chan5) self.assertTrue( path.exists(path.join(locations_dirname, 'chan4.conf'))) self.assertTrue( path.exists(path.join(locations_dirname, 'chan5.conf'))) plugin.stop_plugin() self.assertFalse( path.exists(path.join(locations_dirname, 'chan4.conf'))) self.assertFalse( path.exists(path.join(locations_dirname, 'chan5.conf'))) @inlineCallbacks def test_stop_plugin_nginx_reload(self): plugin = NginxPlugin() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) chan4 = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) chan5 = yield self.create_channel( self.service, self.redis, id='chan5', properties=properties) plugin.channel_started(chan4) plugin.channel_started(chan5) self.nginx_reloads() # flush reloads plugin.stop_plugin() self.assertEqual(self.nginx_reloads(), 1) @inlineCallbacks def test_channel_started(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, 
JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) plugin.channel_started(channel) self.assertEqual( read(path.join(locations_dirname, 'chan4.conf')), read(plugin.config.location_template) % { 'external_path': '/foo/bar', 'internal_url': 'http://localhost:2323/foo/bar' }) @inlineCallbacks def test_channel_started_custom_template(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() location_template_filename = self.make_temp_file() write(location_template_filename, '%(external_path)s') plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname, 'location_template': location_template_filename }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) plugin.channel_started(channel) self.assertEqual( read(path.join(locations_dirname, 'chan4.conf')), '/foo/bar') @inlineCallbacks def test_channel_started_ensure_dir(self): plugin = NginxPlugin() locations_dirname = path.join(self.make_temp_dir(), 'a/b/c') plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) plugin.channel_started(channel) self.assertTrue(path.exists(locations_dirname)) @inlineCallbacks def test_channel_started_non_http(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) channel = yield self.create_channel( self.service, self.redis, id='chan4') plugin.channel_started(channel) self.assertFalse( path.exists(path.join(locations_dirname, 'chan4.conf'))) @inlineCallbacks def test_channel_started_public_http_disabled(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) properties = self.create_channel_properties(public_http={ 'enabled': False, 'web_path': '/foo/bar', 'web_port': 2323 }) channel = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) plugin.channel_started(channel) self.assertFalse( path.exists(path.join(locations_dirname, 'chan4.conf'))) @inlineCallbacks def test_channel_started_exec_nginx_reload(self): plugin = NginxPlugin() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, properties=properties) self.nginx_reloads() # flush reloads plugin.channel_started(channel) self.assertEqual(self.nginx_reloads(), 1) @inlineCallbacks def test_channel_started_no_nginx_found(self): self.patch_logger() calls = self.patch_subprocess_call(( (['which', 'nginx'], 1), )) plugin = NginxPlugin() 
plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) properties = self.create_channel_properties( web_path='/foo/bar', web_port=2323) channel = yield self.create_channel( self.service, self.redis, properties=properties) self.assertEqual(calls.count(['nginx', '-s', 'reload']), 0) plugin.channel_started(channel) self.assertEqual(calls.count(['nginx', '-s', 'reload']), 0) self.assert_was_logged('Cannot reload nginx, nginx not found in path') @inlineCallbacks def test_channel_stopped(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) plugin.channel_started(channel) self.assertTrue( path.exists(path.join(locations_dirname, 'chan4.conf'))) plugin.channel_stopped(channel) self.assertFalse( path.exists(path.join(locations_dirname, 'chan4.conf'))) @inlineCallbacks def test_channel_stopped_irrelevant_channels(self): plugin = NginxPlugin() locations_dirname = self.make_temp_dir() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': locations_dirname }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) chan4 = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) chan5 = yield self.create_channel( self.service, self.redis, id='chan5', properties=properties) write(path.join(locations_dirname, 'chan5.conf'), 'foo') plugin.channel_started(chan4) self.assertTrue( path.exists(path.join(locations_dirname, 'chan4.conf'))) self.assertTrue( path.exists(path.join(locations_dirname, 'chan5.conf'))) plugin.channel_stopped(chan4) plugin.channel_stopped(chan5) self.assertFalse( path.exists(path.join(locations_dirname, 'chan4.conf'))) self.assertTrue( path.exists(path.join(locations_dirname, 'chan5.conf'))) @inlineCallbacks def test_channel_stopped_nginx_reload(self): plugin = NginxPlugin() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) channel = yield self.create_channel( self.service, self.redis, properties=properties) plugin.channel_started(channel) self.nginx_reloads() # flush reloads plugin.channel_stopped(channel) self.assertEqual(self.nginx_reloads(), 1) @inlineCallbacks def test_channel_stopped_irrelevant_channel_nginx_reload(self): plugin = NginxPlugin() plugin.start_plugin({ 'server_name': 'http//www.example.org', 'vhost_file': self.make_temp_file(), 'locations_dir': self.make_temp_dir() }, JunebugConfig({})) properties = self.create_channel_properties(config={ 'web_path': '/foo/bar', 'web_port': 2323, }) chan4 = yield self.create_channel( self.service, self.redis, id='chan4', properties=properties) chan5 = yield self.create_channel( self.service, self.redis, id='chan5', properties=properties) plugin.channel_started(chan4) self.nginx_reloads() # flush reloads plugin.channel_stopped(chan4) plugin.channel_stopped(chan5) self.assertEqual(self.nginx_reloads(), 1) 
PKY]ŒH”ûé…€€junebug/tests/test_api.pyimport logging import json import treq from twisted.internet.defer import inlineCallbacks from twisted.web import http from vumi.message import TransportEvent, TransportUserMessage from junebug.channel import Channel from junebug.utils import api_from_message from junebug.tests.helpers import JunebugTestBase, FakeJunebugPlugin from junebug.utils import api_from_event, conjoin, omit class TestJunebugApi(JunebugTestBase): @inlineCallbacks def setUp(self): self.patch_logger() yield self.start_server() def get(self, url, params={}): return treq.get( "%s%s" % (self.url, url), params=params, persistent=False) def post(self, url, data, headers=None): return treq.post( "%s%s" % (self.url, url), json.dumps(data), persistent=False, headers=headers) def delete(self, url): return treq.delete("%s%s" % (self.url, url), persistent=False) @inlineCallbacks def assert_response(self, response, code, description, result, ignore=[]): data = yield response.json() self.assertEqual(response.code, code) for field in ignore: data['result'].pop(field) self.assertEqual(data, { 'status': code, 'code': http.RESPONSES[code], 'description': description, 'result': result, }) @inlineCallbacks def test_http_error(self): resp = yield self.get('/foobar') yield self.assert_response( resp, http.NOT_FOUND, 'The requested URL was not found on the server. If you entered ' 'the URL manually please check your spelling and try again.', { 'errors': [{ 'message': '404: Not Found', 'type': 'Not Found', }] }) @inlineCallbacks def test_startup_plugins_started(self): '''When the API starts, all the configured plugins should start''' yield self.stop_server() config = yield self.create_channel_config( plugins=[{ 'type': 'junebug.tests.helpers.FakeJunebugPlugin' }] ) yield self.start_server(config=config) [plugin] = self.api.plugins self.assertEqual(type(plugin), FakeJunebugPlugin) [(name, [plugin_conf, junebug_conf])] = plugin.calls self.assertEqual(name, 'start_plugin') self.assertEqual(plugin_conf, { 'type': 'junebug.tests.helpers.FakeJunebugPlugin'}) self.assertEqual(junebug_conf, config) @inlineCallbacks def test_shutdown_plugins_stopped(self): '''When the API stops, all the configured plugins should stop''' yield self.stop_server() config = yield self.create_channel_config( plugins=[{ 'type': 'junebug.tests.helpers.FakeJunebugPlugin' }] ) yield self.start_server(config=config) [plugin] = self.api.plugins plugin.calls = [] yield self.stop_server() [(name, [])] = plugin.calls self.assertEqual(name, 'stop_plugin') @inlineCallbacks def test_startup_single_channel(self): properties = self.create_channel_properties() resp = yield self.post('/channels/', properties) id = (yield resp.json())['result']['id'] yield self.stop_server() self.assertFalse(id in self.service.namedServices) yield self.start_server() self.assertTrue(id in self.service.namedServices) @inlineCallbacks def test_startup_multiple_channel(self): ids = [] for i in range(5): properties = self.create_channel_properties() resp = yield self.post('/channels/', properties) id = (yield resp.json())['result']['id'] ids.append(id) yield self.stop_server() for id in ids: self.assertFalse(id in self.service.namedServices) yield self.start_server() for id in ids: self.assertTrue(id in self.service.namedServices) @inlineCallbacks def test_get_channel_list(self): redis = yield self.get_redis() properties = self.create_channel_properties() config = yield self.create_channel_config() resp = yield self.get('/channels/') yield self.assert_response(resp, 
http.OK, 'channels listed', []) yield Channel(redis, config, properties, id=u'test-channel-1').save() resp = yield self.get('/channels/') yield self.assert_response(resp, http.OK, 'channels listed', [ u'test-channel-1', ]) yield Channel(redis, config, properties, id=u'test-channel-2').save() resp = yield self.get('/channels/') yield self.assert_response(resp, http.OK, 'channels listed', [ u'test-channel-1', u'test-channel-2', ]) @inlineCallbacks def test_create_channel(self): properties = self.create_channel_properties() resp = yield self.post('/channels/', properties) yield self.assert_response( resp, http.OK, 'channel created', conjoin(properties, {'status': self.generate_status()}), ignore=['id']) @inlineCallbacks def test_create_channel_transport(self): properties = self.create_channel_properties() resp = yield self.post('/channels/', properties) # Check that the transport is created with the correct config id = (yield resp.json())['result']['id'] transport = self.service.namedServices[id] self.assertEqual(transport.parent, self.service) self.assertEqual(transport.config, conjoin(properties['config'], { 'transport_name': id, 'worker_name': id, 'publish_status': True, })) @inlineCallbacks def test_create_channel_application(self): properties = self.create_channel_properties() resp = yield self.post('/channels/', properties) channel_id = (yield resp.json())['result']['id'] id = Channel.APPLICATION_ID % (channel_id,) worker = self.service.namedServices[id] self.assertEqual(worker.parent, self.service) self.assertEqual(worker.config['transport_name'], channel_id) self.assertEqual(worker.config['mo_message_url'], 'http://foo.bar') @inlineCallbacks def test_create_channel_invalid_parameters(self): resp = yield self.post('/channels/', { 'type': 'smpp', 'config': {}, 'rate_limit_count': -3, 'character_limit': 'a', 'mo_url': 'http://example.org', }) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': sorted([ { 'message': '-3 is less than the minimum of 0', 'type': 'invalid_body', }, { 'message': "u'a' is not of type 'integer'", 'type': 'invalid_body', }, ]) }) @inlineCallbacks def test_create_channel_mo_destination(self): '''When creating a channel, one of or both of mo_url and mo_queue must be present.''' resp = yield self.post('/channels/', { 'type': 'smpp', 'config': {} }) self.maxDiff = None yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [{ 'message': 'One or both of "mo_url" and "amqp_queue" must' ' be specified', 'type': 'ApiUsageError', }], }) @inlineCallbacks def test_get_missing_channel(self): resp = yield self.get('/channels/foo-bar') yield self.assert_response( resp, http.NOT_FOUND, 'channel not found', { 'errors': [{ 'message': '', 'type': 'ChannelNotFound', }] }) @inlineCallbacks def test_get_channel(self): properties = self.create_channel_properties() config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id=u'test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.get('/channels/test-channel') yield self.assert_response( resp, http.OK, 'channel found', conjoin(properties, { 'status': self.generate_status(), 'id': 'test-channel', })) @inlineCallbacks def test_modify_unknown_channel(self): resp = yield self.post('/channels/foo-bar', {}) yield self.assert_response( resp, http.NOT_FOUND, 'channel not found', { 'errors': [{ 'message': '', 'type': 'ChannelNotFound', }] }) @inlineCallbacks def 
test_modify_channel_no_config_change(self): properties = self.create_channel_properties() config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post( '/channels/test-channel', {'metadata': {'foo': 'bar'}}) yield self.assert_response( resp, http.OK, 'channel updated', conjoin(properties, { 'status': self.generate_status(), 'id': 'test-channel', 'metadata': {'foo': 'bar'}, })) @inlineCallbacks def test_modify_channel_config_change(self): redis = yield self.get_redis() properties = self.create_channel_properties() config = yield self.create_channel_config() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) properties['config']['name'] = 'bar' resp = yield self.post('/channels/test-channel', properties) yield self.assert_response( resp, http.OK, 'channel updated', conjoin(properties, { 'status': self.generate_status(), 'id': 'test-channel', })) @inlineCallbacks def test_modify_channel_invalid_parameters(self): resp = yield self.post('/channels/foo-bar', { 'rate_limit_count': -3, 'character_limit': 'a', }) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [ { 'message': '-3 is less than the minimum of 0', 'type': 'invalid_body', }, { 'message': "u'a' is not of type 'integer'", 'type': 'invalid_body', }, ] }) @inlineCallbacks def test_delete_channel(self): config = yield self.create_channel_config() properties = self.create_channel_properties() channel = Channel(self.redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) self.assertTrue('test-channel' in self.service.namedServices) properties = yield self.redis.get('test-channel:properties') self.assertNotEqual(properties, None) resp = yield self.delete('/channels/test-channel') yield self.assert_response(resp, http.OK, 'channel deleted', {}) self.assertFalse('test-channel' in self.service.namedServices) properties = yield self.redis.get('test-channel:properties') self.assertEqual(properties, None) resp = yield self.delete('/channels/test-channel') yield self.assert_response( resp, http.NOT_FOUND, 'channel not found', { 'errors': [{ 'message': '', 'type': 'ChannelNotFound', }] }) self.assertFalse('test-channel' in self.service.namedServices) properties = yield self.redis.get('test-channel:properties') self.assertEqual(properties, None) def record_channel_methods(self, *methods): calls = [] def method_recorder(meth): orig_method = getattr(Channel, meth) def record(self, *args, **kw): result = orig_method(self, *args, **kw) calls.append((meth, self.id)) return result return record for meth in methods: self.patch(Channel, meth, method_recorder(meth)) return calls @inlineCallbacks def test_restart_channel(self): config = yield self.create_channel_config() properties = self.create_channel_properties() channel = Channel(self.redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) actions = self.record_channel_methods('start', 'stop') resp = yield self.post('/channels/test-channel/restart', None) yield self.assert_response(resp, http.OK, 'channel restarted', {}) self.assertEqual(actions, [ ('stop', u'test-channel'), ('start', u'test-channel'), ]) @inlineCallbacks def test_restart_missing_channel(self): resp = yield self.post('/channels/test-channel/restart', None) yield self.assert_response( resp, 
http.NOT_FOUND, 'channel not found', { 'errors': [{ 'message': '', 'type': 'ChannelNotFound', }] }) @inlineCallbacks def test_send_message_invalid_channel(self): resp = yield self.post('/channels/foo-bar/messages/', { 'to': '+1234', 'from': '', 'content': None}) yield self.assert_response( resp, http.NOT_FOUND, 'channel not found', { 'errors': [{ 'message': '', 'type': 'ChannelNotFound', }] }) @inlineCallbacks def test_send_message(self): '''Sending a message should place the message on the queue for the channel''' properties = self.create_channel_properties() config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': 'foo', 'from': None}) yield self.assert_response( resp, http.OK, 'message sent', { 'to': '+1234', 'channel_id': 'test-channel', 'from': None, 'reply_to': None, 'channel_data': {}, 'content': 'foo', }, ignore=['timestamp', 'message_id']) [message] = self.get_dispatched_messages('test-channel.outbound') message_id = (yield resp.json())['result']['message_id'] self.assertEqual(message['message_id'], message_id) event_url = yield self.api.outbounds.load_event_url( 'test-channel', message['message_id']) self.assertEqual(event_url, None) @inlineCallbacks def test_send_message_message_rate(self): '''Sending a message should increment the message rate counter''' clock = yield self.patch_message_rate_clock() channel = Channel( (yield self.get_redis()), (yield self.create_channel_config()), self.create_channel_properties(), id='test-channel') yield channel.save() yield channel.start(self.service) yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': 'foo', 'from': None}) clock.advance(channel.config.metric_window) rate = yield self.api.message_rate.get_messages_per_second( 'test-channel', 'outbound', channel.config.metric_window) self.assertEqual(rate, 1.0 / channel.config.metric_window) @inlineCallbacks def test_send_message_event_url(self): '''Sending a message with a specified event url should store the event url for sending events in the future''' properties = self.create_channel_properties() config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': 'foo', 'from': None, 'event_url': 'http://test.org'}) yield self.assert_response( resp, http.OK, 'message sent', { 'to': '+1234', 'channel_id': 'test-channel', 'from': None, 'reply_to': None, 'channel_data': {}, 'content': 'foo', }, ignore=['timestamp', 'message_id']) event_url = yield self.api.outbounds.load_event_url( 'test-channel', (yield resp.json())['result']['message_id']) self.assertEqual(event_url, 'http://test.org') @inlineCallbacks def test_send_message_reply(self): '''Sending a reply message should fetch the relevant inbound message, use it to construct a reply message, and place the reply message on the queue for the channel''' channel = Channel( redis_manager=(yield self.get_redis()), config=(yield self.create_channel_config()), properties=self.create_channel_properties(), id='test-channel') yield channel.save() yield channel.start(self.service) in_msg = TransportUserMessage( from_addr='+2789', to_addr='+1234', transport_name='test-channel', transport_type='_', 
transport_metadata={'foo': 'bar'}) yield self.api.inbounds.store_vumi_message('test-channel', in_msg) expected = in_msg.reply(content='testcontent') expected = api_from_message(expected) resp = yield self.post('/channels/test-channel/messages/', { 'reply_to': in_msg['message_id'], 'content': 'testcontent', }) yield self.assert_response( resp, http.OK, 'message sent', omit(expected, 'timestamp', 'message_id'), ignore=['timestamp', 'message_id']) [message] = self.get_dispatched_messages('test-channel.outbound') message_id = (yield resp.json())['result']['message_id'] self.assertEqual(message['message_id'], message_id) @inlineCallbacks def test_send_message_no_to_or_reply_to(self): resp = yield self.post( '/channels/foo-bar/messages/', {'from': None, 'content': None}) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [{ 'message': 'Either "to" or "reply_to" must be specified', 'type': 'ApiUsageError', }] }) @inlineCallbacks def test_send_message_additional_properties(self): '''Additional properties should result in an error being returned.''' resp = yield self.post( '/channels/foo-bar/messages/', { 'from': None, 'content': None, 'to': '', 'foo': 'bar'}) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [{ 'message': "Additional properties are not allowed (u'foo' " "was unexpected)", 'type': 'invalid_body', }] }) @inlineCallbacks def test_send_message_both_to_and_reply_to(self): resp = yield self.post('/channels/foo-bar/messages/', { 'from': None, 'to': '+1234', 'reply_to': '2e8u9ua8', 'content': None, }) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [{ 'message': 'Only one of "to" and "reply_to" may be ' 'specified', 'type': 'ApiUsageError', }] }) @inlineCallbacks def test_send_message_from_and_reply_to(self): resp = yield self.post('/channels/foo-bar/messages/', { 'from': '+1234', 'reply_to': '2e8u9ua8', 'content': None, }) yield self.assert_response( resp, http.BAD_REQUEST, 'api usage error', { 'errors': [{ 'message': 'Only one of "from" and "reply_to" may be ' 'specified', 'type': 'ApiUsageError', }] }) @inlineCallbacks def test_send_message_under_character_limit(self): '''If the content length is under the character limit, no errors should be returned''' properties = self.create_channel_properties(character_limit=100) config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': 'Under the character limit.', 'from': None}) yield self.assert_response( resp, http.OK, 'message sent', { 'to': '+1234', 'channel_id': 'test-channel', 'from': None, 'reply_to': None, 'channel_data': {}, 'content': 'Under the character limit.', }, ignore=['timestamp', 'message_id']) @inlineCallbacks def test_send_message_equal_character_limit(self): '''If the content length is equal to the character limit, no errors should be returned''' content = 'Equal to the character limit.' 
properties = self.create_channel_properties( character_limit=len(content)) config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': content, 'from': None}) yield self.assert_response( resp, http.OK, 'message sent', { 'to': '+1234', 'channel_id': 'test-channel', 'from': None, 'reply_to': None, 'channel_data': {}, 'content': content, }, ignore=['timestamp', 'message_id']) @inlineCallbacks def test_send_message_over_character_limit(self): '''If the content length is over the character limit, an error should be returned''' properties = self.create_channel_properties(character_limit=10) config = yield self.create_channel_config() redis = yield self.get_redis() channel = Channel(redis, config, properties, id='test-channel') yield channel.save() yield channel.start(self.service) resp = yield self.post('/channels/test-channel/messages/', { 'to': '+1234', 'content': 'Over the character limit.', 'from': None}) yield self.assert_response( resp, http.BAD_REQUEST, 'message too long', { 'errors': [{ 'message': "Message content u'Over the character limit.' " "is of length 25, which is greater than the character " "limit of 10", 'type': 'MessageTooLong', }], }) @inlineCallbacks def test_get_message_status_no_events(self): '''Returns `None` for last event fields, and empty list for events''' resp = yield self.get('/channels/foo-bar/messages/message-id') yield self.assert_response( resp, http.OK, 'message status', { 'id': 'message-id', 'last_event_type': None, 'last_event_timestamp': None, 'events': [], }) @inlineCallbacks def test_get_message_status_one_event(self): '''Returns the event details for last event fields, and list with single event for `events`''' event = TransportEvent( user_message_id='message-id', sent_message_id='message-id', event_type='nack', nack_reason='error error') yield self.outbounds.store_event('channel-id', 'message-id', event) resp = yield self.get('/channels/channel-id/messages/message-id') event_dict = api_from_event('channel-id', event) event_dict['timestamp'] = str(event_dict['timestamp']) yield self.assert_response( resp, http.OK, 'message status', { 'id': 'message-id', 'last_event_type': 'rejected', 'last_event_timestamp': str(event['timestamp']), 'events': [event_dict], }) @inlineCallbacks def test_get_message_status_multiple_events(self): '''Returns the last event details for last event fields, and list with all events for `events`''' events = [] event_dicts = [] for i in range(5): event = TransportEvent( user_message_id='message-id', sent_message_id='message-id', event_type='nack', nack_reason='error error') yield self.outbounds.store_event('channel-id', 'message-id', event) events.append(event) event_dict = api_from_event('channel-id', event) event_dict['timestamp'] = str(event_dict['timestamp']) event_dicts.append(event_dict) resp = yield self.get('/channels/channel-id/messages/message-id') yield self.assert_response( resp, http.OK, 'message status', { 'id': 'message-id', 'last_event_type': 'rejected', 'last_event_timestamp': event_dicts[-1]['timestamp'], 'events': event_dicts, }) @inlineCallbacks def test_get_health_check(self): resp = yield self.get('/health') yield self.assert_response( resp, http.OK, 'health ok', {}) @inlineCallbacks def test_get_channel_logs_no_logs(self): '''If there are no logs, an empty list should be returned.''' channel = 
yield self.create_channel(self.service, self.redis) log_worker = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') yield log_worker.startService() resp = yield self.get('/channels/%s/logs' % channel.id, params={ 'n': '3', }) self.assert_response( resp, http.OK, 'logs retrieved', []) @inlineCallbacks def test_get_channel_logs_less_than_limit(self): '''If the amount of logs is less than the limit, all the logs should be returned.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test') resp = yield self.get('/channels/%s/logs' % channel.id, params={ 'n': '2', }) self.assert_response( resp, http.OK, 'logs retrieved', [], ignore=[0]) [log] = (yield resp.json())['result'] self.assert_log(log, { 'logger': channel.id, 'message': 'Test', 'level': logging.INFO}) @inlineCallbacks def test_get_channel_logs_more_than_limit(self): '''If the amount of logs is more than the limit, only the latest n should be returned.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test1') channel.transport_worker.test_log('Test2') channel.transport_worker.test_log('Test3') resp = yield self.get('/channels/%s/logs' % channel.id, params={ 'n': '2', }) self.assert_response( resp, http.OK, 'logs retrieved', [], ignore=[1, 0]) [log1, log2] = (yield resp.json())['result'] self.assert_log(log1, { 'logger': channel.id, 'message': 'Test3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 'message': 'Test2', 'level': logging.INFO}) @inlineCallbacks def test_get_channel_logs_more_than_configured(self): '''If the amount of requested logs is more than what is configured, then only the configured amount of logs are returned.''' logpath = self.mktemp() config = yield self.create_channel_config( max_logs=2, channels={ 'logging': 'junebug.tests.helpers.LoggingTestTransport', }, logging_path=logpath ) properties = yield self.create_channel_properties(type='logging') yield self.stop_server() yield self.start_server(config=config) channel = yield self.create_channel( self.service, self.redis, config=config, properties=properties) worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test1') channel.transport_worker.test_log('Test2') channel.transport_worker.test_log('Test3') resp = yield self.get('/channels/%s/logs' % channel.id, params={ 'n': '3', }) self.assert_response( resp, http.OK, 'logs retrieved', [], ignore=[1, 0]) [log1, log2] = (yield resp.json())['result'] self.assert_log(log1, { 'logger': channel.id, 'message': 'Test3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 'message': 'Test2', 'level': logging.INFO}) @inlineCallbacks def test_get_channel_logs_no_n(self): '''If the number of logs is not specified, then the API should return the configured maximum number of logs.''' logpath = self.mktemp() config = yield self.create_channel_config( max_logs=2, channels={ 'logging': 'junebug.tests.helpers.LoggingTestTransport', }, logging_path=logpath ) properties = yield self.create_channel_properties(type='logging') yield self.stop_server() yield 
self.start_server(config=config) channel = yield self.create_channel( self.service, self.redis, config=config, properties=properties) worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test1') channel.transport_worker.test_log('Test2') channel.transport_worker.test_log('Test3') resp = yield self.get('/channels/%s/logs' % channel.id) self.assert_response( resp, http.OK, 'logs retrieved', [], ignore=[1, 0]) [log1, log2] = (yield resp.json())['result'] self.assert_log(log1, { 'logger': channel.id, 'message': 'Test3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 'message': 'Test2', 'level': logging.INFO}) PKùh…HŠsm<) ) junebug/tests/test_validate.pyimport json from twisted.web import http from twisted.trial.unittest import TestCase from twisted.internet.defer import inlineCallbacks import treq from klein import Klein from junebug.tests.utils import ToyServer from junebug.utils import json_body from junebug.validate import body_schema, validate class TestValidate(TestCase): @inlineCallbacks def test_validate_fail(self): class Api(object): app = Klein() @app.route('/') @validate( lambda _: errs1, lambda _: None, lambda _: errs2) def route(self, req): pass errs1 = [{ 'type': '1', 'message': 'a' }] errs2 = [{ 'type': 'b', 'message': 'B' }] srv = yield ToyServer.from_test(self, Api().app) resp = yield treq.get(srv.url, persistent=False) self.assertEqual(resp.code, http.BAD_REQUEST) content = yield resp.json() self.assertEqual(content['result'], { 'errors': sorted(errs1 + errs2) }) self.assertEqual(content['status'], 400) self.assertEqual(content['code'], 'Bad Request') self.assertEqual(content['description'], 'api usage error') @inlineCallbacks def test_validate_pass(self): class Api(object): app = Klein() @app.route('/') @validate( lambda _: None, lambda _: None) def route(self, req): return 'ok' srv = yield ToyServer.from_test(self, Api().app) resp = yield treq.get(srv.url, persistent=False) self.assertEqual(resp.code, http.OK) self.assertEqual((yield resp.content()), 'ok') @inlineCallbacks def test_body_schema(self): class Api(object): app = Klein() @app.route('/') @json_body @validate(body_schema({'properties': {'foo': {'type': 'string'}}})) def route(self, req, body): pass srv = yield ToyServer.from_test(self, Api().app) resp = yield treq.get( srv.url, persistent=False, data=json.dumps({'foo': 23})) content = yield resp.json() self.assertEqual(content['result'], { 'errors': [{ 'type': 'invalid_body', 'message': "23 is not of type 'string'" }] }) self.assertEqual(content['status'], 400) self.assertEqual(content['code'], 'Bad Request') self.assertEqual(content['description'], 'api usage error') resp = yield treq.get( srv.url, persistent=False, data=json.dumps({'foo': 'bar'})) self.assertEqual(resp.code, http.OK) PKwPGví;ððjunebug/tests/utils.pyfrom klein import Klein from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web.server import Site class ToyServer(object): @inlineCallbacks def setup(self, app=None): if app is None: app = Klein() self.app = app self.server = yield reactor.listenTCP(0, Site(self.app.resource())) addr = self.server.getHost() self.url = "http://%s:%s" % (addr.host, addr.port) def teardown(self): self.server.loseConnection() @classmethod @inlineCallbacks def from_test(cls, test, app=None): server = cls() yield server.setup(app) test.addCleanup(server.teardown) returnValue(server) 
PKùh…HrßžfF‚F‚junebug/tests/test_channel.pyimport logging import json from twisted.internet.defer import inlineCallbacks from vumi.message import TransportUserMessage, TransportStatus from vumi.transports.telnet import TelnetServerTransport from junebug.utils import api_from_message, api_from_status, conjoin from junebug.workers import ChannelStatusWorker, MessageForwardingWorker from junebug.channel import ( Channel, ChannelNotFound, InvalidChannelType, MessageNotFound) from junebug.logging_service import JunebugLoggerService from junebug.tests.helpers import JunebugTestBase, FakeJunebugPlugin class TestChannel(JunebugTestBase): @inlineCallbacks def setUp(self): self.patch_logger() yield self.start_server() @inlineCallbacks def test_save_channel(self): properties = self.create_channel_properties() channel = yield self.create_channel( self.service, self.redis) props = yield self.redis.get('%s:properties' % channel.id) self.assertEqual(json.loads(props), conjoin(properties, { 'config': conjoin( properties['config'], {'transport_name': channel.id}) })) channel_list = yield self.redis.get('channels') self.assertEqual(channel_list, set([channel.id])) @inlineCallbacks def test_delete_channel(self): properties = self.create_channel_properties() channel = yield self.create_channel( self.service, self.redis) props = yield self.redis.get('%s:properties' % channel.id) self.assertEqual(json.loads(props), conjoin(properties, { 'config': conjoin( properties['config'], {'transport_name': channel.id}) })) channel_list = yield self.redis.get('channels') self.assertEqual(channel_list, set([channel.id])) yield channel.delete() properties = yield self.redis.get('%s:properties' % channel.id) self.assertEqual(properties, None) channel_list = yield self.redis.get('channels') self.assertEqual(channel_list, set()) @inlineCallbacks def test_start_channel_transport(self): '''Starting the channel should start the transport, as well as the logging service for that transport.''' channel = yield self.create_channel( self.service, self.redis) worker = self.service.getServiceNamed(channel.id) self.assertEqual(worker, channel.transport_worker) self.assertTrue(isinstance(worker, TelnetServerTransport)) logging_worker = worker.getServiceNamed('Junebug Worker Logger') self.assertTrue( isinstance(logging_worker, channel.JUNEBUG_LOGGING_SERVICE_CLS)) @inlineCallbacks def test_start_channel_logging(self): '''When the channel is started, the logging worker should be started along with it.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') self.assertTrue(isinstance(worker_logger, JunebugLoggerService)) @inlineCallbacks def test_channel_logging_single_channel(self): '''All logs from a single channel should go to the logging worker.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test message1') channel.transport_worker.test_log('Test message2') [log1, log2] = worker_logger.logfile.logs self.assertEqual(json.loads(log1)['message'], 'Test message1') self.assertEqual(json.loads(log2)['message'], 'Test message2') @inlineCallbacks def test_channel_logging_multiple_channels(self): '''All logs from a single channel should go to the logging worker.''' channel1 = yield self.create_channel( 
self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger1 = channel1.transport_worker.getServiceNamed( 'Junebug Worker Logger') channel2 = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger2 = channel2.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger1.startService() worker_logger2.startService() channel1.transport_worker.test_log('Test message1') channel1.transport_worker.test_log('Test message2') [log1, log2] = worker_logger1.logfile.logs self.assertEqual(json.loads(log1)['message'], 'Test message1') self.assertEqual(json.loads(log2)['message'], 'Test message2') channel2.transport_worker.test_log('Test message3') self.assertEqual(len(worker_logger1.logfile.logs), 2) [log3] = worker_logger2.logfile.logs self.assertEqual(json.loads(log3)['message'], 'Test message3') @inlineCallbacks def test_transport_class_name_default(self): config = yield self.create_channel_config(channels={}) properties = self.create_channel_properties(type='telnet') channel = Channel(self.redis, config, properties) self.assertEqual( channel._transport_cls_name, 'vumi.transports.telnet.TelnetServerTransport') @inlineCallbacks def test_transport_class_name_specified(self): config = yield self.create_channel_config(channels={'foo': 'bar.baz'}) properties = self.create_channel_properties(type='foo') channel = Channel(self.redis, config, properties) self.assertEqual( channel._transport_cls_name, 'bar.baz') @inlineCallbacks def test_transport_class_name_overridden(self): config = yield self.create_channel_config( channels={'foo': 'bar.baz'}, replace_channels=True) properties = self.create_channel_properties(type='telnet') channel = Channel(self.redis, config, properties) err = self.assertRaises( InvalidChannelType, getattr, channel, '_transport_cls_name') self.assertTrue(all(cls in err.message for cls in ['telnet', 'foo'])) @inlineCallbacks def test_start_channel_application(self): properties = self.create_channel_properties(mo_url='http://foo.org') channel = yield self.create_channel( self.service, self.redis, properties=properties) worker = channel.application_worker id = channel.application_id self.assertTrue(isinstance(worker, MessageForwardingWorker)) self.assertEqual(self.service.namedServices[id], worker) self.assertEqual(worker.config, { 'transport_name': channel.id, 'mo_message_url': 'http://foo.org', 'message_queue': None, 'redis_manager': channel.config.redis, 'inbound_ttl': channel.config.inbound_message_ttl, 'outbound_ttl': channel.config.outbound_message_ttl, 'metric_window': channel.config.metric_window, }) @inlineCallbacks def test_start_channel_status_application(self): properties = self.create_channel_properties() channel = yield self.create_channel( self.service, self.redis, properties=properties) worker = channel.status_application_worker id = channel.status_application_id self.assertTrue(isinstance(worker, ChannelStatusWorker)) self.assertEqual(self.service.namedServices[id], worker) self.assertEqual(worker.config, { 'redis_manager': channel.config.redis, 'channel_id': channel.id, 'status_url': None, }) @inlineCallbacks def test_start_channel_status_application_status_url(self): properties = self.create_channel_properties(status_url='example.org') channel = yield self.create_channel( self.service, self.redis, properties=properties) worker = channel.status_application_worker self.assertEqual(worker.config['status_url'], 'example.org') @inlineCallbacks def test_channel_character_limit(self): 
'''`character_limit` parameter should return the character limit, or `None` if no character limit was specified''' properties_limit = self.create_channel_properties(character_limit=100) properties_no_limit = self.create_channel_properties() channel_limit = yield self.create_channel( self.service, self.redis, properties=properties_limit) channel_no_limit = yield self.create_channel( self.service, self.redis, properties=properties_no_limit) self.assertEqual(channel_limit.character_limit, 100) self.assertEqual(channel_no_limit.character_limit, None) @inlineCallbacks def test_create_channel_invalid_type(self): channel = yield self.create_channel( self.service, self.redis) channel._properties['type'] = 'foo' err = yield self.assertFailure(channel.start(None), InvalidChannelType) self.assertTrue(all( s in err.message for s in ('xmpp', 'telnet', 'foo'))) @inlineCallbacks def test_start_channel_plugins_called(self): '''Starting a channel should call `channel_started` on all plugins''' plugin = FakeJunebugPlugin() plugin.calls = [] channel = yield self.create_channel( self.service, self.redis, plugins=[plugin]) [(name, [plugin_channel])] = plugin.calls self.assertEqual(name, 'channel_started') self.assertEqual(plugin_channel, channel) @inlineCallbacks def test_stop_channel_plugins_called(self): '''Stopping a channel should call `channel_stopped` on all plugins''' plugin = FakeJunebugPlugin() plugin.calls = [] channel = yield self.create_channel( self.service, self.redis, plugins=[plugin]) plugin.calls = [] yield channel.stop() [(name, [plugin_channel])] = plugin.calls self.assertEqual(name, 'channel_stopped') self.assertEqual(plugin_channel, channel) @inlineCallbacks def test_update_channel_config(self): properties = self.create_channel_properties() channel = yield self.create_channel( self.service, self.redis) update = yield channel.update({'foo': 'bar'}) self.assertEqual(update, conjoin(properties, { 'foo': 'bar', 'status': self.generate_status(), 'id': channel.id, 'config': conjoin(properties['config'], { 'transport_name': channel.id }) })) @inlineCallbacks def test_update_channel_restart_transport_on_config_change(self): channel = yield self.create_channel( self.service, self.redis) worker1 = channel.transport_worker self.assertEqual(self.service.namedServices[channel.id], worker1) yield channel.update({'foo': 'bar'}) self.assertEqual(self.service.namedServices[channel.id], worker1) properties = self.create_channel_properties() properties['config']['foo'] = ['bar'] yield channel.update(properties) worker2 = channel.transport_worker self.assertEqual(self.service.namedServices[channel.id], worker2) self.assertTrue(worker1 not in self.service.services) @inlineCallbacks def test_update_channel_restart_application_on_config_change(self): channel = yield self.create_channel( self.service, self.redis) worker1 = channel.application_worker id = channel.application_id self.assertEqual(self.service.namedServices[id], worker1) yield channel.update({'foo': 'bar'}) self.assertEqual(self.service.namedServices[id], worker1) properties = self.create_channel_properties(mo_url='http://baz.org') yield channel.update(properties) worker2 = channel.application_worker self.assertEqual(self.service.namedServices[id], worker2) self.assertTrue(worker1 not in self.service.services) @inlineCallbacks def test_stop_channel(self): channel = yield self.create_channel( self.service, self.redis) self.assertEqual( self.service.namedServices[channel.id], channel.transport_worker) yield channel.stop() 
self.assertEqual(self.service.namedServices.get(channel.id), None) application_id = channel.application_id self.assertEqual(self.service.namedServices.get(application_id), None) status_application_id = channel.status_application_id self.assertEqual( self.service.namedServices.get(status_application_id), None) @inlineCallbacks def test_create_channel_from_id(self): channel1 = yield self.create_channel( self.service, self.redis) channel2 = yield self.create_channel_from_id( self.redis, {}, channel1.id, self.service) self.assertEqual((yield channel1.status()), (yield channel2.status())) self.assertEqual( channel1.transport_worker, channel2.transport_worker) self.assertEqual( channel1.application_worker, channel2.application_worker) self.assertEqual( channel1.status_application_worker, channel2.status_application_worker) @inlineCallbacks def test_create_channel_from_unknown_id(self): yield self.assertFailure( self.create_channel_from_id( self.redis, {}, 'unknown-id', self.service), ChannelNotFound) @inlineCallbacks def test_channel_status_empty(self): properties = self.create_channel_properties() channel = yield self.create_channel( self.service, self.redis, id='channel-id') self.assertEqual((yield channel.status()), conjoin(properties, { 'status': self.generate_status(), 'id': 'channel-id', 'config': conjoin(properties['config'], { 'transport_name': channel.id }) })) @inlineCallbacks def test_channel_status_single_status(self): channel = yield self.create_channel( self.service, self.redis, id='channel-id') status = TransportStatus( status='ok', component='foo', type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) self.assert_status((yield channel.status())['status'], components={ 'foo': api_from_status('channel-id', status), }, level='ok') @inlineCallbacks def test_channel_multiple_statuses_ok(self): channel = yield self.create_channel( self.service, self.redis, id='channel-id') components = {} for i in range(5): status = TransportStatus( status='ok', component=i, type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components[str(i)] = api_from_status('channel-id', status) self.assert_status( (yield channel.status())['status'], level='ok', components=components) @inlineCallbacks def test_channel_multiple_statuses_degraded(self): channel = yield self.create_channel( self.service, self.redis, id='channel-id') components = {} for i in range(5): status = TransportStatus( status='ok', component=i, type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components[str(i)] = api_from_status('channel-id', status) status = TransportStatus( status='degraded', component=5, type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components['5'] = api_from_status('channel-id', status) self.assert_status( (yield channel.status())['status'], level='degraded', components=components) @inlineCallbacks def test_channel_multiple_statuses_down(self): channel = yield self.create_channel( self.service, self.redis, id='channel-id') components = {} for i in range(5): status = TransportStatus( status='ok', component=i, type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components[str(i)] = api_from_status('channel-id', status) status = TransportStatus( status='degraded', component=5, type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components['5'] = api_from_status('channel-id', status) status = TransportStatus( status='down', component=6, 
type='bar', message='Bar') yield channel.sstore.store_status('channel-id', status) components['6'] = api_from_status('channel-id', status) self.assert_status( (yield channel.status())['status'], level='down', components=components) @inlineCallbacks def test_get_all(self): channels = yield Channel.get_all(self.redis) self.assertEqual(channels, set()) channel1 = yield self.create_channel( self.service, self.redis) channels = yield Channel.get_all(self.redis) self.assertEqual(channels, set([channel1.id])) channel2 = yield self.create_channel( self.service, self.redis) channels = yield Channel.get_all(self.redis) self.assertEqual(channels, set([channel1.id, channel2.id])) @inlineCallbacks def test_start_all_channels(self): yield Channel.start_all_channels( self.redis, self.config, self.service) channel1 = yield self.create_channel( self.service, self.redis) self.assertTrue(channel1.id in self.service.namedServices) yield channel1.stop() self.assertFalse(channel1.id in self.service.namedServices) yield Channel.start_all_channels( self.redis, self.config, self.service) self.assertTrue(channel1.id in self.service.namedServices) channel2 = yield self.create_channel( self.service, self.redis) self.assertTrue(channel2.id in self.service.namedServices) yield channel2.stop() self.assertFalse(channel2.id in self.service.namedServices) yield Channel.start_all_channels( self.redis, self.config, self.service) self.assertTrue(channel1.id in self.service.namedServices) self.assertTrue(channel2.id in self.service.namedServices) @inlineCallbacks def test_convert_unicode(self): channel = yield self.create_channel( self.service, self.redis, id='channel-id') resp = channel._convert_unicode({ u'both': u'unicode', u'key': 'unicode', 'value': u'unicode', 'nested': { u'unicode': u'nested' }, }) for key, value in resp.iteritems(): self.assertTrue(isinstance(key, str)) if not isinstance(value, dict): self.assertTrue(isinstance(value, str)) for key, value in resp['nested'].iteritems(): self.assertTrue(isinstance(key, str)) self.assertTrue(isinstance(value, str)) self.assertTrue(isinstance(channel._convert_unicode(1), int)) @inlineCallbacks def test_send_message(self): '''The send_message function should place the message on the correct queue''' channel = yield self.create_channel( self.service, self.redis, id='channel-id') msg = yield channel.send_message( self.message_sender, self.outbounds, { 'from': '+1234', 'content': 'testcontent', }) self.assertEqual(msg['channel_id'], 'channel-id') self.assertEqual(msg['from'], '+1234') self.assertEqual(msg['content'], 'testcontent') [dispatched_message] = self.get_dispatched_messages( 'channel-id.outbound') self.assertEqual(msg['message_id'], dispatched_message['message_id']) @inlineCallbacks def test_send_message_event_url(self): '''Sending a message with a specified event url should store the event url for sending events in the future''' channel = yield self.create_channel( self.service, self.redis, id='channel-id') msg = yield channel.send_message( self.message_sender, self.outbounds, { 'from': '+1234', 'content': 'testcontent', 'event_url': 'http://test.org' }) event_url = yield self.outbounds.load_event_url( 'channel-id', msg['message_id']) self.assertEqual(event_url, 'http://test.org') @inlineCallbacks def test_send_reply_message(self): '''send_reply_message should place the correct reply message on the correct queue''' channel = yield self.create_channel( self.service, self.redis, id='channel-id') in_msg = TransportUserMessage( from_addr='+2789', to_addr='+1234', 
transport_name='channel-id', transport_type='_', transport_metadata={'foo': 'bar'}) yield self.api.inbounds.store_vumi_message('channel-id', in_msg) msg = yield channel.send_reply_message( self.message_sender, self.outbounds, self.inbounds, { 'reply_to': in_msg['message_id'], 'content': 'testcontent', }) expected = in_msg.reply(content='testcontent') expected = conjoin(api_from_message(expected), { 'timestamp': msg['timestamp'], 'message_id': msg['message_id'] }) self.assertEqual(msg, expected) [dispatched] = self.get_dispatched_messages('channel-id.outbound') self.assertEqual(msg['message_id'], dispatched['message_id']) self.assertEqual(api_from_message(dispatched), expected) @inlineCallbacks def test_send_reply_message_inbound_not_found(self): '''send_reply_message should raise an error if the inbound message is not found''' channel = yield self.create_channel( self.service, self.redis, id='channel-id') self.assertFailure(channel.send_reply_message( self.message_sender, self.outbounds, self.inbounds, { 'reply_to': 'i-do-not-exist', 'content': 'testcontent', }), MessageNotFound) @inlineCallbacks def test_send_reply_message_event_url(self): '''Sending a message with a specified event url should store the event url for sending events in the future''' channel = yield self.create_channel( self.service, self.redis, id='channel-id') in_msg = TransportUserMessage( from_addr='+2789', to_addr='+1234', transport_name='channel-id', transport_type='_', transport_metadata={'foo': 'bar'}) yield self.api.inbounds.store_vumi_message('channel-id', in_msg) msg = yield channel.send_reply_message( self.message_sender, self.outbounds, self.inbounds, { 'reply_to': in_msg['message_id'], 'content': 'testcontent', 'event_url': 'http://test.org', }) event_url = yield self.outbounds.load_event_url( 'channel-id', msg['message_id']) self.assertEqual(event_url, 'http://test.org') @inlineCallbacks def test_channel_status_inbound_message_rates(self): '''When inbound messages are being receive, it should affect the inbound message rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'inbound', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], inbound_message_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_channel_status_outbound_message_rates(self): '''When outbound messages are being sent, it should affect the outbound message rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'outbound', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], outbound_message_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_channel_status_submitted_event_rate(self): '''When submitted events are being received, it should affect the submitted event rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'submitted', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], submitted_event_rate=1.0/channel.config.metric_window) @inlineCallbacks 
def test_channel_status_rejected_event_rate(self): '''When rejected events are being received, it should affect the rejected event rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'rejected', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], rejected_event_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_channel_status_delivery_succeeded_rate(self): '''When delivery_succeeded events are being received, it should affect the delivery succeeded event rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'delivery_succeeded', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], delivery_succeeded_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_channel_status_delivery_failed_rate(self): '''When delivery_failed events are being received, it should affect the delivery failed event rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'delivery_failed', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], delivery_failed_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_channel_status_delivery_pending_rate(self): '''When delivery_pending events are being received, it should affect the delivery pending event rate reported by the status''' clock = self.patch_message_rate_clock() channel = yield self.create_channel( self.service, self.redis, id=u'channel-id') yield self.api.message_rate.increment( channel.id, 'delivery_pending', channel.config.metric_window) clock.advance(channel.config.metric_window) self.assert_status( (yield channel.status())['status'], delivery_pending_rate=1.0/channel.config.metric_window) @inlineCallbacks def test_get_logs_more_than_available(self): '''If the amount of available logs is less than what is requested, all the logs will be returned.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test message1') [log] = channel.get_logs(2) self.assert_log(log, { 'logger': channel.id, 'message': 'Test message1', 'level': logging.INFO}) @inlineCallbacks def test_get_logs_less_than_available(self): '''If the amount of available logs is more than what is requested, only the requested amount will be returned.''' channel = yield self.create_channel( self.service, self.redis, 'junebug.tests.helpers.LoggingTestTransport') worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test message1') channel.transport_worker.test_log('Test message2') channel.transport_worker.test_log('Test message3') [log1, log2] = channel.get_logs(2) self.assert_log(log1, { 'logger': channel.id, 'message': 'Test message3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 
'message': 'Test message2', 'level': logging.INFO}) @inlineCallbacks def test_get_logs_more_than_configured(self): '''If the amount of logs requested is more than the configured maximum, then only the configured maximum amount is returned.''' logpath = self.mktemp() config = yield self.create_channel_config( max_logs=2, channels={ 'logging': 'junebug.tests.helpers.LoggingTestTransport', }, logging_path=logpath ) properties = yield self.create_channel_properties(type='logging') channel = yield self.create_channel( self.service, self.redis, config=config, properties=properties) worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test message1') channel.transport_worker.test_log('Test message2') channel.transport_worker.test_log('Test message3') [log1, log2] = channel.get_logs(3) self.assert_log(log1, { 'logger': channel.id, 'message': 'Test message3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 'message': 'Test message2', 'level': logging.INFO}) @inlineCallbacks def test_get_logs_n_is_none(self): '''If no value for n is supplied, then the configured maximum number of logs should be returned.''' logpath = self.mktemp() config = yield self.create_channel_config( max_logs=2, channels={ 'logging': 'junebug.tests.helpers.LoggingTestTransport', }, logging_path=logpath ) properties = yield self.create_channel_properties(type='logging') channel = yield self.create_channel( self.service, self.redis, config=config, properties=properties) worker_logger = channel.transport_worker.getServiceNamed( 'Junebug Worker Logger') worker_logger.startService() channel.transport_worker.test_log('Test message1') channel.transport_worker.test_log('Test message2') channel.transport_worker.test_log('Test message3') [log1, log2] = channel.get_logs(None) self.assert_log(log1, { 'logger': channel.id, 'message': 'Test message3', 'level': logging.INFO}) self.assert_log(log2, { 'logger': channel.id, 'message': 'Test message2', 'level': logging.INFO}) PKùh…HûˆDˆDjunebug/tests/test_stores.pyfrom twisted.internet.defer import inlineCallbacks, returnValue from vumi.message import TransportEvent, TransportUserMessage, TransportStatus from junebug.stores import ( BaseStore, InboundMessageStore, OutboundMessageStore, StatusStore, MessageRateStore) from junebug.tests.helpers import JunebugTestBase class TestBaseStore(JunebugTestBase): @inlineCallbacks def create_store(self, ttl=60): redis = yield self.get_redis() store = BaseStore(redis, ttl) returnValue(store) @inlineCallbacks def test_store_all(self): '''Stores all the keys and values in a hash in redis, and sets the expiry time''' store = yield self.create_store() properties = { 'foo': 'bar', 'bar': 'foo', } yield store.store_all('testid', properties) props = yield self.redis.hgetall('testid') self.assertEqual(properties, props) ttl = yield self.redis.ttl('testid') self.assertEqual(ttl, 60) @inlineCallbacks def test_store_property(self): '''Saves a single property into redis, and sets the expiry time''' store = yield self.create_store() yield store.store_property('testid', 'foo', 'bar') self.assertEqual((yield self.redis.hget('testid', 'foo')), 'bar') self.assertEqual((yield self.redis.ttl('testid')), 60) @inlineCallbacks def test_load_all_empty(self): '''If no data exists in redis, properties should be an empty dict''' store = yield self.create_store() properties = yield store.load_all('testid') self.assertEqual(properties, {}) @inlineCallbacks def 
test_load_all(self): '''If data exists in redis, properties should contain that data''' store = yield self.create_store() properties = { 'foo': 'bar', 'bar': 'foo', } yield self.redis.hmset('testid', properties) props = yield store.load_all('testid') self.assertEqual(properties, props) self.assertEqual((yield self.redis.ttl('testid')), 60) @inlineCallbacks def test_load_property(self): '''Loads a single property from redis''' store = yield self.create_store() yield self.redis.hset('testid', 'foo', 'bar') val = yield store.load_property('testid', 'foo') self.assertEqual(val, 'bar') self.assertEqual((yield self.redis.ttl('testid')), 60) @inlineCallbacks def test_load_property_empty(self): '''Loads None if property doesn't exist in redis''' store = yield self.create_store() val = yield store.load_property('testid', 'foo') self.assertEqual(val, None) @inlineCallbacks def test_override_ttl(self): '''If a ttl for an action is specified, it should override the default ttl''' store = yield self.create_store(ttl=1) yield store.store_all('testid1', {'foo': 'bar'}, ttl=2) self.assertEqual((yield self.redis.ttl('testid1')), 2) yield store.load_all('testid1', ttl=3) self.assertEqual((yield self.redis.ttl('testid1')), 3) yield store.store_property('testid2', 'foo', 'bar', ttl=2) self.assertEqual((yield self.redis.ttl('testid2')), 2) yield store.load_property('testid2', 'foo', ttl=3) self.assertEqual((yield self.redis.ttl('testid2')), 3) yield store.increment_id('testid3', ttl=2) self.assertEqual((yield self.redis.ttl('testid3')), 2) yield store.get_id('testid3', ttl=3) self.assertEqual((yield self.redis.ttl('testid3')), 3) @inlineCallbacks def test_none_ttl(self): '''If the ttl for an action is specified as None, no ttl should be set.''' store = yield self.create_store() yield store.store_all('testid1', {'foo': 'bar'}, ttl=None) self.assertEqual((yield self.redis.ttl('testid1')), None) yield store.load_all('testid1', ttl=None) self.assertEqual((yield self.redis.ttl('testid1')), None) yield store.store_property('testid2', 'foo', 'bar', ttl=None) self.assertEqual((yield self.redis.ttl('testid2')), None) yield store.load_property('testid2', 'foo', ttl=None) self.assertEqual((yield self.redis.ttl('testid2')), None) yield store.increment_id('testid3', ttl=None) self.assertEqual((yield self.redis.ttl('testid3')), None) yield store.get_id('testid3', ttl=None) self.assertEqual((yield self.redis.ttl('testid3')), None) class TestInboundMessageStore(JunebugTestBase): @inlineCallbacks def create_store(self, ttl=60): redis = yield self.get_redis() store = InboundMessageStore(redis, ttl) returnValue(store) @inlineCallbacks def test_store_vumi_message(self): '''Stores the vumi message.''' store = yield self.create_store() vumi_msg = TransportUserMessage.send(to_addr='+213', content='foo') yield store.store_vumi_message('channel_id', vumi_msg) msg = yield self.redis.hget( 'channel_id:inbound_messages:%s' % vumi_msg.get('message_id'), 'message') self.assertEqual(vumi_msg, TransportUserMessage.from_json(msg)) @inlineCallbacks def test_load_vumi_message(self): '''Returns a vumi message from the stored json''' store = yield self.create_store() vumi_msg = TransportUserMessage.send(to_addr='+213', content='foo') yield self.redis.hset( 'channel_id:inbound_messages:%s' % vumi_msg.get('message_id'), 'message', vumi_msg.to_json()) message = yield store.load_vumi_message( 'channel_id', vumi_msg.get('message_id')) self.assertEqual(message, vumi_msg) @inlineCallbacks def test_load_vumi_message_not_exist(self): '''`None` should be 
returned if the message cannot be found''' store = yield self.create_store() self.assertEqual((yield store.load_vumi_message( 'bad-channel', 'bad-id')), None) class TestOutboundMessageStore(JunebugTestBase): @inlineCallbacks def create_store(self, ttl=60): redis = yield self.get_redis() store = OutboundMessageStore(redis, ttl) returnValue(store) @inlineCallbacks def test_store_event_url(self): '''Stores the event URL under the message ID''' store = yield self.create_store() yield store.store_event_url( 'channel_id', 'messageid', 'http://test.org') event_url = yield self.redis.hget( 'channel_id:outbound_messages:messageid', 'event_url') self.assertEqual(event_url, 'http://test.org') @inlineCallbacks def test_load_event_url(self): '''Returns a vumi message from the stored json''' store = yield self.create_store() vumi_msg = TransportUserMessage.send(to_addr='+213', content='foo') yield self.redis.hset( 'channel_id:outbound_messages:%s' % vumi_msg.get('message_id'), 'event_url', 'http://test.org') event_url = yield store.load_event_url( 'channel_id', vumi_msg.get('message_id')) self.assertEqual(event_url, 'http://test.org') @inlineCallbacks def test_load_event_url_not_exist(self): '''`None` should be returned if the message cannot be found''' store = yield self.create_store() self.assertEqual((yield store.load_event_url( 'bad-channel', 'bad-id')), None) @inlineCallbacks def test_store_event(self): '''Stores the event under the message ID''' store = yield self.create_store() event = TransportEvent( user_message_id='message_id', sent_message_id='message_id', event_type='ack') yield store.store_event('channel_id', 'message_id', event) event_json = yield self.redis.hget( 'channel_id:outbound_messages:message_id', event['event_id']) self.assertEqual(event_json, event.to_json()) @inlineCallbacks def test_load_event(self): store = yield self.create_store() event = TransportEvent( user_message_id='message_id', sent_message_id='message_id', event_type='nack', nack_reason='error error') yield self.redis.hset( 'channel_id:outbound_messages:message_id', event['event_id'], event.to_json()) stored_event = yield store.load_event( 'channel_id', 'message_id', event['event_id']) self.assertEqual(stored_event, event) @inlineCallbacks def test_load_event_not_exist(self): '''`None` should be returned if the event doesn't exist''' store = yield self.create_store() stored_event = yield store.load_event( 'channel_id', 'message_id', 'bad_event_id') self.assertEqual(stored_event, None) @inlineCallbacks def test_load_all_events_none(self): '''Returns an empty list''' store = yield self.create_store() events = yield store.load_all_events('channel_id', 'message_id') self.assertEqual(events, []) @inlineCallbacks def test_load_all_events_one(self): '''Returns a list with one event inside''' store = yield self.create_store() event = TransportEvent( user_message_id='message_id', sent_message_id='message_id', event_type='delivery_report', delivery_status='pending') yield self.redis.hset( 'channel_id:outbound_messages:message_id', event['event_id'], event.to_json()) events = yield store.load_all_events('channel_id', 'message_id') self.assertEqual(events, [event]) @inlineCallbacks def test_load_all_events_multiple(self): '''Returns a list of all the stored events''' store = yield self.create_store() events = [] for i in range(5): event = TransportEvent( user_message_id='message_id', sent_message_id='message_id', event_type='delivery_report', delivery_status='pending') events.append(event) yield self.redis.hset( 
'channel_id:outbound_messages:message_id', event['event_id'], event.to_json()) stored_events = yield store.load_all_events('channel_id', 'message_id') self.assertEqual( sorted(events, key=lambda e: e['event_id']), sorted(stored_events, key=lambda e: e['event_id'])) @inlineCallbacks def test_load_all_events_with_other_stored_fields(self): '''Should return just the stored events''' store = yield self.create_store() event = TransportEvent( user_message_id='message_id', sent_message_id='message_id', event_type='delivery_report', delivery_status='pending') yield self.redis.hset( 'channel_id:outbound_messages:message_id', event['event_id'], event.to_json()) yield self.redis.hset( 'channel_id:outbound_messages:message_id', 'event_url', 'test_url') stored_events = yield store.load_all_events('channel_id', 'message_id') self.assertEqual(stored_events, [event]) class TestStatusStore(JunebugTestBase): @inlineCallbacks def create_store(self): redis = yield self.get_redis() store = StatusStore(redis, ttl=None) returnValue(store) @inlineCallbacks def test_store_single_status(self): '''The single status is stored under the correct key''' store = yield self.create_store() status = TransportStatus( status='ok', component='foo', type='bar', message='foo') yield store.store_status('channelid', status) status_redis = yield self.redis.hget('channelid:status', 'foo') self.assertEqual(status_redis, status.to_json()) self.assertEqual((yield self.redis.ttl('channelid:status')), None) @inlineCallbacks def test_store_status_overwrite(self): '''New statuses override old statuses with the same component, but do not affect statuses of different components''' store = yield self.create_store() status_old = TransportStatus( status='ok', component='foo', type='bar', message='foo') status_new = TransportStatus( status='down', component='foo', type='bar', message='foo') status_other = TransportStatus( status='ok', component='bar', type='bar', message='foo') yield store.store_status('channelid', status_other) yield store.store_status('channelid', status_old) yield store.store_status('channelid', status_new) status_new_redis = yield self.redis.hget('channelid:status', 'foo') self.assertEqual(status_new_redis, status_new.to_json()) status_other_redis = yield self.redis.hget('channelid:status', 'bar') self.assertEqual(status_other_redis, status_other.to_json()) @inlineCallbacks def test_load_one_status(self): store = yield self.create_store() status = TransportStatus( status='ok', component='foo', type='bar', message='foo') yield store.store_status('channelid', status) stored_statuses = yield store.get_statuses('channelid') self.assertEqual( stored_statuses, {'foo': status}) @inlineCallbacks def test_load_many_statuses(self): store = yield self.create_store() expected = {} for i in range(5): status = TransportStatus( status='ok', component=i, type='bar', message='foo') yield store.store_status('channelid', status) expected[str(i)] = status stored_statuses = yield store.get_statuses('channelid') self.assertEqual(stored_statuses, expected) class TestMessageRateStore(JunebugTestBase): @inlineCallbacks def create_store(self, **kw): ''' Creates and returns a new message rate store. 
        '''
        redis = yield self.get_redis()
        returnValue(MessageRateStore(redis, **kw))

    @inlineCallbacks
    def test_get_rate_no_messages(self):
        '''If no messages have been sent, the message rate should be 0.'''
        clock = self.patch_message_rate_clock()
        store = yield self.create_store()
        clock.advance(10)
        rate = yield store.get_messages_per_second('channelid', 'inbound', 10)
        self.assertEqual(rate, 0)

    @inlineCallbacks
    def test_get_rate_single_message(self):
        '''If there is a single message in the last time bucket, the message
        rate should be 1/bucket_length'''
        clock = self.patch_message_rate_clock()
        store = yield self.create_store()
        yield store.increment('channelid', 'inbound', 10)
        clock.advance(10)
        rate = yield store.get_messages_per_second('channelid', 'inbound', 10)
        self.assertEqual(rate, 1.0 / 10.0)

    @inlineCallbacks
    def test_get_rate_multiple_messages(self):
        '''If there are n messages in the last time bucket, the message rate
        should be n/bucket_length'''
        clock = self.patch_message_rate_clock()
        store = yield self.create_store()
        N = 15
        for i in range(N):
            yield store.increment('channelid', 'inbound', 10)
        clock.advance(10)
        rate = yield store.get_messages_per_second('channelid', 'inbound', 10)
        self.assertEqual(rate, N / 10.0)

    @inlineCallbacks
    def test_get_rate_different_buckets(self):
        '''If there are n messages in the last time bucket, the message rate
        should be n/bucket_length, independent of the number of messages in
        the current bucket.'''
        clock = self.patch_message_rate_clock()
        store = yield self.create_store()
        N = 15
        M = 6
        for i in range(N):
            yield store.increment('channelid', 'inbound', 10)
        clock.advance(10)
        for i in range(M):
            yield store.increment('channelid', 'inbound', 10)
        rate = yield store.get_messages_per_second('channelid', 'inbound', 10)
        self.assertEqual(rate, N / 10.0)

    @inlineCallbacks
    def test_old_redis_keys_are_expired(self):
        '''Redis keys that are no longer required should be expired.'''
        clock = self.patch_message_rate_clock()
        store = yield self.create_store()
        self.redis._client.clock = clock
        yield store.increment('channelid', 'inbound', 1.2)
        bucket0 = store.get_key('channelid', 'inbound', int(clock.seconds()))
        self.assertEqual((yield self.redis.get(bucket0)), '1')
        clock.advance(1.2)
        yield store.increment('channelid', 'inbound', 1.2)
        bucket1 = store.get_key('channelid', 'inbound', int(clock.seconds()))
        self.assertEqual((yield self.redis.get(bucket0)), '1')
        self.assertEqual((yield self.redis.get(bucket1)), '1')
        # We need to advance the clock here by 1.8, as the expiry time is an
        # int, and is rounded up.
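        # (The expiry interval itself is computed inside MessageRateStore and
        # is not visible here; from the assertions below it appears to be on
        # the order of two bucket lengths rounded up to a whole second, so
        # advancing by a further 1.2s (2.4s in total) would leave bucket0
        # alive, while 1.8s (3.0s in total) lets bucket0 expire and leaves the
        # younger bucket1 intact.)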
clock.advance(1.8) yield store.increment('channelid', 'inbound', 1.2) bucket2 = store.get_key('channelid', 'inbound', int(clock.seconds())) self.assertEqual((yield self.redis.get(bucket0)), None) self.assertEqual((yield self.redis.get(bucket1)), '1') self.assertEqual((yield self.redis.get(bucket2)), '1') PKwPGºÜщÇÇjunebug/tests/test_service.pyfrom twisted.internet.defer import inlineCallbacks from twisted.trial.unittest import TestCase from junebug import JunebugApi from junebug.service import JunebugService from junebug.config import JunebugConfig class TestJunebugService(TestCase): def setUp(self): self.old_setup = JunebugApi.setup self.old_teardown = JunebugApi.teardown def do_nothing(self): pass JunebugApi.setup = do_nothing JunebugApi.teardown = do_nothing def tearDown(self): JunebugApi.setup = self.old_setup JunebugApi.teardown = self.old_teardown @inlineCallbacks def test_start_service(self): service = JunebugService(JunebugConfig({ 'host': '127.0.0.1', 'port': 0, })) yield service.startService() server = service._port self.assertTrue(server.connected) yield service.stopService() self.assertFalse(server.connected) PKY]ŒH¾EdHâ/â/junebug/tests/helpers.pyimport json from copy import deepcopy import logging import logging.handlers from twisted.python.logfile import LogFile from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue, succeed from twisted.internet.task import Clock from twisted.trial.unittest import TestCase from twisted.web.server import Site from klein import Klein from txamqp.client import TwistedDelegate from vumi.utils import vumi_resource_path from vumi.service import get_spec from vumi.tests.fake_amqp import FakeAMQPBroker, FakeAMQPChannel from vumi.tests.helpers import PersistenceHelper from vumi.transports import Transport import junebug from junebug import JunebugApi from junebug.amqp import JunebugAMQClient, MessageSender from junebug.channel import Channel from junebug.plugin import JunebugPlugin from junebug.service import JunebugService from junebug.config import JunebugConfig from junebug.stores import MessageRateStore class DummyLogFile(object): '''LogFile that has a different path to its worker_id''' def __init__( self, worker_id, path, rotateLength, maxRotatedFiles): self.worker_id = worker_id self.path = path self.rotateLength = rotateLength self.maxRotatedFiles = maxRotatedFiles self.closed_count = 0 self.logfile = LogFile.fromFullPath( path, rotateLength=rotateLength, maxRotatedFiles=maxRotatedFiles) @property def logs(self): reader = self.logfile.getCurrentLog() logs = [] lines = reader.readLines() while lines: logs.extend(lines) lines = reader.readLines() return logs def write(self, data): self.logfile.write(data) self.logfile.flush() def close(self): self.closed_count += 1 def listLogs(self): return [] class FakeAmqpClient(JunebugAMQClient): '''Amqp client, base upon the real JunebugAMQClient, that uses a FakeAMQPBroker instead of a real broker''' def __init__(self, spec): super(FakeAmqpClient, self).__init__(TwistedDelegate(), '', spec) self.broker = FakeAMQPBroker() @inlineCallbacks def channel(self, id): yield self.channelLock.acquire() try: try: ch = self.channels[id] except KeyError: ch = FakeAMQPChannel(id, self) self.channels[id] = ch finally: self.channelLock.release() returnValue(ch) class RequestLoggingApi(object): app = Klein() def __init__(self): self.requests = [] self.url = None def setup(self): self.port = reactor.listenTCP( 0, Site(self.app.resource()), interface='127.0.0.1') addr = 
self.port.getHost() self.url = "http://%s:%s" % (addr.host, addr.port) def teardown(self): self.port.stopListening() @app.route('/') def log_request(self, request): self.requests.append({ 'request': request, 'body': request.content.read(), }) return '' @app.route('/bad/') def bad_request(self, request): self.requests.append({ 'request': request, 'body': request.content.read(), }) request.setResponseCode(500) return 'test-error-response' class LoggingTestTransport(Transport): def test_log(self, message='Test log'): self.log.msg(message, source=self) class JunebugTestBase(TestCase): '''Base test case that all junebug tests inherit from. Contains useful helper functions''' default_channel_properties = { 'type': 'telnet', 'config': { 'twisted_endpoint': 'tcp:0', }, 'mo_url': 'http://foo.bar', } default_channel_config = { 'ttl': 60, 'amqp': {}, } def patch_logger(self): ''' Patches the logger with an in-memory logger, which is acccessable at "self.logging_handler".''' self.logging_handler = logging.handlers.MemoryHandler(100) logging.getLogger().addHandler(self.logging_handler) self.addCleanup(self._cleanup_logging_patch) def patch_message_rate_clock(self): '''Patches the message rate clock, and returns the clock''' clock = Clock() self.patch(MessageRateStore, 'get_seconds', lambda _: clock.seconds()) return clock def _cleanup_logging_patch(self): self.logging_handler.close() logging.getLogger().removeHandler(self.logging_handler) def create_channel_properties(self, **kw): properties = deepcopy(self.default_channel_properties) config = kw.pop('config', {}) properties['config'].update(config) properties.update(kw) return properties @inlineCallbacks def create_channel_config(self, **kw): self.persistencehelper = PersistenceHelper() yield self.persistencehelper.setup() self.addCleanup(self.persistencehelper.cleanup) config = deepcopy(self.default_channel_config) config.update(kw) channel_config = self.persistencehelper.mk_config(config) channel_config['redis'] = channel_config['redis_manager'] returnValue(JunebugConfig(channel_config)) @inlineCallbacks def create_channel( self, service, redis, transport_class=None, properties=default_channel_properties, id=None, config=None, plugins=[]): '''Creates and starts, and saves a channel, with a TelnetServerTransport transport''' self.patch(junebug.logging_service, 'LogFile', DummyLogFile) if transport_class is None: transport_class = 'vumi.transports.telnet.TelnetServerTransport' properties = deepcopy(properties) logpath = self.mktemp() if config is None: config = yield self.create_channel_config( channels={ properties['type']: transport_class }, logging_path=logpath) channel = Channel( redis, config, properties, id=id, plugins=plugins) yield channel.start(self.service) properties['config']['transport_name'] = channel.id yield channel.save() self.addCleanup(channel.stop) returnValue(channel) @inlineCallbacks def create_channel_from_id(self, redis, config, id, service): '''Creates an existing channel given the channel id''' config = yield self.create_channel_config(**config) channel = yield Channel.from_id(redis, config, id, service) returnValue(channel) @inlineCallbacks def get_redis(self): '''Creates and returns a redis manager''' if hasattr(self, 'redis'): returnValue(self.redis) persistencehelper = PersistenceHelper() yield persistencehelper.setup() self.redis = yield persistencehelper.get_redis_manager() self.addCleanup(persistencehelper.cleanup) returnValue(self.redis) @inlineCallbacks def start_server(self, config=None): '''Starts a junebug server. 
        Stores the service to "self.service", and the url at "self.url"'''
        # TODO: This setup is very manual, because we don't call
        # service.startService. This must be fixed to closely mirror the real
        # program with our tests.
        if config is None:
            config = yield self.create_channel_config()
        self.service = JunebugService(config)
        self.api = JunebugApi(
            self.service, config)
        self.service.api = self.api

        redis = yield self.get_redis()
        yield self.api.setup(redis, self.get_message_sender())
        self.config = self.api.config
        self.redis = self.api.redis
        self.inbounds = self.api.inbounds
        self.outbounds = self.api.outbounds
        self.message_sender = self.api.message_sender

        port = reactor.listenTCP(
            0, Site(self.api.app.resource()), interface='127.0.0.1')
        self.service._port = port
        self.addCleanup(self.stop_server)
        addr = port.getHost()
        self.url = "http://%s:%s" % (addr.host, addr.port)

    @inlineCallbacks
    def stop_server(self):
        # TODO: This teardown is very messy, because we don't actually call
        # service.startService. This needs to be fixed in order to ensure that
        # our tests are mirroring the real program closely.
        yield self.service.stopService()
        for service in self.service:
            service.disownServiceParent()
        for service in self.service.namedServices.values():
            service.disownServiceParent()

    def get_message_sender(self):
        '''Creates a new MessageSender object, with a fake amqp client'''
        message_sender = MessageSender('amqp-spec-0-8.xml', None)
        spec = get_spec(vumi_resource_path('amqp-spec-0-8.xml'))
        client = FakeAmqpClient(spec)
        message_sender.client = client
        return message_sender

    def get_dispatched_messages(self, queue):
        '''Gets all messages that have been dispatched to the amqp broker.
        Should only be called after start_server, as it looks in the api for
        the amqp client'''
        amqp_client = self.api.message_sender.client
        return amqp_client.broker.get_messages(
            'vumi', queue)

    def assert_was_logged(self, msg):
        self.assertTrue(any(
            msg in log.getMessage()
            for log in self.logging_handler.buffer))

    def assert_request(self, req, method=None, body=None, headers=None):
        if method is not None:
            self.assertEqual(req['request'].method, method)
        if headers is not None:
            for name, values in headers.iteritems():
                self.assertEqual(
                    req['request'].requestHeaders.getRawHeaders(name),
                    values)
        if body is not None:
            self.assertEqual(json.loads(req['body']), body)

    def assert_body_contains(self, req, **fields):
        body = json.loads(req['body'])
        self.assertEqual(
            dict((k, v) for k, v in body.iteritems() if k in fields),
            fields)

    def assert_log(self, log, expected):
        '''Assert that a log matches what is expected.'''
        timestamp = log.pop('timestamp')
        self.assertTrue(isinstance(timestamp, float))
        self.assertEqual(log, expected)

    def generate_status(
            self, level=None, components={}, inbound_message_rate=0,
            outbound_message_rate=0, submitted_event_rate=0,
            rejected_event_rate=0, delivery_succeeded_rate=0,
            delivery_failed_rate=0, delivery_pending_rate=0):
        '''Generates a status that the http API would respond with, given the
        same parameters'''
        return {
            'status': level,
            'components': components,
            'inbound_message_rate': inbound_message_rate,
            'outbound_message_rate': outbound_message_rate,
            'submitted_event_rate': submitted_event_rate,
            'rejected_event_rate': rejected_event_rate,
            'delivery_succeeded_rate': delivery_succeeded_rate,
            'delivery_failed_rate': delivery_failed_rate,
            'delivery_pending_rate': delivery_pending_rate,
        }

    def assert_status(self, status, **kwargs):
        '''Assert that the current channel status is correct'''
        self.assertEqual(status, self.generate_status(**kwargs))
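# The FakeJunebugPlugin below only records which hooks were called. As a
# point of reference, a minimal sketch of what a real plugin might look like
# is shown here; it is illustrative only and not part of the original
# helpers, and it assumes nothing beyond the hook signatures that
# FakeJunebugPlugin itself implements (start_plugin, stop_plugin,
# channel_started, channel_stopped), each returning a fired deferred.


class ExampleLoggingPlugin(JunebugPlugin):
    '''Illustrative plugin that logs channel lifecycle events.'''

    def start_plugin(self, config, junebug_config):
        # `config` is this plugin's own config dict; `junebug_config` is the
        # full JunebugConfig it runs under.
        self.log = logging.getLogger('junebug.plugin.example')
        self.log.info('plugin started with config: %r', config)
        return succeed(None)

    def stop_plugin(self):
        self.log.info('plugin stopped')
        return succeed(None)

    def channel_started(self, channel):
        self.log.info('channel %s started', channel.id)
        return succeed(None)

    def channel_stopped(self, channel):
        self.log.info('channel %s stopped', channel.id)
        return succeed(None)

# In a test, such a plugin would be passed the same way FakeJunebugPlugin is
# used in the channel tests above, e.g.
# create_channel(service, redis, plugins=[ExampleLoggingPlugin()]).
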
class FakeJunebugPlugin(JunebugPlugin): def _add_call(self, func_name, *args): self.calls.append((func_name, args)) def start_plugin(self, config, junebug_config): self.calls = [] self._add_call('start_plugin', config, junebug_config) return succeed(None) def stop_plugin(self): self._add_call('stop_plugin') return succeed(None) def channel_started(self, channel): self._add_call('channel_started', channel) return succeed(None) def channel_stopped(self, channel): self._add_call('channel_stopped', channel) return succeed(None) PKwPGjunebug/tests/__init__.pyPKùh…HÄ0í„RR"junebug/tests/test_command_line.pyimport json import logging import os.path from twisted.internet.defer import inlineCallbacks import junebug from junebug import JunebugApi from junebug.command_line import parse_arguments, logging_setup, start_server from junebug.tests.helpers import JunebugTestBase from junebug.config import JunebugConfig class TestCommandLine(JunebugTestBase): def setUp(self): self.old_setup = JunebugApi.setup self.old_teardown = JunebugApi.teardown def do_nothing(self): pass JunebugApi.setup = do_nothing JunebugApi.teardown = do_nothing def tearDown(self): JunebugApi.setup = self.old_setup JunebugApi.teardown = self.old_teardown def patch_yaml_load(self, mappings): self.patch(junebug.command_line, 'load_config', mappings.get) def test_load_config(self): '''Given a filename with the file containing yaml content, the load_config function should load the yaml file.''' filename = self.mktemp() with open(filename, 'w') as f: f.write(''' foo: bar ''') config = junebug.command_line.load_config(filename) self.assertEqual(config, {'foo': 'bar'}) def test_load_config_none(self): '''If the filename is None, return an empty object''' config = junebug.command_line.load_config(None) self.assertEqual(config, {}) def test_parse_arguments_interface(self): '''The interface command line argument can be specified by "--interface" and "-i" and has a default value of "localhost"''' config = parse_arguments([]) self.assertEqual(config.interface, 'localhost') config = parse_arguments(['--interface', 'foobar']) self.assertEqual(config.interface, 'foobar') config = parse_arguments(['-i', 'foobar']) self.assertEqual(config.interface, 'foobar') def test_parse_arguments_port(self): '''The port command line argument can be specified by "--port" or "-p" and has a default value of 8080''' config = parse_arguments([]) self.assertEqual(config.port, 8080) config = parse_arguments(['--port', '80']) self.assertEqual(config.port, 80) config = parse_arguments(['-p', '80']) self.assertEqual(config.port, 80) def test_parse_arguments_log_file(self): '''The log file command line argument can be specified by "--log-file" or "-l" and has a default value of None''' config = parse_arguments([]) self.assertEqual(config.logfile, None) config = parse_arguments(['--log-file', 'foo.bar']) self.assertEqual(config.logfile, 'foo.bar') config = parse_arguments(['-l', 'foo.bar']) self.assertEqual(config.logfile, 'foo.bar') def test_parse_arguments_redis_host(self): '''The redis host command line argument can be specified by "--redis-host" or "-redish" and has a default value of "localhost"''' config = parse_arguments([]) self.assertEqual(config.redis['host'], 'localhost') config = parse_arguments(['--redis-host', 'foo.bar']) self.assertEqual(config.redis['host'], 'foo.bar') config = parse_arguments(['-redish', 'foo.bar']) self.assertEqual(config.redis['host'], 'foo.bar') def test_parse_arguments_redis_config_port(self): '''The redis port command line argument can 
be specified by "--redis-port" or "-redisp" and has a default value of 6379''' config = parse_arguments([]) self.assertEqual(config.redis['port'], 6379) config = parse_arguments(['--redis-port', '80']) self.assertEqual(config.redis['port'], 80) config = parse_arguments(['-redisp', '80']) self.assertEqual(config.redis['port'], 80) def test_parse_arguments_redis_database(self): '''The redis database command line argument can be specified by "--redis-db" or "-redisdb" and has a default value of 0''' config = parse_arguments([]) self.assertEqual(config.redis['db'], 0) config = parse_arguments(['--redis-db', '80']) self.assertEqual(config.redis['db'], 80) config = parse_arguments(['-redisdb', '80']) self.assertEqual(config.redis['db'], 80) def test_parse_arguments_redis_password(self): '''The redis password command line argument can be specified by "--redis-password" or "-redispass" and has a default value of None''' config = parse_arguments([]) self.assertEqual(config.redis['password'], None) config = parse_arguments(['--redis-password', 'foo.bar']) self.assertEqual(config.redis['password'], 'foo.bar') config = parse_arguments(['-redispass', 'foo.bar']) self.assertEqual(config.redis['password'], 'foo.bar') def test_parse_arguments_amqp_host(self): '''The amqp host command line argument can be specified by "--amqp-host" or "-amqph" and has a default value of "127.0.0.1"''' config = parse_arguments([]) self.assertEqual(config.amqp['hostname'], '127.0.0.1') config = parse_arguments(['--amqp-host', 'foo.bar']) self.assertEqual(config.amqp['hostname'], 'foo.bar') config = parse_arguments(['-amqph', 'foo.bar']) self.assertEqual(config.amqp['hostname'], 'foo.bar') def test_parse_arguments_amqp_port(self): '''The amqp port command line argument can be specified by "--amqp-port" or "-amqpp" and has a default value of 5672''' config = parse_arguments([]) self.assertEqual(config.amqp['port'], 5672) config = parse_arguments(['--amqp-port', '80']) self.assertEqual(config.amqp['port'], 80) config = parse_arguments(['-amqpp', '80']) self.assertEqual(config.amqp['port'], 80) def test_parse_arguments_amqp_username(self): '''The amqp username command line argument can be specified by "--amqp-user" or "-amqpu" and has a default value of "guest"''' config = parse_arguments([]) self.assertEqual(config.amqp['username'], 'guest') config = parse_arguments(['--amqp-user', 'test']) self.assertEqual(config.amqp['username'], 'test') config = parse_arguments(['-amqpu', 'test']) self.assertEqual(config.amqp['username'], 'test') def test_parse_arguments_amqp_password(self): '''The amqp password command line argument can be specified by "--amqp-password" or "-amqppass" and has a default value of "guest"''' config = parse_arguments([]) self.assertEqual(config.amqp['password'], 'guest') config = parse_arguments(['--amqp-password', 'foo.bar']) self.assertEqual(config.amqp['password'], 'foo.bar') config = parse_arguments(['-amqppass', 'foo.bar']) self.assertEqual(config.amqp['password'], 'foo.bar') def test_parse_arguments_amqp_vhost(self): '''The amqp vhost command line argument can be specified by "--amqp-vhost" or "-amqpv" and has a default value of "/"''' config = parse_arguments([]) self.assertEqual(config.amqp['vhost'], '/') config = parse_arguments(['--amqp-vhost', 'foo.bar']) self.assertEqual(config.amqp['vhost'], 'foo.bar') config = parse_arguments(['-amqpv', 'foo.bar']) self.assertEqual(config.amqp['vhost'], 'foo.bar') def test_parse_arguments_inbound_ttl(self): '''The inbound ttl command line argument can be specified 
by "--inbound-message-ttl" or "-ittl" and has a default value of 10 minutes''' config = parse_arguments([]) self.assertEqual(config.inbound_message_ttl, 60 * 10) config = parse_arguments(['--inbound-message-ttl', '80']) self.assertEqual(config.inbound_message_ttl, 80) config = parse_arguments(['-ittl', '80']) self.assertEqual(config.inbound_message_ttl, 80) def test_parse_arguments_outbound_ttl(self): '''The outbound ttl command line argument can be specified by "--outbound-message-ttl" or "-ottl" and has a default value of 2 days ''' config = parse_arguments([]) self.assertEqual(config.outbound_message_ttl, 60*60*24*2) config = parse_arguments(['--outbound-message-ttl', '90']) self.assertEqual(config.outbound_message_ttl, 90) config = parse_arguments(['-ottl', '90']) self.assertEqual(config.outbound_message_ttl, 90) def test_parse_arguments_channels(self): '''Each channel mapping be specified by "--channels" or "-ch"''' config = parse_arguments([]) self.assertEqual(config.channels, {}) config = parse_arguments(['--channels', 'foo:bar']) self.assertEqual(config.channels, {'foo': 'bar'}) config = parse_arguments([ '--channels', 'foo:bar', '--channels', 'bar:foo']) self.assertEqual(config.channels, {'foo': 'bar', 'bar': 'foo'}) config = parse_arguments(['-ch', 'foo:bar']) self.assertEqual(config.channels, {'foo': 'bar'}) config = parse_arguments(['-ch', 'foo:bar', '-ch', 'bar:foo']) self.assertEqual(config.channels, {'foo': 'bar', 'bar': 'foo'}) def test_parse_arguments_replace_channels(self): '''The replace channels command line argument can be specified by "--replace-channels" or "-rch" and has a default value of False ''' config = parse_arguments([]) self.assertEqual(config.replace_channels, False) config = parse_arguments(['--replace-channels', 'true']) self.assertEqual(config.replace_channels, True) config = parse_arguments(['-rch', 'true']) self.assertEqual(config.replace_channels, True) def test_parse_arguments_plugins(self): '''Each plugin config is specified by "--plugin" or "-pl"''' config = parse_arguments([]) self.assertEqual(config.plugins, []) config = parse_arguments(['--plugin', json.dumps({'type': 'foo.bar'})]) self.assertEqual(config.plugins, [{'type': 'foo.bar'}]) config = parse_arguments([ '--plugin', json.dumps({'type': 'foo.bar'}), '--plugin', json.dumps({'type': 'bar.foo'})]) self.assertEqual(sorted(config.plugins), [ {'type': 'bar.foo'}, {'type': 'foo.bar'}]) config = parse_arguments(['-pl', json.dumps({'type': 'foo.bar'})]) self.assertEqual(config.plugins, [{'type': 'foo.bar'}]) config = parse_arguments([ '-pl', json.dumps({'type': 'foo.bar'}), '-pl', json.dumps({'type': 'bar.foo'})]) self.assertEqual(sorted(config.plugins), [ {'type': 'bar.foo'}, {'type': 'foo.bar'}]) def test_parse_arguments_metric_window(self): '''The metric window can be specified by "--metric-window" or "-mw"''' config = parse_arguments([]) self.assertEqual(config.metric_window, 10.0) config = parse_arguments(['--metric-window', '2.0']) self.assertEqual(config.metric_window, 2.0) config = parse_arguments(['-mw', '2.0']) self.assertEqual(config.metric_window, 2.0) def test_parse_arguments_logging_path(self): '''The logging path can be specified by "--logging-path" or "-lp"''' config = parse_arguments([]) self.assertEqual(config.logging_path, 'logs/') config = parse_arguments(['--logging-path', 'other-logs/']) self.assertEqual(config.logging_path, 'other-logs/') config = parse_arguments(['-lp', 'other-logs/']) self.assertEqual(config.logging_path, 'other-logs/') def 
test_parse_arguments_log_rotate_size(self): '''The log rotate size can be specified by "--log-rotate-size" or "-lrs"''' config = parse_arguments([]) self.assertEqual(config.log_rotate_size, 1000000) config = parse_arguments(['--log-rotate-size', '7']) self.assertEqual(config.log_rotate_size, 7) config = parse_arguments(['-lrs', '7']) self.assertEqual(config.log_rotate_size, 7) def test_parse_arguments_max_log_files(self): '''The max log files can be specified by "--max-log-files" or "-mlf"''' config = parse_arguments([]) self.assertEqual(config.max_log_files, None) config = parse_arguments(['--max-log-files', '2']) self.assertEqual(config.max_log_files, 2) config = parse_arguments(['-mlf', '2']) self.assertEqual(config.max_log_files, 2) config = parse_arguments(['--max-log-files', '0']) self.assertEqual(config.max_log_files, None) config = parse_arguments(['-mlf', '0']) self.assertEqual(config.max_log_files, None) def test_parse_arguments_max_logs(self): '''The max logs can be specified by "--max-logs" or "-ml" and defaults to 100.''' config = parse_arguments([]) self.assertEqual(config.max_logs, 100) config = parse_arguments(['--max-logs', '2']) self.assertEqual(config.max_logs, 2) config = parse_arguments(['-ml', '2']) self.assertEqual(config.max_logs, 2) def test_config_file(self): '''The config file command line argument can be specified by "--config" or "-c"''' self.patch_yaml_load({ '/foo/bar.yaml': { 'interface': 'lolcathost', 'port': 1337, 'logfile': 'stuff.log', 'redis': { 'host': 'rawrcathost', 'port': 3223, 'db': 9000, 'password': 't00r' }, 'amqp': { 'hostname': 'xorcathost', 'port': 2332, 'vhost': '/root', 'username': 'admin', 'password': 'nimda', }, 'inbound_message_ttl': 80, 'outbound_message_ttl': 90, 'channels': {'foo': 'bar'}, 'plugins': [{'type': 'foo.bar'}], 'metric_window': 2.0, 'logging_path': 'other-logs/', 'log_rotate_size': 2, 'max_log_files': 3, 'max_logs': 4, } }) config = parse_arguments(['--config', '/foo/bar.yaml']) self.assertEqual(config.interface, 'lolcathost') self.assertEqual(config.port, 1337) self.assertEqual(config.logfile, 'stuff.log') self.assertEqual(config.redis['host'], 'rawrcathost') self.assertEqual(config.redis['port'], 3223) self.assertEqual(config.redis['db'], 9000) self.assertEqual(config.redis['password'], 't00r') self.assertEqual(config.amqp['hostname'], 'xorcathost') self.assertEqual(config.amqp['vhost'], '/root') self.assertEqual(config.amqp['port'], 2332) self.assertEqual(config.amqp['username'], 'admin') self.assertEqual(config.amqp['password'], 'nimda') self.assertEqual(config.inbound_message_ttl, 80) self.assertEqual(config.outbound_message_ttl, 90) self.assertEqual(config.channels, {'foo': 'bar'}) self.assertEqual(config.plugins, [{'type': 'foo.bar'}]) self.assertEqual(config.metric_window, 2.0) self.assertEqual(config.logging_path, 'other-logs/') self.assertEqual(config.log_rotate_size, 2) self.assertEqual(config.max_log_files, 3) self.assertEqual(config.max_logs, 4) config = parse_arguments(['-c', '/foo/bar.yaml']) self.assertEqual(config.interface, 'lolcathost') self.assertEqual(config.port, 1337) self.assertEqual(config.logfile, 'stuff.log') self.assertEqual(config.redis['host'], 'rawrcathost') self.assertEqual(config.redis['port'], 3223) self.assertEqual(config.redis['db'], 9000) self.assertEqual(config.redis['password'], 't00r') self.assertEqual(config.amqp['hostname'], 'xorcathost') self.assertEqual(config.amqp['vhost'], '/root') self.assertEqual(config.amqp['port'], 2332) self.assertEqual(config.amqp['username'], 'admin') 
self.assertEqual(config.amqp['password'], 'nimda') self.assertEqual(config.inbound_message_ttl, 80) self.assertEqual(config.outbound_message_ttl, 90) self.assertEqual(config.channels, {'foo': 'bar'}) self.assertEqual(config.plugins, [{'type': 'foo.bar'}]) self.assertEqual(config.metric_window, 2.0) self.assertEqual(config.logging_path, 'other-logs/') self.assertEqual(config.log_rotate_size, 2) self.assertEqual(config.max_log_files, 3) self.assertEqual(config.max_logs, 4) def test_config_file_overriding(self): '''Config file options are overriden by their corresponding command line arguments''' self.patch_yaml_load({ '/foo/bar.yaml': { 'interface': 'lolcathost', 'port': 1337, 'logfile': 'stuff.log', 'redis': { 'host': 'rawrcathost', 'port': 3223, 'db': 9000, 'password': 't00r' }, 'amqp': { 'hostname': 'xorcathost', 'port': 2332, 'vhost': '/root', 'username': 'admin', 'password': 'nimda', }, 'plugins': [{'type': 'foo.bar'}], 'metric_window': 2.0, 'logging_path': 'other-logs/', 'log_rotate_size': 2, 'max_log_files': 3, 'max_logs': 4, } }) config = parse_arguments([ '-c', '/foo/bar.yaml', '-i', 'zuulcathost', '-p', '1620', '-l', 'logs.log', '-redish', 'bluish', '-redisp', '2112', '-redisdb', '23', '-redispass', 'cat', '-amqph', 'soup', '-amqpp', '2112', '-amqpvh', '/soho', '-amqpu', 'koenji', '-amqppass', 'kodama', '-pl', json.dumps({'type': 'bar.foo'}), '-mw', '3.0', '-lp', 'my-logs/', '-lrs', '100', '-mlf', '10', '-ml', '5', ]) self.assertEqual(config.interface, 'zuulcathost') self.assertEqual(config.port, 1620) self.assertEqual(config.logfile, 'logs.log') self.assertEqual(config.redis['host'], 'bluish') self.assertEqual(config.redis['port'], 2112) self.assertEqual(config.redis['db'], 23) self.assertEqual(config.redis['password'], 'cat') self.assertEqual(config.amqp['hostname'], 'soup') self.assertEqual(config.amqp['vhost'], '/soho') self.assertEqual(config.amqp['port'], 2112) self.assertEqual(config.amqp['username'], 'koenji') self.assertEqual(config.amqp['password'], 'kodama') self.assertEqual(sorted(config.plugins), [ {'type': 'bar.foo'}, {'type': 'foo.bar'} ]) self.assertEqual(config.metric_window, 3.0) self.assertEqual(config.logging_path, 'my-logs/') self.assertEqual(config.log_rotate_size, 100) self.assertEqual(config.max_log_files, 10) self.assertEqual(config.max_logs, 5) def test_logging_setup(self): '''If filename is None, just a stdout logger is created, if filename is not None, both the stdout logger and a file logger is created''' logging_setup(None) [handler] = logging.getLogger().handlers self.assertEqual(handler.stream.name, '') logging.getLogger().removeHandler(handler) filename = self.mktemp() logging_setup(filename) [handler1, handler2] = sorted( logging.getLogger().handlers, key=lambda h: hasattr(h, 'baseFilename')) self.assertEqual( os.path.abspath(handler2.baseFilename), os.path.abspath(filename)) self.assertEqual(handler1.stream.name, '') logging.getLogger().removeHandler(handler1) logging.getLogger().removeHandler(handler2) @inlineCallbacks def test_start_server(self): '''Starting the server should listen on the specified interface and port''' redis = yield self.get_redis() config = JunebugConfig({ 'port': 0, 'interface': 'localhost', 'redis': redis._config, }) service = yield start_server(config) port = service._port host = port.getHost() self.assertEqual(host.host, '127.0.0.1') self.assertEqual(host.type, 'TCP') self.assertTrue(host.port > 0) yield service.stopService() PKùh…HТÃú2ú2%junebug/tests/test_logging_service.pyimport json import logging import sys from 
twisted.internet.defer import inlineCallbacks from twisted.python.failure import Failure from twisted.python.log import LogPublisher from twisted.python.logfile import LogFile import junebug from junebug.tests.helpers import JunebugTestBase, DummyLogFile from junebug.logging_service import ( JunebugLogObserver, JunebugLoggerService, read_logs) class TestSentryLogObserver(JunebugTestBase): def setUp(self): self.logpath = self.mktemp() self.logfile = DummyLogFile(None, self.logpath, None, None) self.obs = JunebugLogObserver(self.logfile, 'worker-1') def assert_log(self, log, expected): '''Assert that a log matches what is expected.''' log = json.loads(log) timestamp = log.pop('timestamp') self.assertTrue(isinstance(timestamp, float)) self.assertEqual(log, expected) def test_level_for_event(self): '''The correct logging level is returned by `level_for_event`.''' for expected_level, event in [ (logging.WARN, {'logLevel': logging.WARN}), (logging.ERROR, {'isError': 1}), (logging.INFO, {}), ]: self.assertEqual(self.obs.level_for_event(event), expected_level) def test_logger_for_event(self): '''The correct logger name is returned by `logger_for_event`.''' self.assertEqual(self.obs.logger_for_event( {'system': 'foo,bar'}), 'foo.bar') def test_log_failure(self): '''A failure should be logged with the correct format.''' e = ValueError("foo error") f = Failure(e) self.obs({ 'failure': f, 'system': 'worker-1', 'isError': 1, 'message': [e.message]}) [log] = self.logfile.logs self.assert_log(log, { 'level': JunebugLogObserver.DEFAULT_ERROR_LEVEL, 'message': 'foo error', 'logger': 'worker-1', 'exception': { 'class': repr(ValueError), 'instance': repr(e), 'stack': [], }, }) def test_log_traceback(self): '''Logging a log with a traceback should place the traceback in the logfile.''' try: raise ValueError("foo") except ValueError: f = Failure(*sys.exc_info()) self.obs({ 'failure': f, 'isError': 1, 'message': ['foo'], 'system': 'worker-1'}) [log] = self.logfile.logs self.assert_log(log, { 'message': 'foo', 'logger': 'worker-1', 'level': logging.ERROR, 'exception': { 'class': repr(f.type), 'instance': repr(ValueError), # json encoding changes all tuples to lists 'stack': json.loads(json.dumps(f.stack)), }, }) def test_log_warning(self): '''Logging an warning level log should generate the correct level log message''' self.obs({ 'message': ["a"], 'system': 'foo', 'logLevel': logging.WARN, 'system': 'worker-1'}) [log] = self.logfile.logs self.assert_log(log, { 'level': logging.WARN, 'logger': 'worker-1', 'message': 'a', }) def test_log_info(self): '''Logging an info level log should generate the correct level log message''' self.obs({'message': ["a"], 'system': 'worker-1'}) [log] = self.logfile.logs self.assert_log(log, { 'logger': 'worker-1', 'message': 'a', 'level': logging.INFO }) def test_log_debug(self): '''Logging a debug level log should not generate a log, since it is below the minimum log level.''' self.obs({'message': ["a"], 'system': 'worker-1', 'logLevel': logging.DEBUG}) self.assertEqual(len(self.logfile.logs), 0) def test_log_with_context_sentinel(self): '''If the context sentinel has been set for a log, it should not be logged again.''' event = {'message': ["a"], 'system': 'worker-1'} event.update(self.obs.log_context) self.obs(event) self.assertEqual(len(self.logfile.logs), 0) def test_log_only_worker_id(self): '''A log should only be created when the worker id is in the system id of the log.''' self.obs({'message': ["a"], 'system': 'worker-1,bar'}) self.assertEqual(len(self.logfile.logs), 1) 
self.obs({'message': ["a"], 'system': 'worker-2,foo'}) self.assertEqual(len(self.logfile.logs), 1) self.obs({'message': ["a"], 'system': 'worker-1foo,bar'}) self.assertEqual(len(self.logfile.logs), 1) self.obs({'message': ["a"], 'system': None}) self.assertEqual(len(self.logfile.logs), 1) class TestJunebugLoggerService(JunebugTestBase): def setUp(self): self.patch(junebug.logging_service, 'LogFile', DummyLogFile) self.logger = LogPublisher() self.logpath = self.mktemp() self.service = JunebugLoggerService( 'worker-id', self.logpath, 1000000, 7, logger=self.logger) def assert_log(self, log, expected): '''Assert that a log matches what is expected.''' log = json.loads(log) timestamp = log.pop('timestamp') self.assertTrue(isinstance(timestamp, float)) self.assertEqual(log, expected) @inlineCallbacks def test_logfile_parameters(self): '''When the logfile is created, it should be created with the correct parameters.''' yield self.service.startService() logfile = self.service.logfile self.assertEqual(logfile.worker_id, 'worker-id') self.assertEqual(logfile.path, self.logpath) self.assertEqual(logfile.rotateLength, 1000000) self.assertEqual(logfile.maxRotatedFiles, 7) @inlineCallbacks def test_logging(self): '''The logging service should write logs to the logfile when the service is running.''' self.logger.msg("Hello") self.assertFalse(hasattr(self.service, 'logfile')) yield self.service.startService() logfile = self.service.logfile self.logger.msg("Hello", logLevel=logging.WARN, system='worker-id') [log] = logfile.logs self.assert_log(log, { 'level': logging.WARN, 'logger': 'worker-id', 'message': 'Hello', }) yield self.service.stopService() self.assertEqual(len(logfile.logs), 1) self.logger.msg("Foo", logLevel=logging.WARN) self.assertEqual(len(logfile.logs), 1) @inlineCallbacks def test_stop_not_running(self): '''If stopService is called when the service is not running, there should be no exceptions raised.''' yield self.service.stopService() self.assertFalse(self.service.running) @inlineCallbacks def test_start_stop(self): '''Stopping the service after it has been started should result in properly closing the logfile.''' self.assertFalse(self.service.registered()) yield self.service.startService() self.assertEqual(self.service.logfile.closed_count, 0) self.assertTrue(self.service.registered()) yield self.service.stopService() self.assertFalse(self.service.registered()) self.assertEqual(self.service.logfile.closed_count, 1) class TestReadingLogs(JunebugTestBase): def create_logfile(self): '''Creates and returns a temporary LogFile.''' return LogFile.fromFullPath(self.mktemp()) def test_read_empty_log(self): '''Reading an empty log should return an empty list.''' logfile = self.create_logfile() logs = read_logs(logfile, 10) self.assertEqual(logs, []) def test_read_single_less_than_total(self): '''Reading a single log from a file with multiple logs should only return the last written log.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.flush() [log] = read_logs(logfile, 1) self.assertEqual(log, {'log_entry': 2}) def test_read_single_equal_to_total(self): '''Reading a single log from a file with a single log should just return that log.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.flush() [log] = read_logs(logfile, 1) self.assertEqual(log, {'log_entry': 1}) def test_read_multiple_less_than_total(self): '''Reading multiple logs from a file with more logs 
than required should just return the required number of logs.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.write(json.dumps({'log_entry': 3}) + '\n') logfile.flush() [log1, log2] = read_logs(logfile, 2) self.assertEqual(log1, {'log_entry': 3}) self.assertEqual(log2, {'log_entry': 2}) def test_read_multiple_more_than_total(self): '''Reading multiple logs from a file with less logs than required should just return the number of logs available.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.flush() [log] = read_logs(logfile, 2) self.assertEqual(log, {'log_entry': 1}) def test_read_multiple_equal_than_total(self): '''Reading multiple logs from a file with the required amount of logs should just return the all of logs available.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.flush() [log1, log2] = read_logs(logfile, 2) self.assertEqual(log1, {'log_entry': 2}) self.assertEqual(log2, {'log_entry': 1}) def test_read_logs_from_multiple_files_more_than_available(self): '''If there are not enough logs in the current log, it should check the rotated log files for more logs. Total logs more than required logs.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.rotate() logfile.write(json.dumps({'log_entry': 3}) + '\n') logfile.write(json.dumps({'log_entry': 4}) + '\n') logfile.flush() [log1, log2, log3] = read_logs(logfile, 3) self.assertEqual(log1, {'log_entry': 4}) self.assertEqual(log2, {'log_entry': 3}) self.assertEqual(log3, {'log_entry': 2}) def test_read_logs_from_multiple_files_equal_available(self): '''If there are not enough logs in the current log, it should check the rotated log files for more logs. Total logs equal to required logs.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.rotate() logfile.write(json.dumps({'log_entry': 3}) + '\n') logfile.flush() [log1, log2, log3] = read_logs(logfile, 3) self.assertEqual(log1, {'log_entry': 3}) self.assertEqual(log2, {'log_entry': 2}) self.assertEqual(log3, {'log_entry': 1}) def test_read_logs_from_multiple_files_less_than_available(self): '''If there are not enough logs in the current log, it should check the rotated log files for more logs. 
Total logs less than to required logs.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.rotate() logfile.write(json.dumps({'log_entry': 2}) + '\n') logfile.flush() [log1, log2] = read_logs(logfile, 3) self.assertEqual(log1, {'log_entry': 2}) self.assertEqual(log2, {'log_entry': 1}) def test_read_single_log_bigger_than_buffer(self): '''If a single log entry is greater than the buffer size, it should still read the log entry correctly.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.flush() [log] = read_logs(logfile, 2, buf=1) self.assertEqual(log, {'log_entry': 1}) def test_read_log_incomplete_last_entry(self): '''If the last log entry does not end in a new line, then discard it.''' logfile = self.create_logfile() logfile.write(json.dumps({'log_entry': 1}) + '\n') logfile.write(json.dumps({'log_entry': 2})) logfile.flush() [log] = read_logs(logfile, 2, buf=1) self.assertEqual(log, {'log_entry': 1}) PKùh…Hk(iZ´ ´ junebug/tests/test_plugin.pyfrom copy import deepcopy from twisted.internet.defer import inlineCallbacks from junebug.channel import Channel from junebug.tests.helpers import FakeJunebugPlugin, JunebugTestBase class TestFakeJunebugPlugin(JunebugTestBase): @inlineCallbacks def test_plugin_start_plugin(self): '''Stores the name of the function call and arguments in calls''' plugin = FakeJunebugPlugin() config = yield self.create_channel_config() yield plugin.start_plugin({'test': 'plugin_config'}, config) [(name, [plugin_config, config_arg])] = plugin.calls self.assertEqual(name, 'start_plugin') self.assertEqual(config_arg, config) self.assertEqual(plugin_config, {'test': 'plugin_config'}) @inlineCallbacks def test_plugin_stop_plugin(self): '''Stores the name of the function call and arguments in calls''' plugin = FakeJunebugPlugin() config = yield self.create_channel_config() yield plugin.start_plugin({'test': 'plugin_config'}, config) plugin.calls = [] yield plugin.stop_plugin() [(name, [])] = plugin.calls self.assertEqual(name, 'stop_plugin') @inlineCallbacks def test_plugin_channel_started(self): '''Stores the name of the function call and arguments in calls''' plugin = FakeJunebugPlugin() config = yield self.create_channel_config() yield plugin.start_plugin({'test': 'pluginconfig'}, config) plugin.calls = [] redis = yield self.get_redis() channel = Channel( redis, config, deepcopy(self.default_channel_properties)) yield plugin.channel_started(channel) [(name, [channel_arg])] = plugin.calls self.assertEqual(name, 'channel_started') self.assertEqual(channel_arg, channel) @inlineCallbacks def test_plugin_channel_stopped(self): '''Stores the name of the function call and arguments in calls''' plugin = FakeJunebugPlugin() config = yield self.create_channel_config() yield plugin.start_plugin({'test': 'plugin_config'}, config) plugin.calls = [] redis = yield self.get_redis() channel = Channel( redis, config, deepcopy(self.default_channel_properties)) yield plugin.channel_stopped(channel) [(name, [channel_arg])] = plugin.calls self.assertEqual(name, 'channel_stopped') self.assertEqual(channel_arg, channel) PKwPGÚ3"1junebug/tests/test_amqp.pyimport json from twisted.internet.defer import inlineCallbacks from vumi.message import TransportUserMessage from junebug.amqp import ( AmqpConnectionError, AmqpFactory, JunebugAMQClient, RoutingKeyError) from junebug.tests.helpers import JunebugTestBase class FakeChannel(object): def __init__(self): self.messages = [] def basic_publish(self, **kwargs): 
self.messages.append(kwargs) class TestMessageSender(JunebugTestBase): @inlineCallbacks def setUp(self): yield self.start_server() self.message_sender = self.api.message_sender def test_amqp_factory_create_client(self): '''The amqp factory should create an amqp client with the given parameters and of type JunebugAMQClient''' factory = AmqpFactory('amqp-spec-0-8.xml', { 'vhost': '/'}, None, None) client = factory.buildProtocol('localhost') self.assertTrue(isinstance(client, JunebugAMQClient)) self.assertEqual(client.vhost, '/') def test_amqp_client_publish_message_defaults(self): '''The amqp client should call basic_publish on the channel with the proper message details''' factory = AmqpFactory('amqp-spec-0-8.xml', { 'vhost': '/'}, None, None) client = factory.buildProtocol('localhost') client.cached_channel = FakeChannel() msg = TransportUserMessage.send( to_addr='+1234', content='test', transport_name='testtransport') client.publish_message(msg) [amq_msg] = client.cached_channel.messages self.assertEqual(amq_msg['content']['delivery mode'], 2) self.assertEqual(amq_msg['exchange'], 'vumi') self.assertEqual(amq_msg['routing_key'], 'routing_key') vumi_msg = json.loads(amq_msg['content'].body) self.assertEqual(vumi_msg['message_id'], msg['message_id']) def test_amqp_client_publish_message(self): '''The amqp client should call basic_publish on the channel with the specified message details''' factory = AmqpFactory('amqp-spec-0-8.xml', { 'vhost': '/'}, None, None) client = factory.buildProtocol('localhost') client.cached_channel = FakeChannel() msg = TransportUserMessage.send( to_addr='+1234', content='test', transport_name='testtransport') client.publish_message( msg, delivery_mode=1, exchange_name='foo', routing_key='bar') [amq_msg] = client.cached_channel.messages self.assertEqual(amq_msg['content']['delivery mode'], 1) self.assertEqual(amq_msg['exchange'], 'foo') self.assertEqual(amq_msg['routing_key'], 'bar') vumi_msg = json.loads(amq_msg['content'].body) self.assertEqual(vumi_msg['message_id'], msg['message_id']) @inlineCallbacks def test_message_sender_send_message(self): '''The message sender should add a message to the correct queue when send_message is called''' msg = TransportUserMessage.send( to_addr='+1234', content='test', transport_name='testtransport') yield self.message_sender.send_message( msg, routing_key='testtransport') [rec_msg] = self.get_dispatched_messages('testtransport') self.assertEqual(rec_msg, msg) @inlineCallbacks def test_message_sender_send_multiple_messages(self): '''The message sender should send all messages to their correct queues when send_message is called multiple times''' msg1 = TransportUserMessage.send( to_addr='+1234', content='test1', transport_name='testtransport') yield self.message_sender.send_message( msg1, routing_key='testtransport') msg2 = TransportUserMessage.send( to_addr='+1234', content='test2', transport_name='testtransport') yield self.message_sender.send_message( msg2, routing_key='testtransport') [rec_msg1, rec_msg2] = self.get_dispatched_messages('testtransport') self.assertEqual(rec_msg1, msg1) self.assertEqual(rec_msg2, msg2) def test_message_sender_send_message_no_connection(self): '''The message sender should raise an error when there is no connection to send the message over''' self.message_sender.client = None msg = TransportUserMessage.send( to_addr='+1234', content='test', transport_name='testtransport') err = self.assertRaises( AmqpConnectionError, self.message_sender.send_message, msg, routing_key='testtransport') 
self.assertTrue('Message not sent' in str(err)) @inlineCallbacks def test_message_sender_bad_routing_key(self): '''If the routing key is invalid, the message sender should raise an error''' msg = TransportUserMessage.send( to_addr='+1234', content='test', transport_name='testtransport') err = yield self.assertFailure( self.message_sender.send_message(msg, routing_key='Foo'), RoutingKeyError) self.assertTrue('Foo' in str(err)) PKùh…Hó sý!P!Pjunebug/tests/test_workers.pyimport treq from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web.client import HTTPConnectionPool from vumi.application.tests.helpers import ApplicationHelper from vumi.message import TransportUserMessage, TransportEvent, TransportStatus from vumi.tests.helpers import PersistenceHelper from junebug.utils import conjoin, api_from_event, api_from_status from junebug.workers import ChannelStatusWorker, MessageForwardingWorker from junebug.tests.helpers import JunebugTestBase, RequestLoggingApi class TestMessageForwardingWorker(JunebugTestBase): @inlineCallbacks def setUp(self): self.logging_api = RequestLoggingApi() self.logging_api.setup() self.addCleanup(self.logging_api.teardown) self.url = self.logging_api.url self.worker = yield self.get_worker() connection_pool = HTTPConnectionPool(reactor, persistent=False) treq._utils.set_global_pool(connection_pool) @inlineCallbacks def get_worker(self, config=None): '''Get a new MessageForwardingWorker with the provided config''' if config is None: config = {} self.app_helper = ApplicationHelper(MessageForwardingWorker) yield self.app_helper.setup() self.addCleanup(self.app_helper.cleanup) persistencehelper = PersistenceHelper() yield persistencehelper.setup() self.addCleanup(persistencehelper.cleanup) config = conjoin(persistencehelper.mk_config({ 'transport_name': 'testtransport', 'mo_message_url': self.url.decode('utf-8'), 'inbound_ttl': 60, 'outbound_ttl': 60 * 60 * 24 * 2, 'metric_window': 1.0, }), config) worker = yield self.app_helper.get_application(config) returnValue(worker) @inlineCallbacks def assert_event_stored(self, event): key = '%s:outbound_messages:%s' % ( self.worker.config['transport_name'], 'msg-21') event_json = yield self.worker.redis.hget(key, event['event_id']) self.assertEqual(event_json, event.to_json()) @inlineCallbacks def test_channel_id(self): worker = yield self.get_worker({'transport_name': 'foo'}) self.assertEqual(worker.channel_id, 'foo') @inlineCallbacks def test_send_message_http(self): '''A sent message should be forwarded to the configured URL if a URL is set.''' msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield self.worker.consume_user_message(msg) [req] = self.logging_api.requests self.assert_request(req, method='POST', headers={ 'content-type': ['application/json'] }) self.assert_body_contains(req, to='+1234', content='testcontent') @inlineCallbacks def test_send_message_amqp(self): '''A sent message should be forwarded to the correct AMQP queue if the config option is set.''' worker = yield self.get_worker(config={ 'message_queue': 'testqueue' }) msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield worker.consume_user_message(msg) [dispatched_msg] = self.app_helper.get_dispatched( 'testqueue', 'inbound', TransportUserMessage) self.assertEqual(dispatched_msg, msg) @inlineCallbacks def test_send_message_bad_response(self): '''If there is an error sending a message to the configured URL, the error and message should be logged''' 
self.patch_logger() self.worker = yield self.get_worker({ 'transport_name': 'testtransport', 'mo_message_url': self.url + '/bad/', }) msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield self.worker.consume_user_message(msg) self.assert_was_logged("'content': 'testcontent'") self.assert_was_logged("'to': '+1234'") self.assert_was_logged('500') self.assert_was_logged('test-error-response') @inlineCallbacks def test_send_message_storing(self): '''Inbound messages should be stored in the InboundMessageStore''' msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield self.worker.consume_user_message(msg) redis = self.worker.redis key = '%s:inbound_messages:%s' % ( self.worker.config['transport_name'], msg['message_id']) msg_json = yield redis.hget(key, 'message') self.assertEqual(TransportUserMessage.from_json(msg_json), msg) @inlineCallbacks def test_receive_message_amqp(self): '''A received message on the configured queue should be forwarded to the transport queue if the config option is set.''' worker = yield self.get_worker(config={ 'message_queue': 'testqueue' }) msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield self.app_helper.dispatch_outbound( msg, connector_name='testqueue') [dispatched_msg] = yield self.app_helper.wait_for_dispatched_outbound( connector_name=worker.transport_name) self.assertEqual(dispatched_msg, msg) @inlineCallbacks def test_forward_ack_http(self): event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', self.url) yield self.worker.consume_ack(event) [req] = self.logging_api.requests self.assert_request( req, method='POST', headers={'content-type': ['application/json']}, body=api_from_event(self.worker.channel_id, event)) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_ack_amqp(self): '''A sent ack event should be forwarded to the correct AMQP queue if the config option is set.''' worker = yield self.get_worker(config={ 'message_queue': 'testqueue' }) event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_ack(event) [dispatched_msg] = self.app_helper.get_dispatched( 'testqueue', 'event', TransportEvent) self.assertEqual(dispatched_msg['event_id'], event['event_id']) @inlineCallbacks def test_forward_ack_bad_response(self): self.patch_logger() event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', "%s/bad/" % self.url) yield self.worker.consume_ack(event) self.assert_was_logged(repr(event)) self.assert_was_logged('500') self.assert_was_logged('test-error-response') yield self.assert_event_stored(event) @inlineCallbacks def test_forward_ack_no_message(self): self.patch_logger() event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield self.worker.consume_ack(event) self.assertEqual(self.logging_api.requests, []) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_nack_http(self): event = TransportEvent( event_type='nack', user_message_id='msg-21', nack_reason='too many foos', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( 
self.worker.channel_id, 'msg-21', self.url) yield self.worker.consume_nack(event) [req] = self.logging_api.requests self.assert_request( req, method='POST', headers={'content-type': ['application/json']}, body=api_from_event(self.worker.channel_id, event)) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_nack_amqp(self): '''A sent nack event should be forwarded to the correct AMQP queue if the config option is set.''' worker = yield self.get_worker(config={ 'message_queue': 'testqueue' }) event = TransportEvent( event_type='nack', user_message_id='msg-21', nack_reason='too many foos', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_nack(event) [dispatched_msg] = self.app_helper.get_dispatched( 'testqueue', 'event', TransportEvent) self.assertEqual(dispatched_msg['event_id'], event['event_id']) @inlineCallbacks def test_forward_nack_bad_response(self): self.patch_logger() event = TransportEvent( event_type='nack', user_message_id='msg-21', nack_reason='too many foos', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', "%s/bad/" % (self.url,)) yield self.worker.consume_nack(event) self.assert_was_logged(repr(event)) self.assert_was_logged('500') self.assert_was_logged('test-error-response') yield self.assert_event_stored(event) @inlineCallbacks def test_forward_nack_no_message(self): self.patch_logger() event = TransportEvent( event_type='nack', user_message_id='msg-21', nack_reason='too many foos', timestamp='2015-09-22 15:39:44.827794') yield self.worker.consume_nack(event) self.assertEqual(self.logging_api.requests, []) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_dr_http(self): event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', delivery_status='pending', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', self.url) yield self.worker.consume_delivery_report(event) [req] = self.logging_api.requests self.assert_request( req, method='POST', headers={'content-type': ['application/json']}, body=api_from_event(self.worker.channel_id, event)) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_dr_amqp(self): '''A sent delivery report event should be forwarded to the correct AMQP queue if the config option is set.''' worker = yield self.get_worker(config={ 'message_queue': 'testqueue' }) event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', delivery_status='pending', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_delivery_report(event) [dispatched_msg] = self.app_helper.get_dispatched( 'testqueue', 'event', TransportEvent) self.assertEqual(dispatched_msg['event_id'], event['event_id']) @inlineCallbacks def test_forward_dr_bad_response(self): self.patch_logger() event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', delivery_status='pending', timestamp='2015-09-22 15:39:44.827794') yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', "%s/bad/" % self.url) yield self.worker.consume_delivery_report(event) self.assert_was_logged(repr(event)) self.assert_was_logged('500') self.assert_was_logged('test-error-response') yield self.assert_event_stored(event) @inlineCallbacks def test_forward_dr_no_message(self): self.patch_logger() event = TransportEvent(
self.worker.consume_delivery_report(event) self.assertEqual(self.logging_api.requests, []) yield self.assert_event_stored(event) @inlineCallbacks def test_forward_event_bad_event(self): self.patch_logger() event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') event['event_type'] = 'bad' yield self.worker.outbounds.store_event_url( self.worker.channel_id, 'msg-21', self.url) yield self.worker._forward_event(event) self.assertEqual(self.logging_api.requests, []) self.assert_was_logged("Discarding unrecognised event %r" % (event,)) @inlineCallbacks def test_outbound_message_rates(self): '''Outbound messages should increase the message send rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) msg = TransportUserMessage.send(to_addr='+1234', content='testcontent') yield worker.consume_user_message(msg) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'inbound', 1.0)), 1.0) @inlineCallbacks def test_submitted_event_rates(self): '''Acknowledge events should increase the submitted event rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_ack(event) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'submitted', 1.0)), 1.0) @inlineCallbacks def test_rejected_event_rates(self): '''Not-acknowledge events should increase the rejected event rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) event = TransportEvent( event_type='nack', nack_reason='bad message', user_message_id='msg-21', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_nack(event) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'rejected', 1.0)), 1.0) @inlineCallbacks def test_delivery_succeeded_event_rates(self): '''Delivered delivery reports should increase the delivery_succeeded event rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', delivery_status='delivered', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_delivery_report(event) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'delivery_succeeded', 1.0)), 1.0) @inlineCallbacks def test_delivery_failed_event_rates(self): '''Failed delivery reports should increase the delivery_failed event rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', delivery_status='failed', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_delivery_report(event) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'delivery_failed', 1.0)), 1.0) @inlineCallbacks def test_delivery_pending_event_rates(self): '''Pending delivery reports should increase the delivery_pending event rates.''' clock = self.patch_message_rate_clock() worker = yield self.get_worker({ 'message_rate_bucket': 1.0, }) event = TransportEvent( 
event_type='delivery_report', user_message_id='msg-21', delivery_status='pending', timestamp='2015-09-22 15:39:44.827794') yield worker.consume_delivery_report(event) clock.advance(1) self.assertEqual((yield worker.message_rate.get_messages_per_second( 'testtransport', 'delivery_pending', 1.0)), 1.0) class TestChannelStatusWorker(JunebugTestBase): @inlineCallbacks def setUp(self): self.worker = yield self.get_worker() self.logging_api = RequestLoggingApi() self.logging_api.setup() self.addCleanup(self.logging_api.teardown) connection_pool = HTTPConnectionPool(reactor, persistent=False) treq._utils.set_global_pool(connection_pool) @inlineCallbacks def get_worker(self, config=None): '''Get a new ChannelStatusWorker with the provided config''' if config is None: config = {} app_helper = ApplicationHelper(ChannelStatusWorker) yield app_helper.setup() self.addCleanup(app_helper.cleanup) persistencehelper = PersistenceHelper() yield persistencehelper.setup() self.addCleanup(persistencehelper.cleanup) config = conjoin(persistencehelper.mk_config({ 'channel_id': 'testchannel', }), config) worker = yield app_helper.get_application(config) returnValue(worker) @inlineCallbacks def test_status_stored_in_redis(self): '''The published status gets consumed and stored in redis under the correct key''' status = TransportStatus( component='foo', status='ok', type='bar', message='Bar') yield self.worker.consume_status(status) redis_status = yield self.worker.store.redis.hget( 'testchannel:status', 'foo') self.assertEqual(redis_status, status.to_json()) @inlineCallbacks def test_status_sent_to_status_url(self): '''The published status gets consumed and sent to the configured status_url''' worker = yield self.get_worker({ 'channel_id': 'channel-23', 'status_url': self.logging_api.url, }) status = TransportStatus( component='foo', status='ok', type='bar', message='Bar') yield worker.consume_status(status) [req] = self.logging_api.requests self.assert_request( req, method='POST', headers={'content-type': ['application/json']}, body=api_from_status('channel-23', status)) @inlineCallbacks def test_status_send_to_status_url_bad_response(self): '''If there is an error sending a status to the configured status_url, the error and status should be logged''' self.patch_logger() worker = yield self.get_worker({ 'channel_id': 'channel-23', 'status_url': "%s/bad/" % (self.logging_api.url,), }) status = TransportStatus( component='foo', status='ok', type='bar', message='Bar') yield worker.consume_status(status) self.assert_was_logged('500') self.assert_was_logged('test-error-response') self.assert_was_logged(repr(status)) PKY]ŒH¯k/{00junebug/tests/test_utils.pyimport json from datetime import date from twisted.web import http from twisted.trial.unittest import TestCase from twisted.internet.defer import inlineCallbacks import treq from klein import Klein from junebug.tests.utils import ToyServer from junebug.utils import ( response, json_body, conjoin, omit, message_from_api, api_from_message, api_from_event, api_from_status, channel_public_http_properties) from vumi.message import TransportUserMessage, TransportEvent, TransportStatus class TestUtils(TestCase): @inlineCallbacks def test_response_data(self): srv = yield ToyServer.from_test(self) @srv.app.route('/') def route(req): return response(req, 'bar', {'foo': 23}) resp = yield treq.get(srv.url, persistent=False) content = yield resp.json() self.assertEqual(content['result'], {'foo': 23}) self.assertEqual(content['code'], 'OK') self.assertEqual(content['status'], 200) 
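# The assertions above and below correspond to a response envelope of
# roughly this shape (illustrative sketch of what junebug.utils.response
# produces for this route; field order is not significant):
#
#   {
#       "status": 200,
#       "code": "OK",
#       "description": "bar",
#       "result": {"foo": 23}
#   }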
self.assertEqual(content['description'], 'bar') @inlineCallbacks def test_response_content_type(self): srv = yield ToyServer.from_test(self) @srv.app.route('/') def route(req): return response(req, '', {}) resp = yield treq.get(srv.url, persistent=False) self.assertEqual( resp.headers.getRawHeaders('Content-Type'), ['application/json']) @inlineCallbacks def test_response_code(self): srv = yield ToyServer.from_test(self) @srv.app.route('/') def route(req): return response(req, '', {}, http.BAD_REQUEST) resp = yield treq.get(srv.url, persistent=False) self.assertEqual(resp.code, http.BAD_REQUEST) @inlineCallbacks def test_json_body(self): class Api(object): app = Klein() @app.route('/') @json_body def route(self, req, body): bodies.append(body) bodies = [] srv = yield ToyServer.from_test(self, Api().app) yield treq.get( srv.url, persistent=False, data=json.dumps({'foo': 23})) self.assertEqual(bodies, [{'foo': 23}]) def test_conjoin(self): a = { 'foo': 21, 'bar': 23, } b = { 'bar': 'baz', 'quux': 'corge', } self.assertEqual(conjoin(a, b), { 'foo': 21, 'bar': 'baz', 'quux': 'corge', }) self.assertEqual(a, { 'foo': 21, 'bar': 23, }) self.assertEqual(b, { 'bar': 'baz', 'quux': 'corge', }) def test_omit(self): coll = { 'foo': 'bar', 'baz': 'quux', 'corge': 'grault', 'garply': 'waldo', } self.assertEqual(omit(coll, 'foo', 'garply'), { 'baz': 'quux', 'corge': 'grault', }) self.assertEqual(coll, { 'foo': 'bar', 'baz': 'quux', 'corge': 'grault', 'garply': 'waldo', }) def test_api_from_message(self): '''The api from message function should take a vumi message, and return a dict with the appropriate values''' message = TransportUserMessage.send( content=None, from_addr='+1234', to_addr='+5432', transport_name='testtransport', continue_session=True, helper_metadata={'voice': {}}) dct = api_from_message(message) [dct.pop(f) for f in ['timestamp', 'message_id']] self.assertEqual(dct, { 'channel_data': { 'continue_session': True, 'voice': {}, }, 'from': '+1234', 'to': '+5432', 'channel_id': 'testtransport', 'content': None, 'reply_to': None, }) def test_message_from_api(self): msg = message_from_api( 'channel-id', { 'from': '+1234', 'content': None, 'channel_data': { 'continue_session': True, 'voice': {}, }, }) msg = TransportUserMessage.send(**msg) self.assertEqual(msg.get('continue_session'), True) self.assertEqual(msg.get('helper_metadata'), {'voice': {}}) self.assertEqual(msg.get('from_addr'), '+1234') self.assertEqual(msg.get('content'), None) def test_message_from_api_reply(self): msg = message_from_api( 'channel-id', { 'reply_to': 1234, 'content': 'foo', 'channel_data': { 'continue_session': True, 'voice': {}, }, }) self.assertFalse('to_addr' in msg) self.assertFalse('from_addr' in msg) self.assertEqual(msg['continue_session'], True) self.assertEqual(msg['helper_metadata'], {'voice': {}}) self.assertEqual(msg['content'], 'foo') def test_api_from_event_ack(self): self.assertEqual(api_from_event('channel-23', TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp=date(2321, 2, 3), )), { 'event_type': 'submitted', 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_event_nack(self): self.assertEqual(api_from_event('channel-23', TransportEvent( event_type='nack', user_message_id='msg-21', timestamp=date(2321, 2, 3), nack_reason='too many lemons', )), { 'event_type': 'rejected', 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {'reason': 'too 
many lemons'}, }) def test_api_from_event_dr_pending(self): self.assertEqual(api_from_event('channel-23', TransportEvent( event_type='delivery_report', user_message_id='msg-21', timestamp=date(2321, 2, 3), delivery_status='pending', )), { 'event_type': 'delivery_pending', 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_event_dr_delivered(self): self.assertEqual(api_from_event('channel-23', TransportEvent( event_type='delivery_report', user_message_id='msg-21', timestamp=date(2321, 2, 3), delivery_status='delivered', )), { 'event_type': 'delivery_succeeded', 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_event_dr_failed(self): self.assertEqual(api_from_event('channel-23', TransportEvent( event_type='delivery_report', user_message_id='msg-21', timestamp=date(2321, 2, 3), delivery_status='failed', )), { 'event_type': 'delivery_failed', 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_event_dr_unknown(self): event = TransportEvent( event_type='delivery_report', user_message_id='msg-21', timestamp=date(2321, 2, 3), delivery_status='pending') event['delivery_status'] = 'unknown' self.assertEqual(api_from_event('channel-23', event), { 'event_type': None, 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_event_unknown_type(self): event = TransportEvent( event_type='ack', user_message_id='msg-21', sent_message_id='msg-21', timestamp=date(2321, 2, 3)) event['event_type'] = 'unknown' self.assertEqual(api_from_event('channel-23', event), { 'event_type': None, 'channel_id': 'channel-23', 'message_id': 'msg-21', 'timestamp': date(2321, 2, 3), 'event_details': {}, }) def test_api_from_status(self): status = TransportStatus( component='foo', status='ok', type='bar', message='Bar', details={'baz': 'quux'}) self.assertEqual(api_from_status('channel-23', status), { 'channel_id': 'channel-23', 'status': 'ok', 'component': 'foo', 'type': 'bar', 'message': 'Bar', 'details': {'baz': 'quux'} }) def test_public_http_properties_explicit(self): result = channel_public_http_properties({ 'config': { 'web_path': '/baz/quux', 'web_port': 2121, }, 'public_http': { 'web_path': '/foo/bar', 'web_port': 2323, }, }) self.assertEqual(result, { 'enabled': True, 'web_path': '/foo/bar', 'web_port': 2323 }) def test_public_http_properties_explicit_enable(self): result = channel_public_http_properties({ 'public_http': { 'enabled': True, 'web_path': '/foo/bar', 'web_port': 2323, } }) self.assertTrue(result['enabled']) def test_public_http_properties_explicit_disable(self): result = channel_public_http_properties({ 'public_http': { 'enabled': False, 'web_path': '/foo/bar', 'web_port': 2323 } }) self.assertFalse(result['enabled']) def test_public_http_properties_explicit_no_port(self): result = channel_public_http_properties({ 'public_http': {'web_path': '/foo/bar'} }) self.assertEqual(result, None) def test_public_http_properties_explicit_no_path(self): result = channel_public_http_properties({ 'public_http': {'web_port': 2323} }) self.assertEqual(result, None) def test_public_http_properties_explicit_implicit_path(self): result = channel_public_http_properties({ 'config': { 'web_path': '/foo/bar', }, 'public_http': { 'web_port': 2323 }, }) self.assertEqual(result, { 'enabled': True, 'web_path': '/foo/bar', 'web_port': 2323 }) def 
test_public_http_properties_explicit_implicit_port(self): result = channel_public_http_properties({ 'config': { 'web_port': 2323, }, 'public_http': { 'web_path': '/foo/bar', }, }) self.assertEqual(result, { 'enabled': True, 'web_path': '/foo/bar', 'web_port': 2323 }) def test_public_http_properties_implicit(self): result = channel_public_http_properties({ 'config': { 'web_port': 2323, 'web_path': '/foo/bar', }, }) self.assertEqual(result, { 'enabled': True, 'web_path': '/foo/bar', 'web_port': 2323 }) def test_public_http_properties_implicit_no_port(self): result = channel_public_http_properties({'web_path': '/foo/bar'}) self.assertEqual(result, None) def test_public_http_properties_implicit_no_path(self): result = channel_public_http_properties({'web_port': 2323}) self.assertEqual(result, None) PK<_ŒHu˜øø'junebug-0.1.4.dist-info/DESCRIPTION.rstJunebug ======= |junebug-docs| .. |junebug-docs| image:: https://readthedocs.org/projects/junebug/badge/?version=latest :alt: Documentation :scale: 100% :target: http://junebug.readthedocs.org/ Junebug is an open-source server application providing SMS and USSD gateway connectivity for integrators, operators and application developers. Junebug enables integrators to automate the setup, monitoring, logging, and health checking of population scale messaging integrations. Junebug is a system for managing text messaging transports via a RESTful HTTP interface that supports: * Creating, introspecting, updating and deleting transports * Sending and receiving text messages * Receiving status updates on text messages sent * Monitoring transport health and performance * Retrieving recent transport logs for debugging transport issues. Design Principles ----------------- Junebug aims to satisfy the following broad criteria: * Easy to install * Minimal useful feature set * Sane set of dependencies Documentation ------------- Documentation is available online at http://junebug.readthedocs.org/ and in the `docs` directory of the repository. .. |junebug-docs| image:: https://readthedocs.org/projects/junebug/badge/?version=latest :alt: Documentation :scale: 100% :target: http://junebug.readthedocs.org/ To build the docs locally:: $ virtualenv ve $ source ve/bin/activate (ve)$ pip install -r requirements-docs.txt (ve)$ cd docs (ve)$ make html You'll find the docs in `docs/_build/index.html` You can contact the Junebug development team in the following ways: * via *email* by joining the the `junebug@googlegroups.com`_ mailing list * on *irc* in *#junebug* on the `Freenode IRC network`_ .. _junebug@googlegroups.com: https://groups.google.com/forum/?fromgroups#!forum/junebug .. _Freenode IRC network: https://webchat.freenode.net/?channels=#junebug Issues can be filed in the GitHub issue tracker. Please don't use the issue tracker for general support queries. 
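As a minimal, illustrative sketch of the HTTP interface described above (the host and port, the `telnet` channel type and its `config`, and the exact shape of the returned `result`, including its `id` field, are assumptions made for this example rather than defaults shipped with this package; the third-party `requests` library is used purely for brevity)::

    import json
    import requests

    base = 'http://localhost:8080'

    # Create a channel; "mo_url" is where inbound (mobile originated)
    # messages will be forwarded.
    resp = requests.post(base + '/channels/', data=json.dumps({
        'type': 'telnet',
        'config': {'twisted_endpoint': 'tcp:9001'},
        'mo_url': 'http://www.example.org/messages',
    }))
    channel_id = resp.json()['result']['id']

    # Send an outbound (mobile terminated) message on that channel.
    requests.post('%s/channels/%s/messages/' % (base, channel_id),
                  data=json.dumps({'to': '+1234', 'content': 'hello'}))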
PK<_ŒHÑç>•>>(junebug-0.1.4.dist-info/entry_points.txt [console_scripts] jb = junebug.command_line:main PK<_ŒH-”Я%junebug-0.1.4.dist-info/metadata.json{"classifiers": ["Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: BSD License", "Operating System :: POSIX", "Programming Language :: Python", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Internet :: WWW/HTTP"], "extensions": {"python.commands": {"wrap_console": {"jb": "junebug.command_line:main"}}, "python.details": {"contacts": [{"email": "dev@praekeltfoundation.org", "name": "Praekelt Foundation", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "http://github.com/praekelt/junebug"}}, "python.exports": {"console_scripts": {"jb": "junebug.command_line:main"}}}, "extras": [], "generator": "bdist_wheel (0.26.0)", "license": "BSD", "metadata_version": "2.0", "name": "junebug", "run_requires": [{"requires": ["PyYAML", "confmodel", "jsonschema", "klein", "pyasn1", "treq", "vumi (>=0.5.33)"]}], "summary": "('A system for managing text messaging transports via a RESTful HTTP ', 'interface')", "version": "0.1.4"}PK<_ŒHÌ‹ ð%junebug-0.1.4.dist-info/top_level.txtjunebug PK<_ŒHŒ''\\junebug-0.1.4.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py2-none-any PK<_ŒH’éà  junebug-0.1.4.dist-info/METADATAMetadata-Version: 2.0 Name: junebug Version: 0.1.4 Summary: ('A system for managing text messaging transports via a RESTful HTTP ', 'interface') Home-page: http://github.com/praekelt/junebug Author: Praekelt Foundation Author-email: dev@praekeltfoundation.org License: BSD Platform: UNKNOWN Classifier: Development Status :: 4 - Beta Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: BSD License Classifier: Operating System :: POSIX Classifier: Programming Language :: Python Classifier: Topic :: Software Development :: Libraries :: Python Modules Classifier: Topic :: Internet :: WWW/HTTP Requires-Dist: PyYAML Requires-Dist: confmodel Requires-Dist: jsonschema Requires-Dist: klein Requires-Dist: pyasn1 Requires-Dist: treq Requires-Dist: vumi (>=0.5.33) Junebug ======= |junebug-docs| .. |junebug-docs| image:: https://readthedocs.org/projects/junebug/badge/?version=latest :alt: Documentation :scale: 100% :target: http://junebug.readthedocs.org/ Junebug is an open-source server application providing SMS and USSD gateway connectivity for integrators, operators and application developers. Junebug enables integrators to automate the setup, monitoring, logging, and health checking of population scale messaging integrations. Junebug is a system for managing text messaging transports via a RESTful HTTP interface that supports: * Creating, introspecting, updating and deleting transports * Sending and receiving text messages * Receiving status updates on text messages sent * Monitoring transport health and performance * Retrieving recent transport logs for debugging transport issues. Design Principles ----------------- Junebug aims to satisfy the following broad criteria: * Easy to install * Minimal useful feature set * Sane set of dependencies Documentation ------------- Documentation is available online at http://junebug.readthedocs.org/ and in the `docs` directory of the repository. .. 
|junebug-docs| image:: https://readthedocs.org/projects/junebug/badge/?version=latest :alt: Documentation :scale: 100% :target: http://junebug.readthedocs.org/ To build the docs locally:: $ virtualenv ve $ source ve/bin/activate (ve)$ pip install -r requirements-docs.txt (ve)$ cd docs (ve)$ make html You'll find the docs in `docs/_build/index.html` You can contact the Junebug development team in the following ways: * via *email* by joining the the `junebug@googlegroups.com`_ mailing list * on *irc* in *#junebug* on the `Freenode IRC network`_ .. _junebug@googlegroups.com: https://groups.google.com/forum/?fromgroups#!forum/junebug .. _Freenode IRC network: https://webchat.freenode.net/?channels=#junebug Issues can be filed in the GitHub issue tracker. Please don't use the issue tracker for general support queries. PK<_ŒHásư ° junebug-0.1.4.dist-info/RECORDjunebug/__init__.py,sha256=jTcnufE3LvrnWN6tBfQnEyF7oUEUNKhXFy62YNn3YxU,905 junebug/amqp.py,sha256=DZglxN_BzLgxHJ-e_VS_AKRqlSb2sBg-3jjk7CHqd6I,5793 junebug/api.py,sha256=ZN1sRrgR_l8JKCGVMe4I5q6LL6TLJtYC9lI6ONWa4fY,11014 junebug/channel.py,sha256=LM7Ayjn-U37oTxGZnugBm_WzwVv88_IrHRl-ppa--Gc,15739 junebug/command_line.py,sha256=f5R7gTXx4rsWip5CUpJUA-VH6aLxH12rNZs63KKRPDs,8285 junebug/config.py,sha256=yO0uoSpVmSDdVu24zKJnEYiWvisMeDux4zRmghnri_g,2369 junebug/error.py,sha256=LoBgaLBhJxQu9UtQM-dxceO9zQusqwhXjgmT-THXkm8,241 junebug/logging_service.py,sha256=YxD9P1kWLtfkey13MPv8TW6RY4c3VomVUgWuGNcCbKY,6714 junebug/plugin.py,sha256=ilQeycbaxfpBv5hRNlSnmUI46wXnpGL4m53FoTdvWRE,1240 junebug/service.py,sha256=bAKV0JGJKaCEgdgQsUUrEUpBG4FhmaHHSE3wYuhMbyQ,1249 junebug/stores.py,sha256=zQOWXtz7XJFTd7TP_YilfWEnNW5Y4WSyHiAWJLeOm48,7586 junebug/utils.py,sha256=mfg1iXLOzanOXHfGcOgzSdPwOT__jRXGPg7kY0Kaq7g,3756 junebug/validate.py,sha256=ox7F_solfR9aMRsHsjFbYGu1kSXJKiEd21pnmpfgZ20,919 junebug/workers.py,sha256=aXogoOXWTODw0uNUWxpfPxGjH-7t4_G6ucLoEvCUpLc,8290 junebug/plugins/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 junebug/plugins/nginx/__init__.py,sha256=43Y2Eqq47-LINEkkB0HlJ6PW9NwdQnU4TQPvFAg_nCQ,121 junebug/plugins/nginx/location.template,sha256=fCRKIwBqwPm_aehly36ir2rGzTW2eS6QVWq-Vc8ZTpM,62 junebug/plugins/nginx/plugin.py,sha256=ZGti5_3orI0faG5f1AJD7bef_k7g9gzaiVlUI0MLrQc,4103 junebug/plugins/nginx/vhost.template,sha256=91LsabQ-iFsx6RR81l7Ofhmkr3be-S8_a-OccOEQj-A,79 junebug/plugins/nginx/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 junebug/plugins/nginx/tests/test_plugin.py,sha256=NMDsjOsCSFTYIHFNIUBErWHEAYL05xb9GGjSiqhK-W8,15113 junebug/tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 junebug/tests/helpers.py,sha256=2k3QQgsc-wDI9qGgftgBQYcdz4PUryTkdzsSf5hncnQ,12258 junebug/tests/test_amqp.py,sha256=11yZ23a_5IbEHiyC3qz5I2Xs9TX9HqyhvoHg355H1R0,5120 junebug/tests/test_api.py,sha256=WAp1p0RxXRNlW3dCB2L_AQLeChdp4hqNLmArPhSGp7o,32895 junebug/tests/test_channel.py,sha256=iXzs-O3FLooxKakIqSwhkLXXLmsGsFZsODbhoihQ058,33350 junebug/tests/test_command_line.py,sha256=ft1WqrExL4I_hIgxaWFUnSf1qbKVhBEnil5C1x8iE6I,21023 junebug/tests/test_logging_service.py,sha256=930yrdiEDL1-EIYNW1g-RZT_rjnuVMxgWlhDmM9o_II,13050 junebug/tests/test_plugin.py,sha256=I2XFpAA-DWqrsGalk4CJvpP5m3H-ESbsvO82W65KDzA,2484 junebug/tests/test_service.py,sha256=bE6c32jYS_1NFe_WoTi1WTZNjG_JMib0h7KNPeHU4YU,967 junebug/tests/test_stores.py,sha256=M22eQ7AqW7SEM78N0SjwVa-EPthHliLPZxqJgdW1y28,17544 junebug/tests/test_utils.py,sha256=-bBQExYFwmY8wpJlSEgRTPkf-ZE5YFdgfjbKfEx3Vdg,12292 
junebug/tests/test_validate.py,sha256=bNwmrvwA7xMf11c5MF6L0uf-5d6KgL1EA2vwDApmslE,2857 junebug/tests/test_workers.py,sha256=t5Ehn5Ltnsg7AFAhy8ttZO5idq-lQYNXZx9PQ_LeckE,20513 junebug/tests/utils.py,sha256=0nxFbmio8k2y3B0JvrLJkGe6KiSGmoTCCZKJXKekdjA,752 junebug-0.1.4.dist-info/DESCRIPTION.rst,sha256=I6a7wMCVsIlDnoxeLsPK6P88OGhuUSErb9R0WOry-Kw,2040 junebug-0.1.4.dist-info/METADATA,sha256=dQsT7jj5dMaa2x5wL0WEK0IaDitKd_1ziB2tr3tMs5g,2833 junebug-0.1.4.dist-info/RECORD,, junebug-0.1.4.dist-info/WHEEL,sha256=JTb7YztR8fkPg6aSjc571Q4eiVHCwmUDlX8PhuuqIIE,92 junebug-0.1.4.dist-info/entry_points.txt,sha256=21R9rTAMy5YUnblx0D19kwWgALW3aeWyzpSqcdtkrUg,62 junebug-0.1.4.dist-info/metadata.json,sha256=9lVssuQgS556G5IRK-znhQW2mCrgxQDuS-ZgdOQQXu4,1047 junebug-0.1.4.dist-info/top_level.txt,sha256=fmQgvjTZDQDzNch_2Mt-hDDDBRRLKlw17xjNTUHOrEQ,8