PK!K CHANGELOG.md# Changelog All notable changes to this project will be documented in this file. This project follows [Semantic Versioning](https://semver.org/). ## [0.7.16] -- Pending ## [0.7.15] - Switch to `poetry` from `pipenv` for dependency management; allowed removal of __version.py__, setup.py, setup.cfg, and of course Pipfile* ## [0.7.14] - Support multiple elk instances in same process by associating _message_handlers, _sync_handlers, and the description processing code with elk object rather than as file globals. - Moved message encoders/decoders into new MessageDecode class to facilitate the above. ## [0.7.13] - Fix updating counter value - Add alarm memory processing ## [0.7.12] - Add system_trouble_status decoding. ## [0.7.11] - Add pytz as dependency - Add KC (keypress) handling ## [0.7.10] - Add level() helper to control lights - Remove turn_on and turn_off for lights ## [0.7.9] - Add time of change to IC message. The will force callbacks to be called. Useful when multiple IC messages with same content come in. This way the app using the callback can record the multiple attempts. - Add username utility function to get a user's name from an user number ## [0.7.8] - Add zt decode and helper - Add dm helper - Fix speak word. Was using wrong Elk message code. ## [0.7.7] - Add 30 second timeout to create connection - lint cleanups ## [0.7.6] - Set user name to default name if not configured - Enhance IC decode to handle prox - IC handler now saves user code - Heartbeat bug; connect was called on heartbeat timeout AND then again on disconnect callback - Made log messages on connect/disconnect/error clearer and more consistent - Change max time on connection retry to 60 seconds (was 120 seconds) - Tidied up a couple of comments ## [0.7.5] - Add heartbeat ## [0.7.4] - Fix triggered_alarm attr to zone - Add support for system trouble status (attached to panel) ## [0.7.3] - Add triggered_alarm attr to zone ## [0.7.2] - changed callback now dict instead of list ## [0.7.1] - Fix bug on changed callback ## [0.7.0] - Breaking change in attr changed callback ## [0.6.1] - Tweak to as_dict - ## [0.6.0] - Rename package from elkm1 to elkm1_lib (due to conflicts with HASS component) ## [0.5.0] - Many small fixes. - Fix lint errors. - Fix couple of syntax errors (speak_work for example) - Changed a number of initial values to make it easier to use library (examples are temperatures, initial area numbers) ## [0.4.9] - Allow lowercase and single digit housecodes ## [0.4.8] - Add formating to version strings - Lower retry max time on connection lost - Make `make clean` clean cleanerer ## [0.4.7] - Change connect/reconnect to not block for HASS - Update README with section on development setup ## [0.4.6] - Change ee_decode to return str for armed status (was int) ## [0.4.5] - Fixed typos ## [0.4.4] - Add entry/exit message handling ## [0.4.3] - Speak word, speak phrase helpers added (no constants for words/phrases) ## [0.4.2] - Add handling for ST (temperature) messages - Users were disabled and not working; enabled and fixed - Retrieve temperatures on startup added ## [0.4.1] - Add serial io dependency ## [0.4.0] - Breaking change: no longer need to call elk.loop.run_until_complete(elk.connect()); now call elk.connect() - Retrieve counter values that have a description - Fix for HASS Recorder errors about not being JSON serializable ## [0.3.7] - Add asyncio serial support - Add test program to test serial support (bin/test-serial); requires data file that can be grokked from debug output of bin/elk - Add proper command line parsing for bin/elk including URL as param - URL can also be read from environment variable ELKM1_URL - Add reconnect logic with exponetial backoff - Add pause writing to Elk when in remote programming mode - Add const for format of setting - Add const for RP mode - Change thermostat const to match semantics of other const (e.g.: FAN_ON changed to ON, MODE_AUTO to AUTO, etc). Makes for better formatting when using pretty_const ## [0.3.6] - Start of cleanup of cmdr.py; using attrs lib - Change return on pretty_const to string (was tuple) - First cut of adding disconnect handling - Reconnect to be handler by client; will get a callback on disconnect - Fix alarm armup/state/etc to char from int ## [0.3.5] - Lint cleanup - Constants now all uppercase with underscores (can use pretty_const to print) - Added encode for az, decode for AZ (no handler yet) ## [0.3.4] Fix syntax; more descriptive docstrings ## [0.3.3] - Make elk attribute in Element private - Allow separator in default_name() to be specified - Add constants for arming, alarms ## [0.3.1] - Add elk attribute to Element ## [0.3.0] - Add NS, NZ to no plans to implement list - Make default name 1-based - Add helpers to Output - Add helpers to Light - Add helper to Counter - Add helper to Task - Add helpers to Area - Add handler for task change (TC) message - Add thermostat setting - Add helper for Setting (custom value) - Added const for thermostat setting - Added pretty_const (see test_util.py for API) ## [0.2.1] - 2018-04-13 - Add display message encoder ## [0.2.0] - 2018-04-13 - Add thermostats (not Omnistat) ## [0.1.1] - 2018-04-08 ### Added - MANIFEST.in to include bin files and LICENSE in distro ### Changed - Makefile updated to include build and upload rules ## [0.1.0] - 2018-04-07 ### Added - Initial version, see README.rst for overview of project. ### Changed ### Removed PK!@n/n/ bin/cmdr.py""" Created on Aug 2, 2015 From: https://github.com/izderadicka/xmpp-tester/blob/master/commander.py @author: ivan """ from collections import deque import inspect import re import threading import traceback import urwid from importlib import import_module import attr import elkm1_lib.message @attr.s class Command: function = attr.ib() help= attr.ib() docs = attr.ib() def parse_subcommand(line): tokens = line.split() if len(tokens) == 0: return (None, None) cmd = tokens[0].lower() args = tokens[1:] converted = [] for arg in args: try: i = int(arg) converted.append(i) except ValueError: converted.append(arg) return (cmd, converted) def parse_range(rng, max): # e.g. rng = "<3, 45, 46, 48-51, 77" if rng.strip() == '*': return range(0, max) ids = [] for x in map(str.strip,rng.split(',')): if x.isdigit(): ids.append(int(x)) elif x[0] == '<': ids.extend(range(0,int(x[1:]))) elif '-' in x: xr = [s.strip() for s in x.split('-')] ids.extend(range(int(xr[0]),int(xr[1])+1)) else: raise ValueError('Unknown range type: "%s"'%x) return ids def parse_element_command(cmd, line, max): match = re.match( '([\d,\- <*]+)(\w*.*)', line) ids = parse_range(match.groups()[0], max) subcommand = parse_subcommand(match.groups()[1]) return (ids, subcommand[0], subcommand[1]) def find_class(module, class_name): for class_ in inspect.getmembers(module, inspect.isclass): if class_[0] == class_name: return class_ return None def get_helpers(element, clas): helpers = {} class_ = find_class(import_module('elkm1_lib.' + element), clas) if class_: for function_name, fn in inspect.getmembers(class_[1], inspect.isfunction): if fn.__doc__ and fn.__doc__.startswith('(Helper)'): params = [p for p in inspect.signature(fn).parameters] if params[0] == 'self': del params[0] params = ' '.join(['<'+p+'>' for p in params]) helpers[function_name] = (fn, '{} {}'.format(function_name,params), fn.__doc__[8:]) return helpers class Commands(): def __init__(self, elk): self.elk = elk self._quit_cmd = ['q', 'quit', 'exit'] self._help_cmd = ['?', 'help', 'h'] self.encode_cmds = {} for fn_name, fn in inspect.getmembers(elkm1_lib.message, inspect.isfunction): if not fn_name.endswith('_encode'): continue cmd = fn_name[0:2] params = [p for p in inspect.signature(fn).parameters] params = ' '.join(['<'+p+'>' for p in params]) self.encode_cmds[cmd] = Command(fn, '{} {}'.format(cmd, params), fn.__doc__) self.element_cmds = {} self.subcommands = {} for element in elk.element_list: if element == 'panel': fn = self.panel_print cmd = element else: fn = self.print_elements cmd = element[:-1] self.element_cmds[cmd] = (fn, '{} [subcommand]'.format(cmd), 'Displays internal state of {}'.format(element), get_helpers(element, cmd.capitalize())) def __call__(self, line): tokens = line.split() cmd = tokens[0].lower() args = tokens[1:] if cmd in self._quit_cmd: return Commander.Exit print("#blue#{}".format(line)) if cmd in self._help_cmd: return self.help(cmd, args) if cmd in self.encode_cmds: return self.encoder(cmd, args) if cmd == 'recv': return self.elk._got_data(' '.join(args)) if cmd in self.element_cmds: return self.element_cmds[cmd][0](cmd, args) return '#error#Unknown command: {}'.format(cmd) def help(self, cmd, args): if len(args) == 0: res = '#green#Type "[?|h|help] " to get more help\n' res += 'Type "[q|quit|exit]" to quit program\n' res += 'Element display commands:\n {}\n\n'.format( ' '.join(list(self.element_cmds.keys()))) cl = [fnname for fnname in self.encode_cmds] res += 'Send message commands:\n {}\n'.format( ' '.join(sorted(cl))) else: help_for = args[0] if help_for in self.encode_cmds: command = self.encode_cmds[help_for] res = "#green#{}\n{}".format(command.help, command.docs) elif help_for in self.element_cmds: res = "#green#{}\n{}".format(self.element_cmds[help_for][1], self.element_cmds[help_for][2]) for k, v in self.element_cmds[help_for][3].items(): res += '\nSubcommand: {}\n{}'.format(v[1], v[2]) else: res = "#error#Unknown command: {}".format(help_for) return res def print_elements(self, cmd, args): element_list = getattr(self.elk, cmd+'s', None) args = parse_element_command(cmd, " ".join(args), element_list.max_elements) if args[1]: if args[1] in self.element_cmds[cmd][3]: fn = self.element_cmds[cmd][3][args[1]][0] else: raise NotImplemented for i in args[0]: print(fn) fn(element_list[i], *args[2]) else: for i in args[0]: print(element_list[i]) def panel_print(self, cmd, args): print(self.elk.panel) def encoder(self, cmd, args): converted = [] for arg in args: try: i = int(arg) converted.append(i) except ValueError: converted.append(arg) self.elk.send(self.encode_cmds[cmd].function(*converted)) class FocusMixin(object): def mouse_event(self, size, event, button, x, y, focus): if focus and hasattr(self, '_got_focus') and self._got_focus: self._got_focus() return super(FocusMixin,self).mouse_event(size, event, button, x, y, focus) class ListView(FocusMixin, urwid.ListBox): def __init__(self, model, got_focus, max_size=None): urwid.ListBox.__init__(self,model) self._got_focus=got_focus self.max_size=max_size self._lock=threading.Lock() def mouse_event(self, size, event, button, x, y, focus): direction = 'up' if button == 4 else 'down' return super(ListView,self).keypress(size, direction) def add(self,line): with self._lock: was_on_end=self.get_focus()[1] == len(self.body)-1 if self.max_size and len(self.body)>self.max_size: del self.body[0] self.body.append(urwid.Text(line)) last=len(self.body)-1 if was_on_end: self.set_focus(last,'above') class Input(FocusMixin, urwid.Edit): signals=['line_entered'] def __init__(self, got_focus=None): urwid.Edit.__init__(self) self.history=deque(maxlen=1000) self._history_index=-1 self._got_focus=got_focus def keypress(self, size, key): if key=='enter': line=self.edit_text.strip() if line: urwid.emit_signal(self,'line_entered', line) self.history.append(line) self._history_index=len(self.history) self.edit_text=u'' if key=='up': self._history_index-=1 if self._history_index< 0: self._history_index= 0 else: self.edit_text=self.history[self._history_index] if key=='down': self._history_index+=1 if self._history_index>=len(self.history): self._history_index=len(self.history) self.edit_text=u'' else: self.edit_text=self.history[self._history_index] else: urwid.Edit.keypress(self, size, key) class Commander(urwid.Frame): """Simple terminal UI with command input on bottom line and display frame above similar to chat client etc. Initialize with your Command instance to execute commands and the start main loop Commander.loop(). You can also asynchronously output messages with Commander.output('message') """ class Exit(object): pass PALLETE=[('reversed', urwid.BLACK, urwid.LIGHT_GRAY), ('normal', urwid.LIGHT_GRAY, urwid.BLACK), ('error', urwid.LIGHT_RED, urwid.BLACK), ('green', urwid.DARK_GREEN, urwid.BLACK), ('blue', urwid.LIGHT_BLUE, urwid.BLACK), ('magenta', urwid.DARK_MAGENTA, urwid.BLACK), ] def __init__(self, title, command_caption='Command: (Tab to switch focus to upper frame, where you can scroll text)', cmd_cb=None, max_size=1000): self.header=urwid.Text(title) self.model=urwid.SimpleListWalker([]) self.body=ListView(self.model, lambda: self._update_focus(False), max_size=max_size ) self.input=Input(lambda: self._update_focus(True)) foot=urwid.Pile([urwid.AttrMap(urwid.Text(command_caption), 'reversed'), urwid.AttrMap(self.input,'normal')]) urwid.Frame.__init__(self, urwid.AttrWrap(self.body, 'normal'), urwid.AttrWrap(self.header, 'reversed'), foot) self.set_focus_path(['footer',1]) self._focus=True urwid.connect_signal(self.input,'line_entered',self.on_line_entered) self._cmd=cmd_cb self._output_styles=[s[0] for s in self.PALLETE] self.eloop=None def loop(self, handle_mouse=False): # self.eloop=urwid.MainLoop(self, self.PALLETE, handle_mouse=handle_mouse) # self._eloop_thread=threading.current_thread() # self.eloop.run() import asyncio evl = urwid.AsyncioEventLoop(loop=asyncio.get_event_loop()) self.eloop = urwid.MainLoop(self, self.PALLETE, event_loop=evl, handle_mouse=True) self._eloop_thread=threading.current_thread() self.eloop.run() def on_line_entered(self,line): if self._cmd: try: res = self._cmd(line) except Exception as e: traceback.print_exc() self.output('Error: %s'%e, 'error') return if res==Commander.Exit: raise urwid.ExitMainLoop() elif res: self.output(str(res)) else: if line in ('q','quit','exit'): raise urwid.ExitMainLoop() else: self.output(line) def output(self, line, style=None): match = re.search(r'^#(\w+)#(.*)', line, re.DOTALL) if match and match.group(1) in self._output_styles: line = (match.group(1), match.group(2)) elif style and style in self._output_styles: line=(style,line) self.body.add(line) # since output could be called asynchronously form other threads # we need to refresh screen in these cases if self.eloop and self._eloop_thread != threading.current_thread(): self.eloop.draw_screen() def _update_focus(self, focus): self._focus=focus def switch_focus(self): if self._focus: self.set_focus('body') self._focus=False else: self.set_focus_path(['footer',1]) self._focus=True def keypress(self, size, key): if key=='tab': self.switch_focus() return urwid.Frame.keypress(self, size, key) PK!  bin/elk#!/usr/bin/env python import argparse import asyncio import io import os import logging import sys import textwrap import traceback from elkm1_lib import Elk from elkm1_lib.message import MessageEncode from elkm1_lib.const import Max import cmdr LOG = logging.getLogger(__name__) class StdOutWrapper: def __init__(self, cmdr): sys.stdout = self sys.stderr = self self.cmdr = cmdr self.log = None #self.log = open('elk.log','w') def write(self,txt): txt = txt.rstrip() if len(txt) > 0: self.cmdr.output(txt) if self.log: print(txt, file=self.log) def flush(self): pass def _unknown_handler(msg_code, data): LOG.debug("No decoder for message type: %s Contents: %s", msg_code, data) def _timeout_handler(msg_code): LOG.debug("Timeout waiting for '%s'", msg_code) class SmartFormatter(argparse.HelpFormatter): def _split_lines(self, text, width): if text.startswith('R|'): return text[2:].splitlines() return argparse.HelpFormatter._split_lines(self, text, width) def parse_args(): parser = argparse.ArgumentParser("elk", formatter_class=SmartFormatter) parser.add_argument('-i', '--interactive', action='store_true', default=False, dest='interactive', help='Run in interactive mode (type help for more info)') parser.add_argument('-u', '--url', action='store', dest='url', help=("R|URL to connect to in one of the following formats:\n" " elk://host[:port] -- connect to Elk over Ethernet\n" " elks://host[:port] -- securely connect to Elk over Ethernet\n" " serial://port[:baud] -- connect to Elk over serial port\n")) parser.add_argument('--userid', action='store', dest='userid', help='Userid when using secure connection (elks://)') parser.add_argument('--password', action='store', dest='password', help='Password when using secure connection (elks://)') results = parser.parse_args() return results def main(): config = {} args = parse_args() url = args.url if args.url else os.environ.get('ELKM1_URL') if url: config['url'] = url config['userid'] = args.userid config['password'] = args.password # config['element_list'] = ['panel'] elk = Elk(config) if args.interactive: c=cmdr.Commander('Elk console', cmd_cb=cmdr.Commands(elk)) mystdout = StdOutWrapper(c) logging.basicConfig(stream=mystdout, level=logging.DEBUG, format='%(message)s') else: logging.basicConfig(level=logging.DEBUG, format='%(message)s') try: elk.add_handler('unknown', _unknown_handler) elk.add_handler('timeout', _timeout_handler) elk.connect() if args.interactive: c.loop() else: elk.run() except KeyboardInterrupt: exit(0) if __name__ == '__main__': main() PK!lQQ bin/mkdoc#!/usr/bin/env python import inspect import re from elkm1_lib.const import MESSAGE_MAP import elkm1_lib.message def main(): """Create markdown doc with messages that have encoders/decoders""" no_plans_to_implement = {'AP', 'ar', 'AR', 'at', 'AT', 'ca', 'CA', 'cd', 'CD', 'ds', 'DS', 'DK', 'ip', 'IP', 'ir', 'IR', 'NS', 'NZ', 'RE', 'rr', 'RR', 'rs', 'ua', 'UA', 'XB', 'xk'} # There is one "magic" encoder 'al' which is used to encode # messages a0-a8 encoders = {'a0','a1','a2','a3','a4','a5','a6','a7','a8','a9','a:'} for fn_name, fn in inspect.getmembers(elkm1_lib.message, inspect.isfunction): if not fn_name.endswith('_encode'): continue encoders.add(fn_name[0:2]) decoders = set() for fn_name, fn in inspect.getmembers(elkm1_lib.message, inspect.isfunction): if not re.match(r'_\w\w_decode', fn_name): continue decoders.add(fn_name[1:3].upper()) message_codes = sorted(MESSAGE_MAP.keys(), key=lambda s: s.lower()) messages_done = 0 print('Msg | Status | Description') print('----|--------|------------') for msg_code in message_codes: if msg_code in encoders or msg_code in decoders: status = '✓' messages_done += 1 elif msg_code in no_plans_to_implement: status = '✗' else: status = '' print('{msg_code:4s}| {status:5s}| {desc}'.format( msg_code=msg_code, status=status, desc=MESSAGE_MAP[msg_code])) done = messages_done + len(no_plans_to_implement) print('\n{d} of {t} messages implemented or no plans to implement'.format( d=done, t=len(message_codes))) print('Remaining {l} messages will be implemented based on most requested' .format(l=len(message_codes)-done)) if __name__ == '__main__': main() PK!3L bin/simple#!/usr/bin/env python from elkm1_lib import Elk import logging import os LOG = logging.getLogger(__name__) def main(): logging.basicConfig(level=logging.DEBUG, format='%(message)s') try: url = os.environ.get('ELKM1_URL') if not url: print("Specify url to connect to in ELKM1_URL environment variable") exit(0) elk = Elk({'url': url}) elk.connect() elk.run() except KeyboardInterrupt: exit(0) if __name__ == "__main__": main() PK!~bin/test-serial#!/usr/bin/env python import os import termios import tty responses = {} with open('test-serial.data') as fp: line1 = fp.readline() while line1: line2 = fp.readline() if not line2: break (cmd1, data1) = line1.split(None, 1) (cmd2, data2) = line2.split(None, 1) if cmd1 == 'write_data' and cmd2 == 'got_data': responses[data1.strip().strip("'")] = data2.strip().strip("'") line1 = fp.readline() master, slave = os.openpty() # tty.setraw(master, termios.TCSANOW) print("Connect to:", os.ttyname(slave)) while True: try: data = os.read(master, 10000) except OSError: break if not data: break data = data.strip().decode('ISO-8859-1') print("got_data: '{}'".format(data)) if data in responses: os.write(master, (responses[data] + '\r\n').encode('ISO-8859-1')) print("write_data: '{}'".format(responses[data])) PK!ESB))elkm1_lib/__init__.py"""ElkM1 library""" from .elk import Elk PK!I  elkm1_lib/areas.py"""Definition of an ElkM1 Area""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import as_encode, az_encode, al_encode, dm_encode class Area(Element): """Class representing an Area""" def __init__(self, index, elk): super().__init__(index, elk) self.armed_status = None self.arm_up_state = None self.alarm_state = None self.alarm_memory = None self.is_exit = False self.timer1 = 0 self.timer2 = 0 def arm(self, level, code): """(Helper) Arm system at specified level (away, vacation, etc)""" self._elk.send(al_encode(level, self._index, code)) def disarm(self, code): """(Helper) Disarm system.""" self.arm(0, code) def display_message(self, clear, beep, timeout, line1, line2): """Display a message on all of the keypads in this area.""" self._elk.send( dm_encode(self._index, clear, beep, timeout, line1, line2) ) class Areas(Elements): """Handling for multiple areas""" def __init__(self, elk): super().__init__(elk, Area, Max.AREAS.value) elk.add_handler('AM', self._am_handler) elk.add_handler('AS', self._as_handler) elk.add_handler('EE', self._ee_handler) def sync(self): """Retrieve areas from ElkM1""" self.elk.send(as_encode()) self.get_descriptions(TextDescriptions.AREA.value) def _am_handler(self, alarm_memory): for area in self.elements: area.setattr('alarm_memory', alarm_memory[area.index], True) def _as_handler(self, armed_statuses, arm_up_states, alarm_states): update_alarm_triggers = False for area in self.elements: area.setattr('armed_status', armed_statuses[area.index], False) area.setattr('arm_up_state', arm_up_states[area.index], False) if area.alarm_state != alarm_states[area.index] or \ alarm_states[area.index] != '0': update_alarm_triggers = True area.setattr('alarm_state', alarm_states[area.index], True) if update_alarm_triggers: self.elk.send(az_encode()) # pylint: disable=too-many-arguments def _ee_handler(self, area, is_exit, timer1, timer2, armed_status): area = self.elements[area] area.setattr('armed_status', armed_status, False) area.setattr('timer1', timer1, False) area.setattr('timer2', timer2, False) area.setattr('is_exit', is_exit, True) PK!څ8%%elkm1_lib/const.py""" Constants used across package """ from enum import Enum class Max(Enum): """Max number of elements on the panel""" AREAS = 8 COUNTERS = 64 KEYPADS = 16 OUTPUTS = 208 SETTINGS = 20 TASKS = 32 THERMOSTATS = 16 USERS = 203 LIGHTS = 256 ZONES = 208 ZONE_TEMPS = 16 class ZoneType(Enum): """Types of Elk zones""" DISABLED = 0 BURGLAR_ENTRY_EXIT_1 = 1 BURGLAR_ENTRY_EXIT_2 = 2 BURGLAR_PERIMETER_INSTANT = 3 BURGLAR_INTERIOR = 4 BURGLAR_INTERIOR_FOLLOWER = 5 BURGLAR_INTERIOR_NIGHT = 6 BURGLAR_INTERIOR_NIGHT_DELAY = 7 BURGLAR24_HOUR = 8 BURGLAR_BOX_TAMPER = 9 FIRE_ALARM = 10 FIRE_VERIFIED = 11 FIRE_SUPERVISORY = 12 AUX_ALARM_1 = 13 AUX_ALARM_2 = 14 KEYFOB = 15 NON_ALARM = 16 CARBON_MONOXIDE = 17 EMERGENCY_ALARM = 18 FREEZE_ALARM = 19 GAS_ALARM = 20 HEAT_ALARM = 21 MEDICAL_ALARM = 22 POLICE_ALARM = 23 POLICE_NO_INDICATION = 24 WATER_ALARM = 25 KEY_MOMENTARY_ARM_DISARM = 26 KEY_MOMENTARY_ARM_AWAY = 27 KEY_MOMENTARY_ARM_STAY = 28 KEY_MOMENTARY_DISARM = 29 KEY_ON_OFF = 30 MUTE_AUDIBLES = 31 POWER_SUPERVISORY = 32 TEMPERATURE = 33 ANALOG_ZONE = 34 PHONE_KEY = 35 INTERCOM_KEY = 36 class ZonePhysicalStatus(Enum): """Zone physical status.""" UNCONFIGURED = 0 OPEN = 1 EOL = 2 SHORT = 3 class ZoneLogicalStatus(Enum): """Zone logical status.""" NORMAL = 0 TROUBLED = 1 VIOLATED = 2 BYPASSED = 3 class ArmedStatus(Enum): """Area arming status: armed status""" DISARMED = '0' ARMED_AWAY = '1' ARMED_STAY = '2' ARMED_STAY_INSTANT = '3' ARMED_TO_NIGHT = '4' ARMED_TO_NIGHT_INSTANT = '5' ARMED_TO_VACATION = '6' class ArmUpState(Enum): """Area arming status: Ability to arm""" NOT_READY_TO_ARM = '0' READY_TO_ARM = '1' CAN_BE_FORCE_ARMED = '2' ARMED_AND_EXIT_TIMER_RUNNING = '3' FULLY_ARMED = '4' FORCE_ARMED = '5' ARMED_WITH_BYPASS = '6' class AlarmState(Enum): """Area arming status: Current alarm state""" NO_ALARM_ACTIVE = '0' ENTRANCE_DELAY_ACTIVE = '1' ALARM_ABORT_DELAY_ACTIVE = '2' FIRE_ALARM = '3' MEDICAL_ALARM = '4' POLICE_ALARM = '5' BURGLAR_ALARM = '6' AUX_1_ALARM = '7' AUX_2_ALARM = '8' AUX_3_ALARM = '9' AUX_4_ALARM = ':' CARBON_MONOXIDE_ALARM = ';' EMERGENCY_ALARM = '<' FREEZE_ALARM = '=' GAS_ALARM = '>' HEAT_ALARM = '?' WATER_ALARM = '@' FIRE_SUPERVISORY = 'A' VERIFY_FIRE = 'B' class ArmLevel(Enum): """Levels for Arm/Disarm al_encode messages""" DISARM = '0' ARMED_AWAY = '1' ARMED_STAY = '2' ARMED_STAY_INSTANT = '3' ARMED_NIGHT = '4' ARMED_NIGHT_INSTANT = '5' ARMED_VACATION = '6' ARM_TO_NEXT_AWAY_MODE = '7' ARM_TO_NEXT_STAY_MODE = '8' FORCE_ARM_TO_AWAY_MODE = '9' FORCE_ARM_TO_STAY_MODE = ':' class ZoneAlarmState(Enum): """Alarm state for a zone""" NO_ALARM = '0' BURGLAR_ENTRY_EXIT_1 = '1' BURGLAR_ENTRY_EXIT_2 = '2' BURGLAR_PERIMETER_INSTANT = '3' BURGLAR_INTERIOR = '4' BURGLAR_INTERIOR_FOLLOWER = '5' BURGLAR_INTERIOR_NIGHT = '6' BURGLAR_INTERIOR_NIGHT_DELAY = '7' BURGLAR_24_HOUR = '8' BURGLAR_BOX_TAMPER = '9' FIRE_ALARM = ':' FIRE_VERIFIED = ';' FIRE_SUPERVISORY = '<' AUX_ALARM_1 = '=' AUX_ALARM_2 = '>' KEYFOB = '?' # not used NON_ALARM = '@' # not used CARBON_MONOXIDE = 'A' EMERGENCY_ALARM = 'B' FREEZE_ALARM = 'C' GAS_ALARM = 'D' HEAT_ALARM = 'E' MEDICAL_ALARM = 'F' POLICE_ALARM = 'G' POLICE_NO_INDICATION = 'H' WATER_ALARM = 'I' class KeypadKeys(Enum): """Keys on the keypad.""" STAR = 11 POUND = 12 F1 = 13 F2 = 14 F3 = 15 F4 = 16 STAY = 17 EXIT = 18 CHIME = 19 BYPASS = 20 ELK = 21 DOWN = 22 UP = 23 RIGHT = 24 LEFT = 25 F6 = 26 F5 = 27 DATA_KEY_MODE = 28 class TextDescriptions(Enum): """Types of description strings that can be retrieved from the panel""" ZONE = (0, Max.ZONES.value) AREA = (1, Max.AREAS.value) USER = (2, Max.USERS.value) KEYPAD = (3, Max.KEYPADS.value) OUTPUT = (4, 64) TASK = (5, Max.TASKS.value) TELEPHONE = 6 LIGHT = (7, Max.LIGHTS.value) ALARM_DURATION = 8 SETTING = (9, Max.SETTINGS.value) COUNTER = (10, Max.COUNTERS.value) THERMOSTAT = (11, Max.THERMOSTATS.value) FUNCTION_KEY_1 = 12 FUNCTION_KEY_2 = 13 FUNCTION_KEY_3 = 14 FUNCTION_KEY_4 = 15 FUNCTION_KEY_5 = 16 FUNCTION_KEY_6 = 17 AUDIO_ZONE = 18 AUDIO_SOURCE = 19 # Map to convert message code to descriptive string MESSAGE_MAP = { 'AM': "Alarm memory update", 'AP': "Send ASCII String", 'AR': "Alarm Reporting to Ethernet", 'AS': "Arming status report data", 'AT': "Ethernet Test to IP", 'AZ': "Alarm by zone reply", 'CA': "Reply Touchscreen audio command", 'CC': "Control output change update", 'CD': "Outgoing Audio Equip Command", 'CR': "Custom value report data", 'CS': "Control output status report data", 'CU': "Change user code reply", 'CV': "Counter Value Data", 'DK': "Display KP LCD Data, not used", 'DS': "Lighting Poll Response", 'EE': "Entry/Exit Time Data", 'EM': "Email Trigger to M1XEP", 'IC': "Send invalid user code digits", 'IE': "Installer program exited", 'IP': "M1XSP Insteon Program", 'IR': "M1XSP Insteon Read", 'KA': "Keypad areas report data", 'KC': "Keypad key change update", 'KF': "Function key pressed data", 'LD': "Log data with index", 'LW': "Reply temperature data", 'NS': "Reply Source Name", 'NZ': "Reply Zone Name", 'PC': "PLC change update", 'PS': "PLC status report data", 'RE': "Reset Ethernet Module", 'RP': "ELKRP connected", 'RR': "Real Time Clock Data", 'SD': "Text string description report data", 'SS': "System Trouble Status data", 'ST': "Temperature report data", 'T2': "Reply Omnistat 2 data", 'TC': "Task change update", 'TR': "Thermostat data report", 'UA': "User code areas report data", 'VN': "Reply Version Number of M1", 'XB': "reserved by ELKRP", 'XK': "Request Ethernet test", 'ZB': "Zone bypass report data", 'ZC': "Zone change update", 'ZD': "Zone definition report data", 'ZP': "Zone partition report data", 'ZS': "Zone status report data", 'ZV': "Zone analog voltage data", 'a0': "Disarm", 'a1': "Arm to away", 'a2': "Arm to stay", 'a3': "Arm to stay instant", 'a4': "Arm to night", 'a5': "Arm to night instant", 'a6': "Arm to vacation", 'a7': "Arm, step to next Away Mode", 'a8': "Arm, step to next Stay Mode", 'a9': "Force Arm to Away Mode", 'a:': "Force Arm to Stay Mode", 'ar': "Alarm Reporting Acknowledge", 'as': "Request arming status", 'at': "Ethernet Test Acknowledge", 'az': "Alarm by zone request", 'ca': "Request Touchscreen audio command", 'cd': "Incoming Audio Equip Command", 'cf': "Control output OFF", 'cn': "Control output ON", 'cp': "Request ALL custom values", 'cr': "Request custom value", 'cs': "Control output status request", 'ct': "Control output TOGGLE", 'cu': "Change user code request", 'cv': "Request Counter value", 'cw': "Write custom value data", 'cx': "Write counter value", 'dm': "Display message", 'ds': "Lighting Poll Request", 'ip': "M1XSP Insteon Program", 'ir': "M1XSP Insteon Read", 'ka': "Request keypad areas", 'kc': "Request F Key illumination status", 'kf': "Request simulated function key press", 'ld': "Request log data, with index", 'le': "Write Log Data Entry", 'lw': "Request temperature data", 'pc': "Control any PLC device", 'pf': "Turn OFF PLC device", 'pn': "Turn ON PLC device", 'ps': "Request PLC status", 'pt': "Toggle PLC device", 'rr': "Request Real Time Clock Read", 'rs': "Used by Touchscreen", 'rw': "Real Time Clock Write", 'sd': "Request text string descriptions", 'sp': "Speak phrase", 'ss': "Request System Trouble Status", 'st': "Request temperature", 'sw': "Speak word", 't2': "Request Omnistat 2 data", 'tn': "Task activation", 'tr': "Request thermostat data", 'ts': "Set thermostat data", 'ua': "Request user code areas", 'vn': "request Version Number of M1", 'xk': "Reply from Ethernet test", 'zb': "Zone bypass request", 'zd': "Request zone definition data", 'zp': "Zone partition request", 'zs': "Zone status request", 'zv': "Request Zone analog voltage", } class ThermostatSetting(Enum): """Thermostat consts when setting""" MODE = 0 HOLD = 1 FAN = 2 GET_TEMPERATURE = 3 COOL_SETPOINT = 4 HEAT_SETPOINT = 5 class ThermostatMode(Enum): """Thermostat modes""" OFF = 0 HEAT = 1 COOL = 2 AUTO = 3 EMERGENCY_HEAT = 4 class ThermostatFan(Enum): """Thermostat fan""" AUTO = 0 ON = 1 class ThermostatHold(Enum): """Thermostat hold""" OFF = 0 ON = 1 class SettingFormat(Enum): """Types of values for settings.""" NUMBER = 0 TIMER = 1 TIME_OF_DAY = 2 class ElkRPStatus(Enum): """Elk remote programming status.""" DISCONNECTED = 0 CONNECTED = 1 INITIALIZING = 2 PK!elkm1_lib/counters.py"""Definition of an ElkM1 Custom Value""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import cx_encode, cv_encode class Counter(Element): """Class representing an Counter""" def __init__(self, index, elk): super().__init__(index, elk) self.value = None def set(self, value): """(Helper) Set counter to value""" self._elk.send(cx_encode(self._index, value)) # pylint: disable=R0903 class Counters(Elements): """Handling for multiple counters""" def __init__(self, elk): super().__init__(elk, Counter, Max.COUNTERS.value) elk.add_handler('CV', self._cv_handler) def sync(self): """Retrieve values from ElkM1 on demand""" self.get_descriptions(TextDescriptions.COUNTER.value) def _got_desc(self, descriptions): super()._got_desc(descriptions) # Only poll counters that have a name defined for counter in self.elements: if not counter.is_default_name(): self.elk.send(cv_encode(counter.index)) def _cv_handler(self, counter, value): self.elements[counter].setattr('value', value, True) PK!1&&elkm1_lib/elements.py""" Base of all the elements found on the Elk panel... Zone, Keypad, etc. """ from abc import abstractmethod from .message import sd_encode class Element: """Element class""" def __init__(self, index, elk): self._index = index self._elk = elk self._callbacks = [] self.name = self.default_name() self._changeset = {} @property def index(self): """Get the index, immutable once class created""" return self._index def add_callback(self, callback): """Callbacks when attribute of element changes""" self._callbacks.append(callback) def remove_callback(self, callback): """Callbacks when attribute of element changes""" if callback in self._callbacks: self._callbacks.remove(callback) def _call_callbacks(self): """Callbacks when attribute of element changes""" for callback in self._callbacks: callback(self, self._changeset) self._changeset = {} def setattr(self, attr, new_value, close_the_changeset=True): """If attribute value has changed then set it and call the callbacks""" existing_value = getattr(self, attr, None) if existing_value != new_value: setattr(self, attr, new_value) self._changeset[attr] = new_value if close_the_changeset and self._changeset: self._call_callbacks() def default_name(self, separator='-'): """Return a default name for based on class and index of element""" return self.__class__.__name__ + '{}{:03d}'.format( separator, self._index+1) def is_default_name(self): """Check if the name assigned is the default_name""" return self.name == self.default_name() def __str__(self): varlist = {k: v for (k, v) in vars(self).items() if not k.startswith('_') and k != 'name'}.items() varstr = ' '.join("%s:%s" % item for item in varlist) return "{} '{}' {}".format(self._index, self.name, varstr) def as_dict(self): """Package up the public attributes as a dict.""" attrs = vars(self) return {key: attrs[key] for key in attrs if not key.startswith('_')} class Elements: """Base for list of elements.""" def __init__(self, elk, class_, max_elements): self.elk = elk self.max_elements = max_elements self.elements = [class_(i, elk) for i in range(max_elements)] self.elk.add_sync_handler(self.sync) def __iter__(self): for element in self.elements: yield element def __getitem__(self, key): return self.elements[key] def _got_desc(self, descriptions): for element in self.elements: if element.index >= len(descriptions): break if descriptions[element.index] is not None: element.setattr('name', descriptions[element.index], True) def get_descriptions(self, description_type): """ Gets the descriptions for specified type. When complete the callback is called with a list of descriptions """ (desc_type, max_units) = description_type results = [None] * max_units self.elk._descriptions_in_progress[desc_type] = (max_units, results, self._got_desc) self.elk.send(sd_encode(desc_type=desc_type, unit=0)) @abstractmethod def sync(self): """Synchronize elements""" pass PK!e{elkm1_lib/elk.py"""Master class that combines all ElkM1 pieces together.""" import asyncio from functools import partial import logging from importlib import import_module import serial_asyncio from .message import MessageDecode, sd_encode from .proto import Connection from .util import parse_url, url_scheme_is_secure LOG = logging.getLogger(__name__) class Elk: """Represents all the components on an Elk panel.""" def __init__(self, config, loop=None): """Initialize a new Elk instance.""" self.loop = loop if loop else asyncio.get_event_loop() self._config = config self._conn = None self._transport = None self.connection_lost_callbk = None self._connection_retry_timer = 1 self._message_decode = MessageDecode() self._sync_handlers = [] self._descriptions_in_progress = {} self._heartbeat = None self.add_handler('XK', self._xk_handler) self.add_handler('SD', self._sd_handler) # Setup for all the types of elements tracked if 'element_list' in config: self.element_list = config['element_list'] else: self.element_list = ['panel', 'zones', 'lights', 'areas', 'tasks', 'keypads', 'outputs', 'thermostats', 'counters', 'settings', 'users'] for element in self.element_list: self._create_element(element) def _create_element(self, element): module = import_module('elkm1_lib.'+element) class_ = getattr(module, element.capitalize()) setattr(self, element, class_(self)) async def _connect(self, connection_lost_callbk=None): """Asyncio connection to Elk.""" self.connection_lost_callbk = connection_lost_callbk url = self._config['url'] LOG.info("Connecting to ElkM1 at %s", url) scheme, dest, param, ssl_context = parse_url(url) conn = partial(Connection, self.loop, self._connected, self._disconnected, self._got_data, self._timeout) try: if scheme == 'serial': await serial_asyncio.create_serial_connection( self.loop, conn, dest, baudrate=param) else: await asyncio.wait_for(self.loop.create_connection( conn, host=dest, port=param, ssl=ssl_context), timeout=30) except (ValueError, OSError, asyncio.TimeoutError) as err: LOG.warning("Could not connect to ElkM1 (%s). Retrying in %d seconds", err, self._connection_retry_timer) self.loop.call_later(self._connection_retry_timer, self.connect) self._connection_retry_timer = 2 * self._connection_retry_timer \ if self._connection_retry_timer < 32 else 60 def _connected(self, transport, conn): """Login and sync the ElkM1 panel to memory.""" LOG.info("Connected to ElkM1") self._conn = conn self._transport = transport self._connection_retry_timer = 1 if url_scheme_is_secure(self._config['url']): self._conn.write_data(self._config['userid'], raw=True) self._conn.write_data(self._config['password'], raw=True) self.call_sync_handlers() if not self._config['url'].startswith('serial://'): self._heartbeat = self.loop.call_later(120, self._reset_connection) def _reset_connection(self): LOG.warning("ElkM1 connection heartbeat timed out, disconnecting") self._transport.close() self._heartbeat = None # pylint: disable=unused-argument def _xk_handler(self, real_time_clock): if not self._heartbeat: return self._heartbeat.cancel() self._heartbeat = self.loop.call_later(120, self._reset_connection) def _disconnected(self): LOG.warning("ElkM1 at %s disconnected", self._config['url']) self._conn = None self.loop.call_later(self._connection_retry_timer, self.connect) if self._heartbeat: self._heartbeat.cancel() self._heartbeat = None def add_handler(self, msg_type, handler): self._message_decode.add_handler(msg_type, handler) def _got_data(self, data): # pylint: disable=no-self-use LOG.debug("got_data '%s'", data) try: self._message_decode.decode(data) except (ValueError, AttributeError) as err: LOG.debug(err) def _timeout(self, msg_code): self._message_decode.timeout_handler(msg_code) def add_sync_handler(self, sync_handler): """Register a fn that synchronizes part of the panel.""" self._sync_handlers.append(sync_handler) def call_sync_handlers(self): """Invoke the synchronization handlers.""" LOG.debug("Synchronizing panel...") for sync_handler in self._sync_handlers: sync_handler() # pylint: disable=unused-argument def _sd_handler(self, desc_type, unit, desc, show_on_keypad): """Text description""" if desc_type not in self._descriptions_in_progress: LOG.debug("Text description response ignored for " + str(desc_type)) return (max_units, results, callback) = self._descriptions_in_progress[desc_type] if unit < 0 or unit >= max_units: callback(results) del self._descriptions_in_progress[desc_type] return results[unit] = desc self.send(sd_encode(desc_type=desc_type, unit=unit+1)) def is_connected(self): """Status of connection to Elk.""" return self._conn is not None def connect(self): """Connect to the panel""" asyncio.ensure_future(self._connect()) def run(self): """Enter the asyncio loop.""" self.loop.run_forever() def send(self, msg): """Send a message to Elk panel.""" if self._conn: self._conn.write_data(msg.message, msg.response_command) def pause(self): """Pause the connection from sending/receiving.""" if self._conn: self._conn.pause() def resume(self): """Restart the connection from sending/receiving.""" if self._conn: self._conn.resume() PK!4elkm1_lib/keypads.py"""Definition of an ElkM1 Keypad.""" import pytz import datetime as dt from .const import Max, TextDescriptions from .elements import Element, Elements from .message import ka_encode class Keypad(Element): """Class representing an Keypad""" def __init__(self, index, elk): super().__init__(index, elk) self.area = -1 self.temperature = -40 self.last_user_time = dt.datetime.now(pytz.UTC) self.last_user = -1 self.code = '' self.last_keypress = None class Keypads(Elements): """Handling for multiple areas""" def __init__(self, elk): super().__init__(elk, Keypad, Max.KEYPADS.value) elk.add_handler('IC', self._ic_handler) elk.add_handler('KA', self._ka_handler) elk.add_handler('KC', self._kc_handler) elk.add_handler('LW', self._lw_handler) elk.add_handler('ST', self._st_handler) def sync(self): """Retrieve areas from ElkM1""" self.elk.send(ka_encode()) self.get_descriptions(TextDescriptions.KEYPAD.value) # pylint: disable=unused-argument def _ic_handler(self, code, user, keypad): keypad_ = self.elements[keypad] # By setting a time this will force the IC change to always be reported keypad_.setattr('last_user_time', dt.datetime.now(pytz.UTC), False) # If user is negative then invalid code entered keypad_.setattr('code', code if user < 0 else '****', False) keypad_.setattr('last_user', user, True) def _ka_handler(self, keypad_areas): for keypad in self.elements: if keypad_areas[keypad.index] >= 0: keypad.setattr('area', keypad_areas[keypad.index], True) def _kc_handler(self, keypad, key): if key > 0: self.elements[keypad].setattr('last_keypress', key, True) # pylint: disable=unused-argument def _lw_handler(self, keypad_temps, zone_temps): for keypad in self.elements: if keypad_temps[keypad.index] > -40: keypad.setattr('temperature', keypad_temps[keypad.index], True) def _st_handler(self, group, device, temperature): if group == 1: self.elements[device].setattr('temperature', temperature, True) PK!UIelkm1_lib/lights.py"""Definition of an ElkM1 Light""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import ps_encode, pc_encode, pf_encode, pn_encode, pt_encode class Light(Element): """Class representing a Light""" def __init__(self, index, elk): super().__init__(index, elk) self.status = 0 def level(self, level, time=0): """(Helper) Set light to specified level""" if level <= 0: self._elk.send(pf_encode(self._index)) elif level >= 98: self._elk.send(pn_encode(self._index)) else: self._elk.send(pc_encode(self._index, 9, level, time)) def toggle(self): """(Helper) Toggle light""" self._elk.send(pt_encode(self._index)) class Lights(Elements): """Handling for multiple lights""" def __init__(self, elk): super().__init__(elk, Light, Max.LIGHTS.value) elk.add_handler('PC', self._pc_handler) elk.add_handler('PS', self._ps_handler) def sync(self): """Retrieve lights from ElkM1""" for i in range(4): self.elk.send(ps_encode(i)) self.get_descriptions(TextDescriptions.LIGHT.value) # pylint: disable=unused-argument def _pc_handler(self, housecode, index, light_level): self.elements[index].setattr('status', light_level, True) def _ps_handler(self, bank, statuses): for i in range(bank*64, (bank+1)*64): self.elements[i].setattr('status', statuses[i-bank*64], True) PK!v::elkm1_lib/message.py""" ElkM1 message encode/decode. Message format: LLMM00CC, where: LL - length in ASCII hex MM - message code - message contents; length varies by message code 00 - constant two characters reserved (almost all messages) CC - checksum The panel numbers zones, keypads, etc. in base 1. The interface of this module refers to them using base 0. Conversion to base 1 is done on sending and conversion to base 0 is done on receiving. So, base 0/1 conversion is encapsulated in this module. """ from collections import namedtuple import re from .const import Max MessageEncode = namedtuple('MessageEncode', ['message', 'response_command']) class MessageDecode: """Message decode and dispatcher.""" def __init__(self): """Initialize a new Message instance.""" self._handlers = {} def add_handler(self, message_type, handler): """Manage callbacks for message handlers.""" if message_type not in self._handlers: self._handlers[message_type] = [] if handler not in self._handlers[message_type]: self._handlers[message_type].append(handler) def decode(self, msg): """Decode an Elk message by passing to appropriate decoder""" _check_message_valid(msg) cmd = msg[2:4] decoder = getattr(self, '_{}_decode'.format(cmd.lower()), None) if not decoder: cmd = 'unknown' decoder = self._unknown_decode decoded_msg = decoder(msg) for handler in self._handlers.get(cmd, []): handler(**decoded_msg) def _am_decode(self, msg): """AM: Alarm memory by area report.""" return {'alarm_memory': [x for x in msg[4:4+Max.AREAS.value]]} def _as_decode(self, msg): """AS: Arming status report.""" return {'armed_statuses': [x for x in msg[4:12]], 'arm_up_states': [x for x in msg[12:20]], 'alarm_states': [x for x in msg[20:28]]} def _az_decode(self, msg): """AZ: Alarm by zone report.""" return {'alarm_status': [x for x in msg[4:4+Max.ZONES.value]]} def _cr_one_custom_value_decode(self, index, part): value = int(part[0:5]) value_format = int(part[5]) if value_format == 2: value = ((value >> 8) & 0xff, value & 0xff) return {'index': index, 'value': value, 'value_format': value_format} def _cr_decode(self, msg): """CR: Custom values""" if int(msg[4:6]) > 0: index = int(msg[4:6])-1 return {'values': [self._cr_one_custom_value_decode(index, msg[6:12])]} else: part = 6 ret = [] for i in range(Max.SETTINGS.value): ret.append(self._cr_one_custom_value_decode(i, msg[part:part+6])) part += 6 return {'values': ret} def _cc_decode(self, msg): """CC: Output status for single output.""" return {'output': int(msg[4:7])-1, 'output_status': msg[7] == '1'} def _cs_decode(self, msg): """CS: Output status for all outputs.""" output_status = [x == '1' for x in msg[4:4+Max.OUTPUTS.value]] return {'output_status': output_status} def _cv_decode(self, msg): """CV: Counter value.""" return {'counter': int(msg[4:6])-1, 'value': int(msg[6:11])} def _ee_decode(self, msg): """EE: Entry/exit timer report.""" return {'area': int(msg[4:5])-1, 'is_exit': msg[5:6] == '0', 'timer1': int(msg[6:9]), 'timer2': int(msg[9:12]), 'armed_status': msg[12:13]} def _ic_decode(self, msg): """IC: Send Valid Or Invalid User Code Format.""" code = msg[4:16] if re.match(r'(0\d){6}', code): code = re.sub(r'0(\d)', r'\1', code) return {'code': code, 'user': int(msg[16:19])-1, 'keypad': int(msg[19:21])-1} def _ie_decode(self, _msg): """IE: Installer mode exited.""" return {} def _ka_decode(self, msg): """KA: Keypad areas for all keypads.""" return {'keypad_areas': [ord(x)-0x31 for x in msg[4:4+Max.KEYPADS.value]]} def _kc_decode(self, msg): """KC: Keypad key change.""" return {'keypad': int(msg[4:6])-1, 'key': int(msg[6:8])} def _lw_decode(self, msg): """LW: temperatures from all keypads and zones 1-16.""" keypad_temps = [] zone_temps = [] for i in range(16): keypad_temps.append(int(msg[4+3*i:7+3*i]) - 40) zone_temps.append(int(msg[52+3*i:55+3*i]) - 60) return {'keypad_temps': keypad_temps, 'zone_temps': zone_temps} def _pc_decode(self, msg): """PC: PLC (lighting) change.""" housecode = msg[4:7] return {'housecode': housecode, 'index': housecode_to_index(housecode), 'light_level': int(msg[7:9])} def _ps_decode(self, msg): """PS: PLC (lighting) status.""" return {'bank': ord(msg[4]) - 0x30, 'statuses': [ord(x)-0x30 for x in msg[5:69]]} def _rp_decode(self, msg): """RP: Remote programming status.""" return {'remote_programming_status': int(msg[4:6])} def _sd_decode(self, msg): """SD: Description text.""" desc_ch1 = msg[9] show_on_keypad = ord(desc_ch1) >= 0x80 if show_on_keypad: desc_ch1 = chr(ord(desc_ch1) & 0x7f) return {'desc_type': int(msg[4:6]), 'unit': int(msg[6:9])-1, 'desc': (desc_ch1+msg[10:25]).rstrip(), 'show_on_keypad': show_on_keypad} def _ss_decode(self, msg): """SS: System status.""" return {'system_trouble_status': msg[4:-2]} def _st_decode(self, msg): """ST: Temperature update.""" group = int(msg[4:5]) temperature = int(msg[7:10]) if group == 0: temperature -= 60 elif group == 1: temperature -= 40 return {'group': group, 'device': int(msg[5:7])-1, 'temperature': temperature} def _tc_decode(self, msg): """TC: Task change.""" return {'task': int(msg[4:7])-1} def _tr_decode(self, msg): """TR: Thermostat data response.""" return {'thermostat_index': int(msg[4:6])-1, 'mode': int(msg[6]), 'hold': msg[7] == '1', 'fan': int(msg[8]), 'current_temp': int(msg[9:11]), 'heat_setpoint': int(msg[11:13]), 'cool_setpoint': int(msg[13:15]), 'humidity': int(msg[15:17])} def _vn_decode(self, msg): """VN: Version information.""" elkm1_version = "{}.{}.{}".format(int(msg[4:6], 16), int(msg[6:8], 16), int(msg[8:10], 16)) xep_version = "{}.{}.{}".format(int(msg[10:12], 16), int(msg[12:14], 16), int(msg[14:16], 16)) return {'elkm1_version': elkm1_version, 'xep_version': xep_version} def _xk_decode(self, msg): """XK: Ethernet Test.""" return {'real_time_clock': msg[4:20]} def _zb_decode(self, msg): """ZB: Zone bypass report.""" return {'zone_number': int(msg[4:7])-1, 'zone_bypassed': msg[7] == '1'} def _zc_decode(self, msg): """ZC: Zone Change.""" status = _status_decode(int(msg[7:8], 16)) return {'zone_number': int(msg[4:7])-1, 'zone_status': status} def _zd_decode(self, msg): """ZD: Zone definitions.""" zone_definitions = [ord(x)-0x30 for x in msg[4:4+Max.ZONES.value]] return {'zone_definitions': zone_definitions} def _zp_decode(self, msg): """ZP: Zone partitions.""" zone_partitions = [ord(x)-0x31 for x in msg[4:4+Max.ZONES.value]] return {'zone_partitions': zone_partitions} def _zs_decode(self, msg): """ZS: Zone statuses.""" status = [_status_decode(int(x, 16)) for x in msg[4:4+Max.ZONES.value]] return {'zone_statuses': status} def _zv_decode(self, msg): """ZV: Zone voltage.""" return {'zone_number': int(msg[4:7])-1, 'zone_voltage': int(msg[7:10])/10} def _unknown_decode(self, msg): """Generic handler called when no specific handler exists""" return {'msg_code': msg[2:4], 'data': msg[4:-2]} def timeout_handler(self, msg_code): """Called directly when a timeout happens when response not received""" for handler in self._handlers.get('timeout', []): handler({'msg_code': msg_code}) def housecode_to_index(housecode): """Convert a X10 housecode to a zero-based index""" match = re.search(r'^([A-P])(\d{1,2})$', housecode.upper()) if match: house_index = int(match.group(2)) if 1 <= house_index <= 16: return (ord(match.group(1)) - ord('A')) * 16 + house_index - 1 raise ValueError("Invalid X10 housecode: %s" % housecode) def index_to_housecode(index): """Convert a zero-based index to a X10 housecode.""" if index < 0 or index > 255: raise ValueError quotient, remainder = divmod(index, 16) return chr(quotient+ord('A')) + '{:02d}'.format(remainder+1) def get_elk_command(line): """Return the 2 character command in the message.""" if len(line) < 4: return '' return line[2:4] def _status_decode(status): """Decode a 1 byte status into logical and physical statuses.""" logical_status = (status & 0b00001100) >> 2 physical_status = status & 0b00000011 return (logical_status, physical_status) def _check_checksum(msg): """Ensure checksum in message is good.""" checksum = int(msg[-2:], 16) for char in msg[:-2]: checksum += ord(char) if (checksum % 256) != 0: raise ValueError("Elk message checksum invalid") def _check_message_valid(msg): """Check packet length valid and that checksum is good.""" try: if int(msg[:2], 16) != (len(msg) - 2): raise ValueError("Elk message length incorrect") _check_checksum(msg) except IndexError: raise ValueError("Elk message length incorrect") def al_encode(arm_mode, area, user_code): """al: Arm system. Note in 'al' the 'l' can vary""" return MessageEncode('0Da{}{:1d}{:06d}00'.format(arm_mode, area+1, user_code), 'AS') def as_encode(): """as: Get area status.""" return MessageEncode('06as00', 'AS') def az_encode(): """az: Get alarm by zone.""" return MessageEncode('06az00', 'AZ') def cf_encode(output): """cf: Turn off output.""" return MessageEncode('09cf{:03d}00'.format(output+1), None) def ct_encode(output): """ct: Toggle output.""" return MessageEncode('09ct{:03d}00'.format(output+1), None) def cn_encode(output, time): """cn: Turn on output.""" return MessageEncode('0Ecn{:03d}{:05d}00'.format(output+1, time), None) def cs_encode(): """cs: Get all output status.""" return MessageEncode('06cs00', 'CS') def cp_encode(): """cp: Get ALL custom values.""" return MessageEncode('06cp00', 'CR') def cr_encode(index): """cr: Get a custom value.""" return MessageEncode('08cr{cv:02d}00'.format(cv=index+1), 'CR') def cw_encode(index, value, value_format): """cw: Write a custom value.""" if value_format == 2: value = value[0] << 8 + value[1] return MessageEncode('0Dcw{:02d}{:05d}00'.format(index+1, value), None) def cv_encode(counter): """cv: Get counter.""" return MessageEncode('08cv{c:02d}00'.format(c=counter+1), 'CV') def cx_encode(counter, value): """cx: Change counter value.""" return MessageEncode('0Dcx{:02d}{:05d}00'.format(counter+1, value), 'CV') # pylint: disable=too-many-arguments def dm_encode(keypad_area, clear, beep, timeout, line1, line2): """dm: Display message on keypad.""" return MessageEncode( '2Edm{:1d}{:1d}{:1d}{:05d}{:^<16.16}{:^<16.16}00' .format(keypad_area+1, clear, beep, timeout, line1, line2), None ) def ka_encode(): """ka: Get keypad areas.""" return MessageEncode('06ka00', 'KA') def lw_encode(): """lw: Get temperature data.""" return MessageEncode('06lw00', 'LW') def pc_encode(index, function_code, extended_code, time): """pc: Control any PLC device.""" return MessageEncode('11pc{hc}{fc:02d}{ec:02d}{time:04d}00'. format(hc=index_to_housecode(index), fc=function_code, ec=extended_code, time=time), None) def pf_encode(index): """pf: Turn off light.""" return MessageEncode('09pf{}00'.format(index_to_housecode(index)), None) def pn_encode(index): """pn: Turn on light.""" return MessageEncode('09pn{}00'.format(index_to_housecode(index)), None) def ps_encode(bank): """ps: Get lighting status.""" return MessageEncode('07ps{:1d}00'.format(bank), 'PS') def pt_encode(index): """pt: Toggle light.""" return MessageEncode('09pt{}00'.format(index_to_housecode(index)), None) def sd_encode(desc_type, unit): """sd: Get description.""" return MessageEncode('0Bsd{:02d}{:03d}00'.format(desc_type, unit+1), 'SD') def sp_encode(phrase): """sp: Speak phrase.""" return MessageEncode('09sp{:03d}00'.format(phrase), None) def ss_encode(): """ss: Get system trouble status.""" return MessageEncode('06ss00', 'SS') def sw_encode(word): """sp: Speak word.""" return MessageEncode('09sw{:03d}00'.format(word), None) def tn_encode(task): """tn: Activate task.""" return MessageEncode('09tn{:03d}00'.format(task+1), None) def tr_encode(thermostat): """tr: Request thermostat data.""" return MessageEncode('08tr{:02d}00'.format(thermostat+1), None) def ts_encode(thermostat, value, element): """ts: Set thermostat data.""" return MessageEncode('0Bts{:02d}{:02d}{:1d}00'.format( thermostat+1, value, element), None) def vn_encode(): """zd: Get panel software version information.""" return MessageEncode('06vn00', 'VN') def zb_encode(zone, area, user_code): """zb: Zone bypass. Zone < 0 unbypass all; Zone > Max bypass all.""" if zone < 0: zone = 0 elif zone > Max.ZONES.value: zone = 999 else: zone += 1 return MessageEncode('10zb{zone:03d}{area:1d}{code:06d}00'.format( zone=zone, area=area+1, code=user_code), 'ZB') def zd_encode(): """zd: Get zone definitions""" return MessageEncode('06zd00', 'ZD') def zp_encode(): """zp: Get zone partitions""" return MessageEncode('06zp00', 'ZP') def zs_encode(): """zs: Get zone statuses""" return MessageEncode('06zs00', 'ZS') def zt_encode(zone): """zt: Trigger zone.""" return MessageEncode('09zt{zone:03d}00'.format(zone=zone+1), None) def zv_encode(zone): """zv: Get zone voltage""" return MessageEncode('09zv{zone:03d}00'.format(zone=zone+1), 'ZV') PK!8Z\bbelkm1_lib/outputs.py"""Definition of an ElkM1 Output""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import cs_encode, cf_encode, cn_encode, ct_encode class Output(Element): """Class representing an Output""" def __init__(self, index, elk): super().__init__(index, elk) self.output_on = False def turn_off(self): """(Helper) Turn of an output""" self._elk.send(cf_encode(self._index)) def turn_on(self, time): """(Helper) Turn on an output""" self._elk.send(cn_encode(self._index, time)) def toggle(self): """(Helper) Toggle an output""" self._elk.send(ct_encode(self._index)) class Outputs(Elements): """Handling for multiple areas""" def __init__(self, elk): super().__init__(elk, Output, Max.OUTPUTS.value) elk.add_handler('CC', self._cc_handler) elk.add_handler('CS', self._cs_handler) def sync(self): """Retrieve areas from ElkM1""" self.elk.send(cs_encode()) self.get_descriptions(TextDescriptions.OUTPUT.value) def _cc_handler(self, output, output_status): self.elements[output].setattr('output_on', output_status, True) def _cs_handler(self, output_status): for output in self.elements: output.setattr('output_on', output_status[output.index], True) PK!d  elkm1_lib/panel.py"""Definition of an ElkM1 Area""" from .const import ElkRPStatus from .elements import Element from .message import vn_encode, lw_encode, sw_encode, sp_encode, ss_encode class Panel(Element): """Class representing an Area""" def __init__(self, elk): super().__init__(0, elk) self.real_time_clock = None self.elkm1_version = None self.xep_version = None self.remote_programming_status = 0 self.system_trouble_status = '' self.setattr('name', 'ElkM1', True) self._elk.add_sync_handler(self.sync) def sync(self): """Retrieve panel information from ElkM1""" self._elk.add_handler('VN', self._vn_handler) self._elk.add_handler('XK', self._xk_handler) self._elk.add_handler('RP', self._rp_handler) self._elk.add_handler('IE', self._elk.call_sync_handlers) self._elk.add_handler('SS', self._ss_handler) self._elk.send(vn_encode()) self._elk.send(lw_encode()) self._elk.send(ss_encode()) def speak_word(self, word): """(Helper) Speak word.""" self._elk.send(sw_encode(word)) def speak_phrase(self, phrase): """(Helper) Speak phrase.""" self._elk.send(sp_encode(phrase)) def _vn_handler(self, elkm1_version, xep_version): self.setattr('elkm1_version', elkm1_version, False) self.setattr('xep_version', xep_version, True) def _xk_handler(self, real_time_clock): self.setattr('real_time_clock', real_time_clock, True) def _ss_handler(self, system_trouble_status): def _get_status(index, trouble, zone_encoded=False): if system_trouble_status[index] != '0': if zone_encoded: zone = ord(system_trouble_status[index]) - 0x30 statuses.append('{} zone {}'.format(trouble, zone)) else: statuses.append(trouble) statuses = [] _get_status(0, "AC Fail") _get_status(1, "Box Tamper", True) _get_status(2, "Fail To Communicate") _get_status(3, "EEProm Memory Error") _get_status(4, "Low Battery Control") _get_status(5, "Transmitter Low Battery", True) _get_status(6, "Over Current") _get_status(7, "Telephone Fault") _get_status(9, "Output 2") _get_status(10, "Missing Keypad") _get_status(11, "Zone Expander") _get_status(12, "Output Expander") _get_status(14, "ELKRP Remote Access") _get_status(16, "Common Area Not Armed") _get_status(17, "Flash Memory Error") _get_status(18, "Security Alert", True) _get_status(19, "Serial Port Expander") _get_status(20, "Lost Transmitter", True) _get_status(21, "GE Smoke CleanMe") _get_status(22, "Ethernet") _get_status(31, "Display Message In Keypad Line 1") _get_status(32, "Display Message In Keypad Line 2") _get_status(33, "Fire", True) self.setattr('system_trouble_status', ', '.join(statuses), True) def _rp_handler(self, remote_programming_status): if remote_programming_status == ElkRPStatus.DISCONNECTED.value: self._elk.resume() else: self._elk.pause() self.setattr('remote_programming_status', remote_programming_status, True) PK!6/E elkm1_lib/proto.py"""Async IO.""" import asyncio from functools import reduce import logging from .message import get_elk_command LOG = logging.getLogger(__name__) class Connection(asyncio.Protocol): """asyncio Protocol with line parsing and queuing writes""" # pylint: disable=too-many-instance-attributes def __init__(self, loop, connected, disconnected, got_data, timeout): self.loop = loop self._connected_callback = connected self._disconnected_callback = disconnected self._got_data_callback = got_data self._timeout = timeout self._transport = None self._waiting_for_response = None self._timeout_task = None self._queued_writes = [] self._buffer = '' self._paused = False def connection_made(self, transport): LOG.debug("connected callback") self._transport = transport self._connected_callback(transport, self) def connection_lost(self, exc): LOG.debug("disconnected callback") self._transport = None self._cleanup() self._disconnected_callback() def _cleanup(self): self._cancel_timer() self._waiting_for_response = None self._queued_writes = [] self._buffer = '' def pause(self): """Pause the connection from sending/receiving.""" self._cleanup() self._paused = True def resume(self): """Restart the connection from sending/receiving.""" self._paused = False def _response_required_timeout(self): self._timeout(self._waiting_for_response) self._timeout_task = None self._waiting_for_response = None self._process_write_queue() def _cancel_timer(self): if self._timeout_task: self._timeout_task.cancel() self._timeout_task = None def data_received(self, data): self._buffer += data.decode('ISO-8859-1') while "\r\n" in self._buffer: line, self._buffer = self._buffer.split("\r\n", 1) if get_elk_command(line) == self._waiting_for_response: self._waiting_for_response = None self._cancel_timer() self._got_data_callback(line) self._process_write_queue() def _process_write_queue(self): while self._queued_writes and not self._waiting_for_response: to_write = self._queued_writes.pop(0) self.write_data(to_write[0], to_write[1], timeout=to_write[2]) def write_data(self, data, response_required=None, timeout=5.0, raw=False): """Write data on the asyncio Protocol""" if self._transport is None: return if self._paused: return if self._waiting_for_response: LOG.debug("queueing write %s", data) self._queued_writes.append((data, response_required, timeout)) return if response_required: self._waiting_for_response = response_required if timeout > 0: self._timeout_task = self.loop.call_later( timeout, self._response_required_timeout) if not raw: cksum = 256 - reduce(lambda x, y: x+y, map(ord, data)) % 256 data = data + '{:02X}'.format(cksum) if int(data[0:2], 16) != len(data)-2: LOG.debug("message length wrong: %s", data) LOG.debug("write_data '%s'", data) self._transport.write((data + '\r\n').encode()) PK!=P0MZZelkm1_lib/settings.py"""Definition of an ElkM1 Custom Value""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import cp_encode, cw_encode class Setting(Element): """Class representing an Custom Value""" def __init__(self, index, elk): super().__init__(index, elk) self.value_format = 0 self.value = None def set(self, value): """(Helper) Set custom value.""" self._elk.send(cw_encode(self._index, value, self.value_format)) class Settings(Elements): """Handling for multiple custom values""" def __init__(self, elk): super().__init__(elk, Setting, Max.SETTINGS.value) elk.add_handler('CR', self._cr_handler) def sync(self): """Retrieve custom values from ElkM1""" self.elk.send(cp_encode()) self.get_descriptions(TextDescriptions.SETTING.value) def _cr_handler(self, values): for value in values: custom_value = self.elements[value['index']] custom_value.value_format = value['value_format'] custom_value.value = value['value'] PK!i Gelkm1_lib/tasks.py"""Definition of an ElkM1 Task""" from time import time from .const import Max, TextDescriptions from .elements import Element, Elements from .message import tn_encode class Task(Element): """Class representing an Task""" def __init__(self, index, elk): # pylint: disable=useless-super-delegation super().__init__(index, elk) self.last_change = None def activate(self): """(Helper) Activate task""" self._elk.send(tn_encode(self._index)) class Tasks(Elements): """Handling for multiple tasks""" def __init__(self, elk): super().__init__(elk, Task, Max.TASKS.value) elk.add_handler('TC', self._tc_handler) def sync(self): """Retrieve tasks from ElkM1""" self.get_descriptions(TextDescriptions.TASK.value) def _tc_handler(self, task): self.elements[task].setattr('last_change', time(), True) PK!v\\elkm1_lib/thermostats.py"""Definition of an ElkM1 Thermostat""" from .const import Max, TextDescriptions from .elements import Element, Elements from .message import tr_encode, ts_encode class Thermostat(Element): """Class representing an Thermostat""" def __init__(self, index, elk): # pylint: disable=useless-super-delegation super().__init__(index, elk) self.mode = 0 self.hold = False self.fan = 0 self.current_temp = 0 self.heat_setpoint = 0 self.cool_setpoint = 0 self.humidity = 0 def set(self, element_to_set, value): """(Helper) Set thermostat""" self._elk.send(ts_encode(self.index, value, element_to_set)) class Thermostats(Elements): """Handling for multiple areas""" def __init__(self, elk): super().__init__(elk, Thermostat, Max.THERMOSTATS.value) elk.add_handler('ST', self._st_handler) elk.add_handler('TR', self._tr_handler) def sync(self): """Retrieve areas from ElkM1""" self.get_descriptions(TextDescriptions.THERMOSTAT.value) def _got_desc(self, descriptions): super()._got_desc(descriptions) # Only poll thermostats that have a name defined for thermostat in self.elements: if not thermostat.is_default_name(): self.elk.send(tr_encode(thermostat.index)) def _st_handler(self, group, device, temperature): if group == 2: self.elements[device].setattr('current_temp', temperature, True) # pylint: disable=too-many-arguments def _tr_handler(self, thermostat_index, mode, hold, fan, current_temp, heat_setpoint, cool_setpoint, humidity): thermostat = self.elements[thermostat_index] thermostat.setattr('mode', mode, False) thermostat.setattr('hold', hold, False) thermostat.setattr('fan', fan, False) thermostat.setattr('current_temp', current_temp, False) thermostat.setattr('heat_setpoint', heat_setpoint, False) thermostat.setattr('cool_setpoint', cool_setpoint, False) thermostat.setattr('humidity', humidity, True) PK!]__elkm1_lib/users.py"""Definition of an ElkM1 User""" import re from .const import Max, TextDescriptions from .elements import Element, Elements class User(Element): """Class representing an User""" def __init__(self, index, elk): # pylint: disable=useless-super-delegation super().__init__(index, elk) class Users(Elements): """Handling for multiple areas""" def __init__(self, elk): super().__init__(elk, User, Max.USERS.value) def sync(self): """Retrieve areas from ElkM1""" self.get_descriptions(TextDescriptions.USER.value) def _got_desc(self, descriptions): super()._got_desc(descriptions) # Elk returns all user descriptions rather than just configured users for user in self.elements: if re.match(r'^USER \d\d\d', user.name): user.name = user.default_name() PK!8elkm1_lib/util.py"""Utility functions""" import logging import ssl LOG = logging.getLogger(__name__) def url_scheme_is_secure(url): """Check if the URL is one that requires SSL/TLS.""" scheme, _dest = url.split('://') return scheme == 'elks' def parse_url(url): """Parse a Elk connection string """ scheme, dest = url.split('://') host = None ssl_context = None if scheme == 'elk': host, port = dest.split(':') if ':' in dest else (dest, 2101) elif scheme == 'elks': host, port = dest.split(':') if ':' in dest else (dest, 2601) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ssl_context.verify_mode = ssl.CERT_NONE elif scheme == 'serial': host, port = dest.split(':') if ':' in dest else (dest, 115200) else: raise ValueError("Invalid scheme '%s'" % scheme) return (scheme, host, int(port), ssl_context) def pretty_const(value): """Make a constant pretty for printing in GUI""" words = value.split('_') pretty = words[0].capitalize() for word in words[1:]: pretty += ' ' + word.lower() return pretty def username(elk, user_number): """Return name of user.""" if user_number >= 0 and user_number < elk.users.max_elements: return elk.users[user_number].name if user_number == 201: return "*Program*" if user_number == 202: return "*Elk RP*" if user_number == 203: return "*Quick arm*" return "" PK!elkm1_lib/zones.py"""Definition of an ElkM1 Zone""" from .const import Max, TextDescriptions, ZoneType, \ ZoneLogicalStatus, ZonePhysicalStatus from .elements import Element, Elements from .message import az_encode, zd_encode, zp_encode, zs_encode, zt_encode class Zone(Element): """Class representing a Zone""" def __init__(self, index, elk): super().__init__(index, elk) self.definition = 0 self.area = -1 self.bypassed = False self.logical_status = 0 self.physical_status = 0 self.voltage = 0 self.temperature = -60 self.triggered_alarm = False def __str__(self): return ("{indx:d} '{name}' type:{typ} status:{logl}/{phys}" " area:{area:d} trig:{trig} v:{volt} temp:{temp}").format( name=self.name, indx=self._index, typ=ZoneType(self.definition).name, logl=ZoneLogicalStatus(self.logical_status).name, trig=self.triggered_alarm, volt=self.voltage, area=self.area, temp=self.temperature, phys=ZonePhysicalStatus(self.physical_status).name) def zone_trigger(self): """(Helper) Trigger zone.""" self._elk.send(zt_encode(self._index)) class Zones(Elements): """Handling for multiple zones""" def __init__(self, elk): super().__init__(elk, Zone, Max.ZONES.value) elk.add_handler('AZ', self._az_handler) elk.add_handler('LW', self._lw_handler) elk.add_handler('ST', self._st_handler) elk.add_handler('ZB', self._zb_handler) elk.add_handler('ZC', self._zc_handler) elk.add_handler('ZD', self._zd_handler) elk.add_handler('ZP', self._zp_handler) elk.add_handler('ZS', self._zs_handler) elk.add_handler('ZV', self._zv_handler) def sync(self): """Retrieve zones from ElkM1""" self.elk.send(az_encode()) self.elk.send(zd_encode()) self.elk.send(zp_encode()) self.elk.send(zs_encode()) self.get_descriptions(TextDescriptions.ZONE.value) def _az_handler(self, alarm_status): for zone in self.elements: zone.setattr('triggered_alarm', alarm_status[zone.index] != '0', True) # pylint: disable=unused-argument def _lw_handler(self, keypad_temps, zone_temps): for i in range(16): zone = self.elements[i] if zone_temps[zone.index] > -60: zone.setattr('temperature', zone_temps[zone.index], True) def _st_handler(self, group, device, temperature): if group == 0: self.elements[device].setattr('temperature', temperature, True) def _zb_handler(self, zone_number, zone_bypassed): self.elements[zone_number].setattr('bypassed', zone_bypassed, True) def _zc_handler(self, zone_number, zone_status): self.elements[zone_number].setattr('logical_status', zone_status[0], False) self.elements[zone_number].setattr('physical_status', zone_status[1], True) def _zd_handler(self, zone_definitions): for zone in self.elements: zone.setattr('definition', zone_definitions[zone.index], True) def _zp_handler(self, zone_partitions): for zone in self.elements: zone.setattr('area', zone_partitions[zone.index], True) def _zs_handler(self, zone_statuses): for zone in self.elements: zone.setattr('logical_status', zone_statuses[zone.index][0], False) zone.setattr('physical_status', zone_statuses[zone.index][1], True) def _zv_handler(self, zone_number, zone_voltage): self.elements[zone_number].setattr('voltage', zone_voltage, True) PK!HڽTU elkm1_lib-0.7.15.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H;N s#elkm1_lib-0.7.15.dist-info/METADATAYms6_iD[y&r&S;N:"! ge]$eI"ž<.|\Vd.[Vz!'scVTM5z%7*k yW-3RNYt;Mys9YI9Se2dժLR&s]l[%N6:WŲkqֹimZH^Tr!_}CoIIӭ{1߿x<>Wef^kٯխ*MS+B>3:XzB+sE+~l6A+]˳i-.ز`lڬȥY.Z K'Ͼx_!,u juV2sm b4}~NGnVƢ։*Yo%kSс;Ǜey)m]V;1B9xBR9X)QOI<@X#,kRlwirdۚ QQ~^B>?ci]=rYcDnĿF7Ђ{\b |ZxYԵʣH/u[Ƨ*^DJ]Bl!Wׯ}qaQ,5h{oy2?==8N&ÛKV ÁeV~+JxiHEӪQ9fZS%$ P^!7yx]\%OTyUhSћ?XZtV󦽡lTwYՔݐu172SLW>ƬC]Cx&ER8B+gNED QI64)szt\+0Qw"&GbDN;؄S- $<PhU76(2˥ur{[am DEAo85\d3bp̠> +BZJ*B6z|)oޯ kO jsloѺ̐6h.J(xӓ_&E)RXYlKͨ Г 6;@\e!t6 [uI(t{~th1ɷAo?oLX$'-;V:[ 5eQ!5V(^E aѭQؙTum=b7ڭ^Fj< BR zME0N:+AoYwJQ%[Z1~|sfez9Cr:,ΨD0Nux`n IDr1βҽ Ref0/89REj9V~|I4Q/]]Ky<2.1(:4C,0 Mq0AxמXb-`BcDcɴ̚EqeyaE*G%B4qtCR4Á/0(ƺqcf3Ӏt_$QEB6`_xZ&6w|/ 7L c6Wau%y6r{[HgaEB;%ѽ/_ְ5 䀒+6fQ?>.?rc%Q 5%gBCm]᧋CUɌ3 j+J-OUj4\f ~7/!5kDΓ!B[9l#l&InghYeCb& bJ|F{]A8#!G8!Uw<ҙvȭNW PCH755W'c\uAV@uٷG8lWt:̪旼kɺN+ , t~҉A(ud~e*-hbAS[mS7YF=@[RexS&7фA;*p&CtAS7B+ r.EnT)9D`2+N`w$jfLEQ(_;iI|t*1)OS3]Yă8Y HBDx )k4g>>F_ zf-@7[%^OXhg'r? Cl&:ל gcy=zp)J vh\d kQ16Kн_~Yȍ88bHE"Q8;:[DjY'Q$\7ͭƀM(iDcz1;JCIkʼn>ܿekۍ1L;ќtm0߃"JP^^RًoE^KX#JHC6c>޳;*uq(1#%m;d"xlT $8^TTLB[ih*TNȾ^ ?k B$ G@c-9 v^mݓ:=Fq#8-PK!Hm!elkm1_lib-0.7.15.dist-info/RECORD}ɲX}= TqYqC xۨLAϊt2u,)̲tc xR m[^&S-fP`~=OTc]B (tL0FZ*Ă_$ dQqsJn~k\ɅehwN[΍:E#_qTl %Fm9K޹ (X:9O "e`^wUCEnCbي\kFCU0F~J $Y_aLd)]ofrs:GDz![`0_jTOM>.Yc(}$4,ʀh];+ Gu(Rlls ((n6kk|OmรXhb=-dmAgFGn%^P+>B#91WX.? 3\&_&"pNKNq= KOQpǞۃ>Mix]2ӁEAd6K4ê<;:̸a/ozdnBxsdEh=La~,iφu[Mr#8{rM]3@0ҺI۝a  eJ}i(75CB_[ Z.oa:QSwJqv[]%S0zxyA ; &7q[6o49l'L+Iqk="Os{=P4Ym CGDff~wu޼?*Ix~pg*6WqR źNἹac ʋo)-$Q A},f|N gے`"PK!K CHANGELOG.mdPK!@n/n/ bin/cmdr.pyPK!  NEbin/elkPK!lQQ 큈Qbin/mkdocPK!3L Ybin/simplePK!~/[bin/test-serialPK!ESB))_elkm1_lib/__init__.pyPK!I  v_elkm1_lib/areas.pyPK!څ8%%ielkm1_lib/const.pyPK!elkm1_lib/counters.pyPK!1&&֓elkm1_lib/elements.pyPK!e{/elkm1_lib/elk.pyPK!4#elkm1_lib/keypads.pyPK!UI/elkm1_lib/lights.pyPK!v::belkm1_lib/message.pyPK!8Z\bb1elkm1_lib/outputs.pyPK!d   elkm1_lib/panel.pyPK!6/E elkm1_lib/proto.pyPK!=P0MZZ%elkm1_lib/settings.pyPK!i G*elkm1_lib/tasks.pyPK!v\\=.elkm1_lib/thermostats.pyPK!]__6elkm1_lib/users.pyPK!8^:elkm1_lib/util.pyPK!I@elkm1_lib/zones.pyPK!HڽTU Oelkm1_lib-0.7.15.dist-info/WHEELPK!H;N s#Oelkm1_lib-0.7.15.dist-info/METADATAPK!Hm![elkm1_lib-0.7.15.dist-info/RECORDPK`