PKUOL#@@micropede/__init__.py""" Python interface for Micropede """ __version__ = '0.0.27' PKJILmicropede/api.pyfrom onoff import OnOffMixin class Topics(OnOffMixin): """ Mixins for listening and subscribing to mqtt messages, with helpers to automate common app framework uri structures like: state, put, notify, status, trigger, and signal. These mixins only automate the naming conventions and the expected retain behaviour. """ def on_state_msg(self, sender, val, method): return self.add_subscription(f'{self.app_name}/{sender}/state/{val}', method) def bind_state_msg(self, val, event, persist=True): return self.add_binding(f'{self.app_name}/{self.name}/state/{val}', event, persist) def on_put_msg(self, val, method): return self.add_subscription(f'{self.app_name}/put/{self.name}/{val}', method) def bind_put_msg(self, receiver, val, event): return self.add_binding(f'{self.app_name}/put/{receiver}/{val}', event) def on_notify_msg(self, sender, topic, method): return self.add_subscription(f'{self.app_name}/{sender}/notify/{self.name}/{topic}', method) def bind_notify_msg(self, receiver, topic, event): return self.add_binding(f'{self.app_name}/{self.name}/notify/{receiver}/{topic}', event) def on_status_msg(self, sender, method): return self.add_subscription(f'{self.app_name}/status/{sender}', method) def bind_status_msg(self, event): return self.add_binding(f'{self.app_name}/status/{self.name}', event) def on_trigger_msg(self, action, method): return self.add_subscription(f'{self.app_name}/trigger/{self.name}/{action}', method) def bind_trigger_msg(self, receiver, action, event): return self.add_binding(f'{self.app_name}/trigger/{receiver}/{action}', event) def on_signal_msg(self, sender, topic, method): return self.add_subscription(f'{self.app_name}/{sender}/signal/{topic}', method) def bind_signal_msg(self, topic, event): return self.add_binding(f'{self.app_name}/{self.name}/signal/{topic}', event) PKJILKmicropede/async.pyimport asyncio import json import uuid import pydash as _ from .client import MicropedeClient, generate_client_id, set_timeout DEFAULT_TIMEOUT = 5000 class MicropedeAsync(): def __init__(self, app_name, host='localhost', port=None, version='0.0.0'): try: if (app_name is None): raise(Exception("app_name is None")) name = f'micropede-async-{uuid.uuid1()}-{uuid.uuid4()}' self.client = MicropedeClient(app_name, host, port, name, version) self.safe = self.client.safe self.client.listen = _.noop except Exception as e: raise Exception(self.dump_stack(self.client.name, e)) async def reset(self): host = self.client.host port = self.client.port name = self.client.name app_name = self.client.app_name self.client.client_id = generate_client_id(name, app_name) await self.client.disconnect_client() await self.client.connect_client(self.client.client_id, host, port) return async def get_state(self, sender, prop, timeout=DEFAULT_TIMEOUT): label = f'{self.client.app_name}::get_state' topic = f'{self.client.app_name}/{sender}/state/{prop}' timer = None future = asyncio.Future() try: self.enforce_single_subscription(label) await self.reset() except Exception as e: raise Exception(self.dump_stack(self.client.name, e)) def on_state_msg(payload, params): # clear timeout ?? def on_disconnect(d): if (future.done() == False): future.set_result(payload) f1 = self.client.disconnect_client() f1.add_done_callback(self.safe(on_disconnect)) self.client.on_state_msg(sender, prop, on_state_msg) def on_timeout(): if future.done(): return def on_disconnect(d): future.set_exception( self.dump_stack(label, [topic, f'timeout {timeout}ms'])) f1 = self.client.disconnect_client() f1.add_done_callback(self.safe(on_disconnect)) set_timeout(self.safe(on_timeout), timeout) return await future async def get_subscriptions(self, receiver, timeout=DEFAULT_TIMEOUT): payload = await self.trigger_plugin(receiver, 'get-subscriptions', {}, timeout) return _.get(payload, 'response') async def put_plugin(self, receiver, prop, val, timeout=DEFAULT_TIMEOUT): if (not _.is_dict(val)): msg = {} _.set_(msg, prop, val) val = msg result = await self.call_action(receiver, prop, val, 'put', timeout) return result async def trigger_plugin(self, receiver, action, val={}, timeout=DEFAULT_TIMEOUT): result = await self.call_action(receiver, action, val, 'trigger', timeout) return result async def call_action(self, receiver, action, val={}, msg_type='trigger', timeout=DEFAULT_TIMEOUT): label = f'{self.client.app_name}::callAction::{msg_type}::{action}' future = asyncio.Future() # import pdb; pdb.set_trace() done = False timer = None no_timeout = False if timeout is -1: no_timeout = True _.set_(val, '__head__.plugin_name', self.client.name) _.set_(val, '__head__.version', self.client.version) topic = f'{self.client.app_name}/{msg_type}/{receiver}/{action}' try: self.enforce_single_subscription(label) await self.reset() except Exception as e: raise self.dump_stack(label, [topic, e]) def on_notify(payload, params): # clear timer?? def on_disconnect(d): if future.done(): return if (_.get(payload, 'status') is not None): if (_.get(payload, 'status') != 'success'): future.set_exception( self.dump_stack(label, _.get(payload, 'status'))) else: print("WARNING: ", label, 'message did not contain status') if future.done() == False: future.set_result(payload) f1 = self.client.disconnect_client() f1.add_done_callback(self.safe(on_disconnect)) self.client.on_notify_msg(receiver, action, self.safe(on_notify)) self.client.send_message(topic, val) if (no_timeout is not None): def on_timeout(): if future.done(): return def on_disconnect(d): future.set_exception( self.dump_stack(label, [topic, f'timeout {timeout}ms'])) f1 = self.client.disconnect_client() f1.add_done_callback(self.safe(on_disconnect)) set_timeout(self.safe(on_timeout), timeout) return await future def enforce_single_subscription(self, label): total_subscriptions = len(self.client.subscriptions) default_subscriptions = self.client.default_sub_count if (total_subscriptions - default_subscriptions > 1): msg = 'only one active sub per async client' raise self.dump_stack(label, msg) def dump_stack(self, label, err): if (err is None): return Exception(_.flatten_deep([label, 'unknown error'])) if (_.get(err, 'args')): return Exception(_.flatten_deep([label, err.args])) else: return Exception(_.flatten_deep([label, err])) PKJIL.@)@)micropede/client.py"""Micropede Client for Python""" __version__ = '0.0.2' import asyncio import functools import inspect import json import os import time import random import re import urllib import uuid from threading import Timer, Thread import paho.mqtt.client as mqtt from wheezy.routing import PathRouter import pydash as _ from .api import Topics DEFAULT_PORT = 1884 DEFAULT_TIMEOUT = 5000 _underscorer1 = re.compile(r'(.)([A-Z][a-z]+)') _underscorer2 = re.compile('([a-z0-9])([A-Z])') def set_timeout(callback, timeout=DEFAULT_TIMEOUT): Timer(timeout/1000.0, callback).start() def camel_to_snake(s): # https://gist.github.com/jaytaylor/3660565 subbed = _underscorer1.sub(r'\1_\2', s) return _underscorer2.sub(r'\1_\2', subbed).lower() def get_class_name(self): safe_chars = '~@#$&()*!+=:;,.?/\'' return urllib.parse.quote(camel_to_snake(self.__class__.__name__), safe=safe_chars) def get_receiver(payload): return _.get(payload, "__head__.plugin_name") def wrap_data(key, value, name, version): msg = {} if (_.is_object(value) and value is not None): msg = value else: msg[key] = value _.set_(msg, "__head__.plugin_name", name) _.set_(msg, "__head__.plugin_version", version) return msg def channel_to_route_path(channel): return channel def channel_to_subscription(channel): return re.sub(r"\{(.+?)\}", "+", channel) def generate_client_id(name, app_name, path='unknown'): return f'{name}>>{path}>>{app_name}>>{uuid.uuid1()}-{uuid.uuid4()}' def safe(loop): def __safe(function): @functools.wraps(function) def _safe(*args, **kwargs): loop.call_soon_threadsafe(function, *args, **kwargs) return _safe return __safe # Connection state mqtt_cs_new = 0 mqtt_cs_connected = 1 mqtt_cs_disconnecting = 2 mqtt_cs_connect_async = 3 class MicropedeClient(Topics): """ Python based client for Micropede Application Framework Used with the following broker: https://github.com/sci-bots/microdrop-3.0/blob/master/MoscaServer.js """ def __init__(self, app_name, host="localhost", port=None, name=None, version='0.0.0'): if (app_name is None): raise("app_name is undefined") if (port is None): port = 1884 if (name is None): name = get_class_name(self) self.router = PathRouter() client_id = generate_client_id(name, app_name) self.__listen = _.noop self.app_name = app_name self.client_id = client_id self.name = name self.subscriptions = [] self.host = host self.port = port self.version = version self.last_message = None self.loop = asyncio.get_event_loop() self.safe = safe(self.loop) self.wait_for = self.loop.run_until_complete self.client = None self.wait_for(self.connect_client(client_id, host, port)) @property def is_plugin(self): return not _.is_equal(self.listen, _.noop) @property def listen(self): return self.__listen @listen.setter def listen(self, val): self.__listen = val def exit(): pass def add_binding(self, channel, event, retain=False, qos=0, dup=False): return self.on(event, lambda d: self.send_message( channel, d, retain, qos, dup)) def add_subscription(self, channel, handler): path = channel_to_route_path(channel) sub = channel_to_subscription(channel) route_name = f'{uuid.uuid1()}-{uuid.uuid4()}' future = asyncio.Future() try: if self.client._state != mqtt_cs_connected: if (future.done() == False): future.set_exception(Exception( f'Failed to add subscription. '+ +'Client is not connected {self.name}, {self.channel}' )) return future def add_sub(*args, **kwargs): self.client.on_unsubscribe = _.noop def on_sub(client, userdata, mid, granted_qos): self.client.on_subscribe = _.noop if (future.done() == False): future.set_result('done') self.client.on_subscribe = self.safe(on_sub) self.client.subscribe(sub) if sub in self.subscriptions: self.client.on_unsubscribe = self.safe(add_sub) self.client.unsubscribe(sub) else: self.subscriptions.append(sub) self.router.add_route(path, handler) add_sub() except Exception as e: if (future.done() == False): future.set_exception(e) return future def remove_subscription(self, channel): sub = channel_to_subscription(channel) future = asyncio.Future() def on_unsub(client, userdata, mid): self.client.on_unsubscribe = _.noop _.pull(self.subscriptions, sub) if (future.done() == False): future.set_result('done') self.client.unsubscribe(sub) return future def _get_subscriptions(self, payload, name): LABEL = f'{this.app_name}::get_subscriptions' return this.notify_sender(payload, this.subscriptions, 'get-subscriptions') def notify_sender(payload, response, endpoint, status='success'): if (satus != 'success'): response = _.flatten_deep(response) receiver = get_receiver(payload) if (receiver is not None): return receiver self.send_message( f'{this.app_name}/{this.name}/notify/{receiver}/{endpoint}', wrap_data(None, {'status': status, 'response': response}, self.name, self.version) ) return response def connect_client(self, client_id, host, port, timeout=DEFAULT_TIMEOUT): self.client = mqtt.Client(client_id) future = asyncio.Future() def on_connect(client, userdata, flags, rc): if future.done(): return self.subscriptions = [] if self.is_plugin: def on_done_1(d): def on_done_2(d): if future.done(): return self.listen() self.default_sub_count = len(self.subscriptions) self.client.on_disconnect = self.safe(self.exit) future.set_result('done') f2 = self.on_trigger_msg("exit", self.safe(self.exit)) f2.add_done_callback(self.safe(on_done_2)) f1 = self.on_trigger_msg("get-subscriptions", self.safe(self._get_subscriptions)) f1.add_done_callback(self.safe(on_done_1)) else: self.listen() self.default_sub_count = 0 if (future.done() == False): future.set_result('done') self.client.on_connect = self.safe(on_connect) self.client.on_message = self.safe(self.on_message) self.client.connect(host=self.host, port=self.port) self.client.loop_start() def on_timeout(): if (future.done() == False): future.set_exception(Exception(f'timeout {timeout}ms')) set_timeout(self.safe(on_timeout), timeout) return future def disconnect_client(self, timeout=DEFAULT_TIMEOUT): future = asyncio.Future() self.subscriptions = [] self.router = PathRouter() def off(): if hasattr(self, '_on_off_events'): del self._on_off_events if (_.get(self, 'client._state') != mqtt_cs_connected): off() if (hasattr(self, 'client')): del self.client future.set_result('done') return future else: def on_disconnect(*args, **kwargs): if future.done(): return off() if (hasattr(self, 'client')): del self.client future.set_result('done') if hasattr(self, 'client'): self.client.on_disconnect = self.safe(on_disconnect) self.client.disconnect() set_timeout(self.safe(on_disconnect), timeout) else: if (future.done() == False): future.set_result('done') return future def on_message(self, client, userdata, msg): try: payload = json.loads(msg.payload) except ValueError: print("Message contains invalid json") print(f'topic: {msg.topic}') payload = None topic = msg.topic if (topic is None or topic is ''): return method, args = self.router.match(topic) if method: method(payload, args) def send_message(self, topic, msg={}, retain=False, qos=0, dup=False, timeout=DEFAULT_TIMEOUT): future = asyncio.Future() if (_.is_dict(msg) and _.get(msg, '__head__') is None): head = wrap_data(None, None, self.name, self.version)['__head__'] _.set_(msg, '__head__', head) message = json.dumps(msg) _mid = None def on_publish(client, userdata, mid): if (mid == _mid): if (future.done() == False): future.set_result('done') def on_timeout(): if (future.done() == False): future.set_exception(Exception(f'timeout {timeout}ms')) self.client.on_publish = self.safe(on_publish) (qos, _mid) = self.client.publish(topic, payload=message, qos=qos, retain=retain) set_timeout(self.safe(on_timeout), timeout) return future async def set_state(self, key, value): topic = f'{self.app_name}/{self.name}/state/{key}'; await self.send_message(topic, value, True, 0, False) PK!HxQP micropede-0.0.27.dist-info/WHEEL1 0 RZq+D-Dv;_[*7Fp 8MRq%_:==ߘPT PK!Hlr#micropede-0.0.27.dist-info/METADATA}OK1[PTjC+tM&M& ~3 ol|-Tƣ ".pxoҤ#cf@a)ϝ(9V9̓Y a M?J5MZuuSW޸al\\,vT׿e"?3ۈ>D]8)`}i) @7|PK!HiP!micropede-0.0.27.dist-info/RECORD}90޳DRLh b