PK!cchwi.py#! /usr/bin/env python3 # Hardware wallet interaction script from hwilib.cli import main main() PK!(frhwilib/__init__.py__version__ = '1.0.0' PK!;QYL hwilib/base58.py # # base58.py # Original source: git://github.com/joric/brutus.git # which was forked from git://github.com/samrushing/caesure.git # # Distributed under the MIT/X11 software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. # b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' from binascii import hexlify, unhexlify import struct from .serializations import hash256, hash160 def encode(b): """Encode bytes to a base58-encoded string""" # Convert big-endian bytes to integer n = int('0x0' + hexlify(b).decode('utf8'), 16) # Divide that integer into bas58 res = [] while n > 0: n, r = divmod (n, 58) res.append(b58_digits[r]) res = ''.join(res[::-1]) # Encode leading zeros as base58 zeros import sys czero = b'\x00' if sys.version > '3': # In Python3 indexing a bytes returns numbers, not characters. czero = 0 pad = 0 for c in b: if c == czero: pad += 1 else: break return b58_digits[0] * pad + res def decode(s): """Decode a base58-encoding string, returning bytes""" if not s: return b'' # Convert the string to an integer n = 0 for c in s: n *= 58 if c not in b58_digits: raise InvalidBase58Error('Character %r is not a valid base58 character' % c) digit = b58_digits.index(c) n += digit # Convert the integer to bytes h = '%x' % n if len(h) % 2: h = '0' + h res = unhexlify(h.encode('utf8')) # Add padding back. pad = 0 for c in s[:-1]: if c == b58_digits[0]: pad += 1 else: break return b'\x00' * pad + res def get_xpub_fingerprint(s): data = decode(s) fingerprint = data[5:9] return struct.unpack("> 25 chk = (chk & 0x1ffffff) << 5 ^ value for i in range(5): chk ^= generator[i] if ((top >> i) & 1) else 0 return chk def bech32_hrp_expand(hrp): """Expand the HRP into values for checksum computation.""" return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] def bech32_verify_checksum(hrp, data): """Verify a checksum given HRP and converted data characters.""" return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1 def bech32_create_checksum(hrp, data): """Compute the checksum values given HRP and data.""" values = bech32_hrp_expand(hrp) + data polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] def bech32_encode(hrp, data): """Compute a Bech32 string given HRP and data values.""" combined = data + bech32_create_checksum(hrp, data) return hrp + '1' + ''.join([CHARSET[d] for d in combined]) def bech32_decode(bech): """Validate a Bech32 string, and determine HRP and data.""" if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or (bech.lower() != bech and bech.upper() != bech)): return (None, None) bech = bech.lower() pos = bech.rfind('1') if pos < 1 or pos + 7 > len(bech) or len(bech) > 90: return (None, None) if not all(x in CHARSET for x in bech[pos+1:]): return (None, None) hrp = bech[:pos] data = [CHARSET.find(x) for x in bech[pos+1:]] if not bech32_verify_checksum(hrp, data): return (None, None) return (hrp, data[:-6]) def convertbits(data, frombits, tobits, pad=True): """General power-of-2 base conversion.""" acc = 0 bits = 0 ret = [] maxv = (1 << tobits) - 1 max_acc = (1 << (frombits + tobits - 1)) - 1 for value in data: if value < 0 or (value >> frombits): return None acc = ((acc << frombits) | value) & max_acc bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad: if bits: ret.append((acc << (tobits - bits)) & maxv) elif bits >= frombits or ((acc << (tobits - bits)) & maxv): return None return ret def decode(hrp, addr): """Decode a segwit address.""" hrpgot, data = bech32_decode(addr) if hrpgot != hrp: return (None, None) decoded = convertbits(data[1:], 5, 8, False) if decoded is None or len(decoded) < 2 or len(decoded) > 40: return (None, None) if data[0] > 16: return (None, None) if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32: return (None, None) return (data[0], decoded) def encode(hrp, witver, witprog): """Encode a segwit address.""" ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5)) if decode(hrp, ret) == (None, None): return None return retPK!,, hwilib/cli.py#! /usr/bin/env python3 from .commands import backup_device, displayaddress, enumerate, find_device, \ get_client, getmasterxpub, getxpub, getkeypool, prompt_pin, restore_device, send_pin, setup_device, \ signmessage, signtx, wipe_device from .errors import ( HWWError, NO_DEVICE_PATH, DEVICE_CONN_ERROR, NO_PASSWORD, UNKNWON_DEVICE_TYPE, UNKNOWN_ERROR, UNAVAILABLE_ACTION ) from . import __version__ import argparse import getpass import logging import json import sys def backup_device_handler(args, client): return backup_device(client, label=args.label, backup_passphrase=args.backup_passphrase) def displayaddress_handler(args, client): return displayaddress(client, desc=args.desc, path=args.path, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh) def enumerate_handler(args): return enumerate(password=args.password) def getmasterxpub_handler(args, client): return getmasterxpub(client) def getxpub_handler(args, client): return getxpub(client, path=args.path) def getkeypool_handler(args, client): return getkeypool(client, path=args.path, start=args.start, end=args.end, internal=args.internal, keypool=args.keypool, account=args.account, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh) def restore_device_handler(args, client): if args.interactive: return restore_device(client, label=args.label) return {'error': 'restore requires interactive mode', 'code': UNAVAILABLE_ACTION} def setup_device_handler(args, client): if args.interactive: return setup_device(client, label=args.label, backup_passphrase=args.backup_passphrase) return {'error': 'setup requires interactive mode', 'code': UNAVAILABLE_ACTION} def signmessage_handler(args, client): return signmessage(client, message=args.message, path=args.path) def signtx_handler(args, client): return signtx(client, psbt=args.psbt) def wipe_device_handler(args, client): return wipe_device(client) def prompt_pin_handler(args, client): return prompt_pin(client) def send_pin_handler(args, client): return send_pin(client, pin=args.pin) def process_commands(cli_args): parser = argparse.ArgumentParser(description='Hardware Wallet Interface, version {}.\nAccess and send commands to a hardware wallet device. Responses are in JSON format.'.format(__version__), formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--device-path', '-d', help='Specify the device path of the device to connect to') parser.add_argument('--device-type', '-t', help='Specify the type of device that will be connected. If `--device-path` not given, the first device of this type enumerated is used.') parser.add_argument('--password', '-p', help='Device password if it has one (e.g. DigitalBitbox)', default='') parser.add_argument('--stdinpass', help='Enter the device password on the command line', action='store_true') parser.add_argument('--testnet', help='Use testnet prefixes', action='store_true') parser.add_argument('--debug', help='Print debug statements', action='store_true') parser.add_argument('--fingerprint', '-f', help='Specify the device to connect to using the first 4 bytes of the hash160 of the master public key. It will connect to the first device that matches this fingerprint.') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(__version__)) parser.add_argument('--stdin', help='Enter commands and arguments via stdin', action='store_true') parser.add_argument('--interactive', '-i', help='Use some commands interactively. Currently required for all device configuration commands', action='store_true') subparsers = parser.add_subparsers(description='Commands', dest='command') # work-around to make subparser required subparsers.required = True enumerate_parser = subparsers.add_parser('enumerate', help='List all available devices') enumerate_parser.set_defaults(func=enumerate_handler) getmasterxpub_parser = subparsers.add_parser('getmasterxpub', help='Get the extended public key at m/44\'/0\'/0\'') getmasterxpub_parser.set_defaults(func=getmasterxpub_handler) signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT') signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign') signtx_parser.set_defaults(func=signtx_handler) getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key') getxpub_parser.add_argument('path', help='The BIP 32 derivation path to derive the key at') getxpub_parser.set_defaults(func=getxpub_handler) signmsg_parser = subparsers.add_parser('signmessage', help='Sign a message') signmsg_parser.add_argument('message', help='The message to sign') signmsg_parser.add_argument('path', help='The BIP 32 derivation path of the key to sign the message with') signmsg_parser.set_defaults(func=signmessage_handler) getkeypool_parser = subparsers.add_parser('getkeypool', help='Get JSON array of keys that can be imported to Bitcoin Core with importmulti') getkeypool_parser.add_argument('--keypool', action='store_true', help='Indicates that the keys are to be imported to the keypool') getkeypool_parser.add_argument('--internal', action='store_true', help='Indicates that the keys are change keys') getkeypool_parser.add_argument('--sh_wpkh', action='store_true', help='Generate p2sh-nested segwit addresses (default path: m/49h/0h/0h/[0,1]/*)') getkeypool_parser.add_argument('--wpkh', action='store_true', help='Generate bech32 addresses (default path: m/84h/0h/0h/[0,1]/*)') getkeypool_parser.add_argument('--account', help='BIP43 account (default: 0)', type=int, default=0) getkeypool_parser.add_argument('--path', help='Derivation path, default follows BIP43 convention, e.g. m/84h/0h/0h/1/* with --wpkh --internal') getkeypool_parser.add_argument('start', type=int, help='The index to start at.') getkeypool_parser.add_argument('end', type=int, help='The index to end at.') getkeypool_parser.set_defaults(func=getkeypool_handler) displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address') group = displayaddr_parser.add_mutually_exclusive_group(required=True) group.add_argument('--desc', help='Output Descriptor. E.g. wpkh([00000000/84h/0h/0h]xpub.../0/0), where 00000000 must match --fingerprint and xpub can be obtained with getxpub. See doc/descriptors.md in Bitcoin Core') group.add_argument('--path', help='The BIP 32 derivation path of the key embedded in the address, default follows BIP43 convention, e.g. m/84h/0h/0h/1/*') displayaddr_parser.add_argument('--sh_wpkh', action='store_true', help='Display the p2sh-nested segwit address associated with this key path') displayaddr_parser.add_argument('--wpkh', action='store_true', help='Display the bech32 version of the address associated with this key path') displayaddr_parser.set_defaults(func=displayaddress_handler) setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p. Requires interactive mode') setupdev_parser.add_argument('--label', '-l', help='The name to give to the device', default='') setupdev_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') setupdev_parser.set_defaults(func=setup_device_handler) wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device') wipedev_parser.set_defaults(func=wipe_device_handler) restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process. Requires interactive mode') restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='') restore_parser.set_defaults(func=restore_device_handler) backup_parser = subparsers.add_parser('backup', help='Initiate the device backup creation process') backup_parser.add_argument('--label', '-l', help='The name to give to the device', default='') backup_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') backup_parser.set_defaults(func=backup_device_handler) promptpin_parser = subparsers.add_parser('promptpin', help='Have the device prompt for your PIN') promptpin_parser.set_defaults(func=prompt_pin_handler) sendpin_parser = subparsers.add_parser('sendpin', help='Send the numeric positions for your PIN to the device') sendpin_parser.add_argument('pin', help='The numeric positions of the PIN') sendpin_parser.set_defaults(func=send_pin_handler) if any(arg == '--stdin' for arg in cli_args): blank_count = 0 while True: try: line = input() # Exit loop when we see 2 consecutive newlines (i.e. an empty line) if line == '': break # Split the line and append it to the cli args import shlex cli_args.extend(shlex.split(line)) except EOFError: # If we see EOF, stop taking input break # Parse arguments again for anything entered over stdin args = parser.parse_args(cli_args) device_path = args.device_path device_type = args.device_type password = args.password command = args.command # Setup debug logging logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARNING) # Enter the password on stdin if args.stdinpass: password = getpass.getpass('Enter your device password: ') args.password = password # List all available hardware wallet devices if command == 'enumerate': return args.func(args) # Auto detect if we are using fingerprint or type to identify device if args.fingerprint or (args.device_type and not args.device_path): client = find_device(args.device_path, args.password, args.device_type, args.fingerprint) if not client: return {'error':'Could not find device with specified fingerprint','code':DEVICE_CONN_ERROR} elif args.device_type and args.device_path: try: client = get_client(device_type, device_path, password) except HWWError as e: return {'error': e.get_msg(), 'code': e.get_code()} except Exception as e: return {'error':str(e),'code':DEVICE_CONN_ERROR} else: return {'error':'You must specify a device type or fingerprint for all commands except enumerate','code':NO_DEVICE_PATH} client.is_testnet = args.testnet # Do the commands try: result = args.func(args, client) except HWWError as e: result = {'error': e.get_msg(), 'code': e.get_code()} except Exception as e: if args.debug: import traceback traceback.print_exc() result = {'error': str(e), 'code': UNKNOWN_ERROR} # Close the device try: client.close() except HWWError as e: result = {'error': e.get_msg(), 'code': e.get_code()} except Exception as e: if args.debug: import traceback traceback.print_exc() result = {'error': str(e), 'code': UNKNOWN_ERROR} return result def main(): result = process_commands(sys.argv[1:]) print(json.dumps(result)) PK! 3||hwilib/commands.py#! /usr/bin/env python3 # Hardware wallet interaction script import importlib from .serializations import PSBT, Base64ToHex, HexToBase64, hash160 from .base58 import get_xpub_fingerprint_as_id, get_xpub_fingerprint_hex, xpub_to_pub_hex from .errors import NoPasswordError, UnavailableActionError, DeviceAlreadyInitError, DeviceAlreadyUnlockedError, UnknownDeviceError, BAD_ARGUMENT, NOT_IMPLEMENTED from .descriptor import Descriptor from .devices import __all__ as all_devs # Get the client for the device def get_client(device_type, device_path, password=''): class_name = device_type.capitalize() module = device_type.lower() client = None try: imported_dev = importlib.import_module('.devices.' + module, __package__) client_constructor = getattr(imported_dev, class_name + 'Client') client = client_constructor(device_path, password) except ImportError as e: if client: client.close() raise UnknownDeviceError('Unknown device type specified') return client # Get a list of all available hardware wallets def enumerate(password=''): result = [] for module in all_devs: try: imported_dev = importlib.import_module('.devices.' + module, __package__) result.extend(imported_dev.enumerate(password)) except ImportError as e: pass # Ignore ImportErrors, the user may not have all device dependencies installed return result # Fingerprint or device type required def find_device(device_path, password='', device_type=None, fingerprint=None): devices = enumerate(password) for d in devices: if device_type is not None and d['type'] != device_type: continue client = None try: client = get_client(d['type'], d['path'], password) master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] master_fpr = get_xpub_fingerprint_hex(master_xpub) if fingerprint and master_fpr != fingerprint: client.close() continue else: client.fingerprint = master_fpr return client except: if client: client.close() pass # Ignore things we wouldn't get fingerprints for return None def getmasterxpub(client): return client.get_master_xpub() def signtx(client, psbt): # Deserialize the transaction tx = PSBT() tx.deserialize(psbt) return client.sign_tx(tx) def getxpub(client, path): return client.get_pubkey_at_path(path) def signmessage(client, message, path): return client.sign_message(message, path) def getkeypool(client, path, start, end, internal=False, keypool=False, account=0, sh_wpkh=False, wpkh=True): if sh_wpkh == True and wpkh == True: return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT} try: master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] except NotImplementedError as e: return {'error': str(e), 'code': NOT_IMPLEMENTED} master_fpr = get_xpub_fingerprint_as_id(master_xpub) if not path: # Master key: path = "m/" # Purpose if wpkh == True: path += "84'/" elif sh_wpkh == True: path += "49'/" else: path += "44'/" # Coin type if client.is_testnet == True: path += "1'/" else: path += "0'/" # Account path += str(account) + '\'/' # Receive or change if internal == True: path += "1/*" else: path += "0/*" else: if path[0] != "m": return {'error':'Path must start with m/','code':BAD_ARGUMENT} if path[-1] != "*": return {'error':'Path must end with /*','code':BAD_ARGUMENT} # Find the last hardened derivation: path = path.replace('\'','h') path_suffix = '' for component in path.split("/")[::-1]: if component[-1] == 'h' or component[-1] == 'm': break path_suffix = '/' + component + path_suffix path_base = path.rsplit(path_suffix)[0] # Get the key at the base base_key = client.get_pubkey_at_path(path_base)['xpub'] import_data = [] this_import = {} desc = Descriptor(master_fpr, path_base.replace('m', ''), base_key, path_suffix, client.is_testnet, sh_wpkh, wpkh) this_import['desc'] = desc.serialize() this_import['range'] = [start, end] this_import['timestamp'] = 'now' this_import['internal'] = internal this_import['keypool'] = keypool this_import['watchonly'] = True import_data.append(this_import) return import_data def displayaddress(client, path=None, desc=None, sh_wpkh=False, wpkh=False): if path is not None: if sh_wpkh == True and wpkh == True: return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT} return client.display_address(path, sh_wpkh, wpkh) elif desc is not None: if sh_wpkh == True or wpkh == True: return {'error':' `--wpkh` and `--sh_wpkh` can not be combined with --desc','code':BAD_ARGUMENT} descriptor = Descriptor.parse(desc, client.is_testnet) if descriptor is None: return {'error':'Unable to parse descriptor: ' + desc,'code':BAD_ARGUMENT} if descriptor.m_path is None: return {'error':'Descriptor missing origin info: ' + desc,'code':BAD_ARGUMENT} if descriptor.origin_fingerprint != client.fingerprint: return {'error':'Descriptor fingerprint does not match device: ' + desc,'code':BAD_ARGUMENT} xpub = client.get_pubkey_at_path(descriptor.m_path_base)['xpub'] if descriptor.base_key != xpub and descriptor.base_key != xpub_to_pub_hex(xpub): return {'error':'Key in descriptor does not match device: ' + desc,'code':BAD_ARGUMENT} return client.display_address(descriptor.m_path, descriptor.sh_wpkh, descriptor.wpkh) def setup_device(client, label='', backup_passphrase=''): return client.setup_device(label, backup_passphrase) def wipe_device(client): return client.wipe_device() def restore_device(client, label): return client.restore_device(label) def backup_device(client, label='', backup_passphrase=''): return client.backup_device(label, backup_passphrase) def prompt_pin(client): return client.prompt_pin() def send_pin(client, pin): return client.send_pin(pin) PK! Hhwilib/descriptor.pyimport re # From: https://github.com/bitcoin/bitcoin/blob/master/src/script/descriptor.cpp def PolyMod(c, val): c0 = c >> 35 c = ((c & 0x7ffffffff) << 5) ^ val if (c0 & 1): c ^= 0xf5dee51989 if (c0 & 2): c ^= 0xa9fdca3312 if (c0 & 4): c ^= 0x1bab10e32d if (c0 & 8): c ^= 0x3706b1677a if (c0 & 16): c ^= 0x644d626ffd return c def DescriptorChecksum(desc): INPUT_CHARSET = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ "; CHECKSUM_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"; c = 1 cls = 0 clscount = 0 for ch in desc: pos = INPUT_CHARSET.find(ch) if pos == -1: return "" c = PolyMod(c, pos & 31) cls = cls * 3 + (pos >> 5) clscount += 1 if clscount == 3: c = PolyMod(c, cls) cls = 0 clscount = 0 if clscount > 0: c = PolyMod(c, cls) for j in range (0, 8): c = PolyMod(c, 0) c ^= 1 ret = [None] * 8 for j in range(0, 8): ret[j] = CHECKSUM_CHARSET[(c >> (5 * (7 - j))) & 31] return ''.join(ret) def AddChecksum(desc): return desc + "#" + DescriptorChecksum(desc) class Descriptor: def __init__(self, origin_fingerprint, origin_path, base_key, path_suffix, testnet, sh_wpkh, wpkh): self.origin_fingerprint = origin_fingerprint self.origin_path = origin_path self.path_suffix = path_suffix self.base_key = base_key self.testnet = testnet self.sh_wpkh = sh_wpkh self.wpkh = wpkh self.m_path = None if origin_path: self.m_path_base = "m" + origin_path self.m_path = "m" + origin_path + (path_suffix or "") @classmethod def parse(cls, desc, testnet = False): sh_wpkh = None wpkh = None origin_fingerprint = None origin_path = None base_key_and_path_match = None base_key = None path_suffix = None # Check the checksum check_split = desc.split('#') if len(check_split) > 2: return None if len(check_split) == 2: if len(check_split[1]) != 8: return None checksum = DescriptorChecksum(check_split[0]) if not checksum.strip(): return None if checksum != check_split[1]: return None desc = check_split[0] if desc.startswith("sh(wpkh("): sh_wpkh = True elif desc.startswith("wpkh("): wpkh = True origin_match = re.search(r"\[(.*)\]", desc) if origin_match: origin = origin_match.group(1) match = re.search(r"^([0-9a-fA-F]{8})(\/.*)", origin) if match: origin_fingerprint = match.group(1) origin_path = match.group(2) # Replace h with ' origin_path = origin_path.replace('h', '\'') base_key_and_path_match = re.search(r"\[.*\](\w+)([\/\)][\d'\/\*]*)", desc) else: base_key_and_path_match = re.search(r"\((\w+)([\/\)][\d'\/\*]*)", desc) if base_key_and_path_match: base_key = base_key_and_path_match.group(1) path_suffix = base_key_and_path_match.group(2) if path_suffix == ")": path_suffix = None else: if origin_match == None: return None return cls(origin_fingerprint, origin_path, base_key, path_suffix, testnet, sh_wpkh, wpkh) def serialize(self): descriptor_open = 'pkh(' descriptor_close = ')' origin = '' path_suffix = '' if self.wpkh == True: descriptor_open = 'wpkh(' elif self.sh_wpkh == True: descriptor_open = 'sh(wpkh(' descriptor_close = '))' if self.origin_fingerprint and self.origin_path: origin = '[' + self.origin_fingerprint + self.origin_path + ']' if self.path_suffix: path_suffix = self.path_suffix return AddChecksum(descriptor_open + origin + self.base_key + path_suffix + descriptor_close) PK!i!]]hwilib/devices/__init__.py__all__ = [ 'trezor', 'ledger', 'keepkey', 'digitalbitbox', 'coldcard' ] PK!@@!hwilib/devices/btchip/__init__.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ PK!ba  +hwilib/devices/btchip/bitcoinTransaction.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ from .bitcoinVarint import * from binascii import hexlify class bitcoinInput: def __init__(self, bufferOffset=None): self.prevOut = "" self.script = "" self.sequence = "" if bufferOffset is not None: buf = bufferOffset['buffer'] offset = bufferOffset['offset'] self.prevOut = buf[offset:offset + 36] offset += 36 scriptSize = readVarint(buf, offset) offset += scriptSize['size'] self.script = buf[offset:offset + scriptSize['value']] offset += scriptSize['value'] self.sequence = buf[offset:offset + 4] offset += 4 bufferOffset['offset'] = offset def serialize(self): result = [] result.extend(self.prevOut) writeVarint(len(self.script), result) result.extend(self.script) result.extend(self.sequence) return result def __str__(self): buf = "Prevout : " + hexlify(self.prevOut) + "\r\n" buf += "Script : " + hexlify(self.script) + "\r\n" buf += "Sequence : " + hexlify(self.sequence) + "\r\n" return buf class bitcoinOutput: def __init__(self, bufferOffset=None): self.amount = "" self.script = "" if bufferOffset is not None: buf = bufferOffset['buffer'] offset = bufferOffset['offset'] self.amount = buf[offset:offset + 8] offset += 8 scriptSize = readVarint(buf, offset) offset += scriptSize['size'] self.script = buf[offset:offset + scriptSize['value']] offset += scriptSize['value'] bufferOffset['offset'] = offset def serialize(self): result = [] result.extend(self.amount) writeVarint(len(self.script), result) result.extend(self.script) return result def __str__(self): buf = "Amount : " + hexlify(self.amount) + "\r\n" buf += "Script : " + hexlify(self.script) + "\r\n" return buf class bitcoinTransaction: def __init__(self, data=None): self.version = "" self.inputs = [] self.outputs = [] self.lockTime = "" self.witness = False self.witnessScript = "" if data is not None: offset = 0 self.version = data[offset:offset + 4] offset += 4 if (data[offset] == 0) and (data[offset + 1] != 0): offset += 2 self.witness = True inputSize = readVarint(data, offset) offset += inputSize['size'] numInputs = inputSize['value'] for i in range(numInputs): tmp = { 'buffer': data, 'offset' : offset} self.inputs.append(bitcoinInput(tmp)) offset = tmp['offset'] outputSize = readVarint(data, offset) offset += outputSize['size'] numOutputs = outputSize['value'] for i in range(numOutputs): tmp = { 'buffer': data, 'offset' : offset} self.outputs.append(bitcoinOutput(tmp)) offset = tmp['offset'] if self.witness: self.witnessScript = data[offset : len(data) - 4] self.lockTime = data[len(data) - 4:] else: self.lockTime = data[offset:offset + 4] def serialize(self, skipOutputLocktime=False, skipWitness=False): if skipWitness or (not self.witness): useWitness = False else: useWitness = True result = [] result.extend(self.version) if useWitness: result.append(0x00) result.append(0x01) writeVarint(len(self.inputs), result) for trinput in self.inputs: result.extend(trinput.serialize()) if not skipOutputLocktime: writeVarint(len(self.outputs), result) for troutput in self.outputs: result.extend(troutput.serialize()) if useWitness: result.extend(self.witnessScript) result.extend(self.lockTime) return result def serializeOutputs(self): result = [] writeVarint(len(self.outputs), result) for troutput in self.outputs: result.extend(troutput.serialize()) return result def __str__(self): buf = "Version : " + hexlify(self.version) + "\r\n" index = 1 for trinput in self.inputs: buf += "Input #" + str(index) + "\r\n" buf += str(trinput) index+=1 index = 1 for troutput in self.outputs: buf += "Output #" + str(index) + "\r\n" buf += str(troutput) index+=1 buf += "Locktime : " + hexlify(self.lockTime) + "\r\n" if self.witness: buf += "Witness script : " + hexlify(self.witnessScript) + "\r\n" return buf PK!+&hwilib/devices/btchip/bitcoinVarint.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ from .btchipException import BTChipException def readVarint(buffer, offset): varintSize = 0 value = 0 if (buffer[offset] < 0xfd): value = buffer[offset] varintSize = 1 elif (buffer[offset] == 0xfd): value = (buffer[offset + 2] << 8) | (buffer[offset + 1]) varintSize = 3 elif (buffer[offset] == 0xfe): value = (buffer[offset + 4] << 24) | (buffer[offset + 3] << 16) | (buffer[offset + 2] << 8) | (buffer[offset + 1]) varintSize = 5 else: raise BTChipException("unsupported varint") return { "value": value, "size": varintSize } def writeVarint(value, buffer): if (value < 0xfd): buffer.append(value) elif (value <= 0xffff): buffer.append(0xfd) buffer.append(value & 0xff) buffer.append((value >> 8) & 0xff) elif (value <= 0xffffffff): buffer.append(0xfe) buffer.append(value & 0xff) buffer.append((value >> 8) & 0xff) buffer.append((value >> 16) & 0xff) buffer.append((value >> 24) & 0xff) else: raise BTChipException("unsupported encoding") return buffer def getVarintSize(value): if (value < 0xfd): return 1 elif (value <= 0xffff): return 3 elif (value <= 0xffffffff): return 5 else: raise BTChipException("unsupported encoding") PK!I99hwilib/devices/btchip/btchip.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ from .btchipComm import * from .bitcoinTransaction import * from .bitcoinVarint import * from .btchipException import * from .btchipHelpers import * from binascii import hexlify, unhexlify class btchip: BTCHIP_CLA = 0xe0 BTCHIP_JC_EXT_CLA = 0xf0 BTCHIP_INS_SET_ALTERNATE_COIN_VERSION = 0x14 BTCHIP_INS_SETUP = 0x20 BTCHIP_INS_VERIFY_PIN = 0x22 BTCHIP_INS_GET_OPERATION_MODE = 0x24 BTCHIP_INS_SET_OPERATION_MODE = 0x26 BTCHIP_INS_SET_KEYMAP = 0x28 BTCHIP_INS_SET_COMM_PROTOCOL = 0x2a BTCHIP_INS_GET_WALLET_PUBLIC_KEY = 0x40 BTCHIP_INS_GET_TRUSTED_INPUT = 0x42 BTCHIP_INS_HASH_INPUT_START = 0x44 BTCHIP_INS_HASH_INPUT_FINALIZE = 0x46 BTCHIP_INS_HASH_SIGN = 0x48 BTCHIP_INS_HASH_INPUT_FINALIZE_FULL = 0x4a BTCHIP_INS_GET_INTERNAL_CHAIN_INDEX = 0x4c BTCHIP_INS_SIGN_MESSAGE = 0x4e BTCHIP_INS_GET_TRANSACTION_LIMIT = 0xa0 BTCHIP_INS_SET_TRANSACTION_LIMIT = 0xa2 BTCHIP_INS_IMPORT_PRIVATE_KEY = 0xb0 BTCHIP_INS_GET_PUBLIC_KEY = 0xb2 BTCHIP_INS_DERIVE_BIP32_KEY = 0xb4 BTCHIP_INS_SIGNVERIFY_IMMEDIATE = 0xb6 BTCHIP_INS_GET_RANDOM = 0xc0 BTCHIP_INS_GET_ATTESTATION = 0xc2 BTCHIP_INS_GET_FIRMWARE_VERSION = 0xc4 BTCHIP_INS_COMPOSE_MOFN_ADDRESS = 0xc6 BTCHIP_INS_GET_POS_SEED = 0xca BTCHIP_INS_EXT_GET_HALF_PUBLIC_KEY = 0x20 BTCHIP_INS_EXT_CACHE_PUT_PUBLIC_KEY = 0x22 BTCHIP_INS_EXT_CACHE_HAS_PUBLIC_KEY = 0x24 BTCHIP_INS_EXT_CACHE_GET_FEATURES = 0x26 OPERATION_MODE_WALLET = 0x01 OPERATION_MODE_RELAXED_WALLET = 0x02 OPERATION_MODE_SERVER = 0x04 OPERATION_MODE_DEVELOPER = 0x08 FEATURE_UNCOMPRESSED_KEYS = 0x01 FEATURE_RFC6979 = 0x02 FEATURE_FREE_SIGHASHTYPE = 0x04 FEATURE_NO_2FA_P2SH = 0x08 QWERTY_KEYMAP = bytearray(unhexlify("000000000000000000000000760f00d4ffffffc7000000782c1e3420212224342627252e362d3738271e1f202122232425263333362e37381f0405060708090a0b0c0d0e0f101112131415161718191a1b1c1d2f3130232d350405060708090a0b0c0d0e0f101112131415161718191a1b1c1d2f313035")) QWERTZ_KEYMAP = bytearray(unhexlify("000000000000000000000000760f00d4ffffffc7000000782c1e3420212224342627252e362d3738271e1f202122232425263333362e37381f0405060708090a0b0c0d0e0f101112131415161718191a1b1d1c2f3130232d350405060708090a0b0c0d0e0f101112131415161718191a1b1d1c2f313035")) AZERTY_KEYMAP = bytearray(unhexlify("08000000010000200100007820c8ffc3feffff07000000002c38202030341e21222d352e102e3637271e1f202122232425263736362e37101f1405060708090a0b0c0d0e0f331112130415161718191d1b1c1a2f64302f2d351405060708090a0b0c0d0e0f331112130415161718191d1b1c1a2f643035")) def __init__(self, dongle): self.dongle = dongle self.needKeyCache = False try: firmware = self.getFirmwareVersion()['version'] self.multiOutputSupported = tuple(map(int, (firmware.split(".")))) >= (1, 1, 4) if self.multiOutputSupported: self.scriptBlockLength = 50 else: self.scriptBlockLength = 255 except: pass def getWalletPublicKey(self, path, showOnScreen=False, segwit=False, segwitNative=False, cashAddr=False): result = {} donglePath = parse_bip32_path(path) if self.needKeyCache: self.resolvePublicKeysInPath(path) apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_WALLET_PUBLIC_KEY, 0x01 if showOnScreen else 0x00, 0x03 if cashAddr else 0x02 if segwitNative else 0x01 if segwit else 0x00, len(donglePath) ] apdu.extend(donglePath) response = self.dongle.exchange(bytearray(apdu)) offset = 0 result['publicKey'] = response[offset + 1 : offset + 1 + response[offset]] offset = offset + 1 + response[offset] result['address'] = str(response[offset + 1 : offset + 1 + response[offset]]) offset = offset + 1 + response[offset] result['chainCode'] = response[offset : offset + 32] return result def getTrustedInput(self, transaction, index): result = {} # Header apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x00, 0x00 ] params = bytearray.fromhex("%.8x" % (index)) params.extend(transaction.version) writeVarint(len(transaction.inputs), params) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) # Each input for trinput in transaction.inputs: apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00 ] params = bytearray(trinput.prevOut) writeVarint(len(trinput.script), params) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) offset = 0 while True: blockLength = 251 if ((offset + blockLength) < len(trinput.script)): dataLength = blockLength else: dataLength = len(trinput.script) - offset params = bytearray(trinput.script[offset : offset + dataLength]) if ((offset + dataLength) == len(trinput.script)): params.extend(trinput.sequence) apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00, len(params) ] apdu.extend(params) self.dongle.exchange(bytearray(apdu)) offset += dataLength if (offset >= len(trinput.script)): break # Number of outputs apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00 ] params = [] writeVarint(len(transaction.outputs), params) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) # Each output indexOutput = 0 for troutput in transaction.outputs: apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00 ] params = bytearray(troutput.amount) writeVarint(len(troutput.script), params) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) offset = 0 while (offset < len(troutput.script)): blockLength = 255 if ((offset + blockLength) < len(troutput.script)): dataLength = blockLength else: dataLength = len(troutput.script) - offset apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00, dataLength ] apdu.extend(troutput.script[offset : offset + dataLength]) self.dongle.exchange(bytearray(apdu)) offset += dataLength # Locktime apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_TRUSTED_INPUT, 0x80, 0x00, len(transaction.lockTime) ] apdu.extend(transaction.lockTime) response = self.dongle.exchange(bytearray(apdu)) result['trustedInput'] = True result['value'] = response return result def startUntrustedTransaction(self, newTransaction, inputIndex, outputList, redeemScript, version=0x01, cashAddr=False): # Start building a fake transaction with the passed inputs segwit = False if newTransaction: for passedOutput in outputList: if ('witness' in passedOutput) and passedOutput['witness']: segwit = True break if newTransaction: if segwit: p2 = 0x03 if cashAddr else 0x02 else: p2 = 0x00 else: p2 = 0x80 apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_START, 0x00, p2 ] params = bytearray([version, 0x00, 0x00, 0x00]) writeVarint(len(outputList), params) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) # Loop for each input currentIndex = 0 for passedOutput in outputList: if ('sequence' in passedOutput) and passedOutput['sequence']: sequence = bytearray(unhexlify(passedOutput['sequence'])) else: sequence = bytearray([0xFF, 0xFF, 0xFF, 0xFF]) # default sequence apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_START, 0x80, 0x00 ] params = [] script = bytearray(redeemScript) if ('witness' in passedOutput) and passedOutput['witness']: params.append(0x02) elif ('trustedInput' in passedOutput) and passedOutput['trustedInput']: params.append(0x01) else: params.append(0x00) if ('trustedInput' in passedOutput) and passedOutput['trustedInput']: params.append(len(passedOutput['value'])) params.extend(passedOutput['value']) if currentIndex != inputIndex: script = bytearray() writeVarint(len(script), params) if len(script) == 0: params.extend(sequence) apdu.append(len(params)) apdu.extend(params) self.dongle.exchange(bytearray(apdu)) offset = 0 while(offset < len(script)): blockLength = 255 if ((offset + blockLength) < len(script)): dataLength = blockLength else: dataLength = len(script) - offset params = script[offset : offset + dataLength] if ((offset + dataLength) == len(script)): params.extend(sequence) apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_START, 0x80, 0x00, len(params) ] apdu.extend(params) self.dongle.exchange(bytearray(apdu)) offset += blockLength currentIndex += 1 def finalizeInput(self, outputAddress, amount, fees, changePath, rawTx=None): alternateEncoding = False donglePath = parse_bip32_path(changePath) if self.needKeyCache: self.resolvePublicKeysInPath(changePath) result = {} outputs = None if rawTx is not None: try: fullTx = bitcoinTransaction(bytearray(rawTx)) outputs = fullTx.serializeOutputs() if len(donglePath) != 0: apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_FINALIZE_FULL, 0xFF, 0x00 ] params = [] params.extend(donglePath) apdu.append(len(params)) apdu.extend(params) response = self.dongle.exchange(bytearray(apdu)) offset = 0 while (offset < len(outputs)): blockLength = self.scriptBlockLength if ((offset + blockLength) < len(outputs)): dataLength = blockLength p1 = 0x00 else: dataLength = len(outputs) - offset p1 = 0x80 apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_FINALIZE_FULL, \ p1, 0x00, dataLength ] apdu.extend(outputs[offset : offset + dataLength]) response = self.dongle.exchange(bytearray(apdu)) offset += dataLength alternateEncoding = True except: pass if not alternateEncoding: apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_INPUT_FINALIZE, 0x02, 0x00 ] params = [] params.append(len(outputAddress)) params.extend(bytearray(outputAddress)) writeHexAmountBE(btc_to_satoshi(str(amount)), params) writeHexAmountBE(btc_to_satoshi(str(fees)), params) params.extend(donglePath) apdu.append(len(params)) apdu.extend(params) response = self.dongle.exchange(bytearray(apdu)) result['confirmationNeeded'] = response[1 + response[0]] != 0x00 result['confirmationType'] = response[1 + response[0]] if result['confirmationType'] == 0x02: result['keycardData'] = response[1 + response[0] + 1:] if result['confirmationType'] == 0x03: offset = 1 + response[0] + 1 keycardDataLength = response[offset] offset = offset + 1 result['keycardData'] = response[offset : offset + keycardDataLength] offset = offset + keycardDataLength result['secureScreenData'] = response[offset:] if result['confirmationType'] == 0x04: offset = 1 + response[0] + 1 keycardDataLength = response[offset] result['keycardData'] = response[offset + 1 : offset + 1 + keycardDataLength] if outputs == None: result['outputData'] = response[1 : 1 + response[0]] else: result['outputData'] = outputs return result def untrustedHashSign(self, path, pin="", lockTime=0, sighashType=0x01): if isinstance(pin, str): pin = pin.encode('utf-8') donglePath = parse_bip32_path(path) if self.needKeyCache: self.resolvePublicKeysInPath(path) apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_HASH_SIGN, 0x00, 0x00 ] params = [] params.extend(donglePath) params.append(len(pin)) params.extend(bytearray(pin)) writeUint32BE(lockTime, params) params.append(sighashType) apdu.append(len(params)) apdu.extend(params) result = self.dongle.exchange(bytearray(apdu)) result[0] = 0x30 return result def signMessagePrepareV2(self, path, message): donglePath = parse_bip32_path(path) if self.needKeyCache: self.resolvePublicKeysInPath(path) result = {} offset = 0 encryptedOutputData = b"" while (offset < len(message)): params = []; if offset == 0: params.extend(donglePath) params.append((len(message) >> 8) & 0xff) params.append(len(message) & 0xff) p2 = 0x01 else: p2 = 0x80 blockLength = 255 - len(params) if ((offset + blockLength) < len(message)): dataLength = blockLength else: dataLength = len(message) - offset params.extend(bytearray(message[offset : offset + dataLength])) apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_SIGN_MESSAGE, 0x00, p2 ] apdu.append(len(params)) apdu.extend(params) response = self.dongle.exchange(bytearray(apdu)) encryptedOutputData = encryptedOutputData + response[1 : 1 + response[0]] offset += blockLength result['confirmationNeeded'] = response[1 + response[0]] != 0x00 result['confirmationType'] = response[1 + response[0]] if result['confirmationType'] == 0x03: offset = 1 + response[0] + 1 result['secureScreenData'] = response[offset:] result['encryptedOutputData'] = encryptedOutputData return result def signMessagePrepare(self, path, message): try: result = self.signMessagePrepareV2(path, message) except BTChipException as e: if (e.sw == 0x6b00): # Old firmware version, try older method result = self.signMessagePrepareV1(path, message) else: raise return result def signMessageSign(self, pin=""): if isinstance(pin, str): pin = pin.encode('utf-8') apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_SIGN_MESSAGE, 0x80, 0x00 ] params = [] if pin is not None: params.append(len(pin)) params.extend(bytearray(pin)) else: params.append(0x00) apdu.append(len(params)) apdu.extend(params) response = self.dongle.exchange(bytearray(apdu)) return response def getFirmwareVersion(self): result = {} apdu = [ self.BTCHIP_CLA, self.BTCHIP_INS_GET_FIRMWARE_VERSION, 0x00, 0x00, 0x00 ] try: response = self.dongle.exchange(bytearray(apdu)) except BTChipException as e: if (e.sw == 0x6985): response = [0x00, 0x00, 0x01, 0x04, 0x03 ] pass else: raise result['compressedKeys'] = (response[0] == 0x01) result['version'] = "%d.%d.%d" % (response[2], response[3], response[4]) result['specialVersion'] = response[1] return result PK!55#hwilib/devices/btchip/btchipComm.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ from abc import ABCMeta, abstractmethod from .btchipException import * from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU from binascii import hexlify import time import os import struct import socket try: import hid HID = True except ImportError: HID = False try: from smartcard.Exceptions import NoCardException from smartcard.System import readers from smartcard.util import toHexString, toBytes SCARD = True except ImportError: SCARD = False class DongleWait(object): __metaclass__ = ABCMeta @abstractmethod def waitFirstResponse(self, timeout): pass class Dongle(object): __metaclass__ = ABCMeta @abstractmethod def exchange(self, apdu, timeout=20000): pass @abstractmethod def close(self): pass def setWaitImpl(self, waitImpl): self.waitImpl = waitImpl class HIDDongleHIDAPI(Dongle, DongleWait): def __init__(self, device, ledger=False, debug=False): self.device = device self.ledger = ledger self.debug = debug self.waitImpl = self self.opened = True def exchange(self, apdu, timeout=20000): if self.debug: print("=> %s" % hexlify(apdu)) if self.ledger: apdu = wrapCommandAPDU(0x0101, apdu, 64) padSize = len(apdu) % 64 tmp = apdu if padSize != 0: tmp.extend([0] * (64 - padSize)) offset = 0 while(offset != len(tmp)): data = tmp[offset:offset + 64] data = bytearray([0]) + data self.device.write(data) offset += 64 dataLength = 0 dataStart = 2 result = self.waitImpl.waitFirstResponse(timeout) if not self.ledger: if result[0] == 0x61: # 61xx : data available self.device.set_nonblocking(False) dataLength = result[1] dataLength += 2 if dataLength > 62: remaining = dataLength - 62 while(remaining != 0): if remaining > 64: blockLength = 64 else: blockLength = remaining result.extend(bytearray(self.device.read(65))[0:blockLength]) remaining -= blockLength swOffset = dataLength dataLength -= 2 self.device.set_nonblocking(True) else: swOffset = 0 else: self.device.set_nonblocking(False) while True: response = unwrapResponseAPDU(0x0101, result, 64) if response is not None: result = response dataStart = 0 swOffset = len(response) - 2 dataLength = len(response) - 2 self.device.set_nonblocking(True) break result.extend(bytearray(self.device.read(65))) sw = (result[swOffset] << 8) + result[swOffset + 1] response = result[dataStart : dataLength + dataStart] if self.debug: print("<= %s%.2x" % (hexlify(response), sw)) if sw != 0x9000: raise BTChipException("Invalid status %04x" % sw, sw) return response def waitFirstResponse(self, timeout): start = time.time() data = "" while len(data) == 0: data = self.device.read(65) if not len(data): if time.time() - start > timeout: raise BTChipException("Timeout") time.sleep(0.02) return bytearray(data) def close(self): if self.opened: try: self.device.close() except: pass self.opened = False PK!d9(hwilib/devices/btchip/btchipException.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ class BTChipException(Exception): def __init__(self, message, sw=0x6f00): self.message = message self.sw = sw def __str__(self): buf = "Exception : " + self.message return buf PK!.~  &hwilib/devices/btchip/btchipHelpers.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ import decimal import re # from pycoin SATOSHI_PER_COIN = decimal.Decimal(1e8) COIN_PER_SATOSHI = decimal.Decimal(1)/SATOSHI_PER_COIN def satoshi_to_btc(satoshi_count): if satoshi_count == 0: return decimal.Decimal(0) r = satoshi_count * COIN_PER_SATOSHI return r.normalize() def btc_to_satoshi(btc): return int(decimal.Decimal(btc) * SATOSHI_PER_COIN) # /from pycoin def writeUint32BE(value, buffer): buffer.append((value >> 24) & 0xff) buffer.append((value >> 16) & 0xff) buffer.append((value >> 8) & 0xff) buffer.append(value & 0xff) return buffer def writeUint32LE(value, buffer): buffer.append(value & 0xff) buffer.append((value >> 8) & 0xff) buffer.append((value >> 16) & 0xff) buffer.append((value >> 24) & 0xff) return buffer def writeHexAmount(value, buffer): buffer.append(value & 0xff) buffer.append((value >> 8) & 0xff) buffer.append((value >> 16) & 0xff) buffer.append((value >> 24) & 0xff) buffer.append((value >> 32) & 0xff) buffer.append((value >> 40) & 0xff) buffer.append((value >> 48) & 0xff) buffer.append((value >> 56) & 0xff) return buffer def writeHexAmountBE(value, buffer): buffer.append((value >> 56) & 0xff) buffer.append((value >> 48) & 0xff) buffer.append((value >> 40) & 0xff) buffer.append((value >> 32) & 0xff) buffer.append((value >> 24) & 0xff) buffer.append((value >> 16) & 0xff) buffer.append((value >> 8) & 0xff) buffer.append(value & 0xff) return buffer def parse_bip32_path(path): if len(path) == 0: return bytearray([ 0 ]) result = [] elements = path.split('/') if len(elements) > 10: raise BTChipException("Path too long") for pathElement in elements: element = re.split('\'|h|H', pathElement) if len(element) == 1: writeUint32BE(int(element[0]), result) else: writeUint32BE(0x80000000 | int(element[0]), result) return bytearray([ len(elements) ] + result) PK!Tb`)X X $hwilib/devices/btchip/btchipUtils.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ from .btchipException import * from .bitcoinTransaction import * from .btchipHelpers import * def compress_public_key(publicKey): if publicKey[0] == 0x04: if (publicKey[64] & 1) != 0: prefix = 0x03 else: prefix = 0x02 result = [prefix] result.extend(publicKey[1:33]) return bytearray(result) elif publicKey[0] == 0x03 or publicKey[0] == 0x02: return publicKey else: raise BTChipException("Invalid public key format") def format_transaction(dongleOutputData, trustedInputsAndInputScripts, version=0x01, lockTime=0): transaction = bitcoinTransaction() transaction.version = [] writeUint32LE(version, transaction.version) for item in trustedInputsAndInputScripts: newInput = bitcoinInput() newInput.prevOut = item[0][4:4+36] newInput.script = item[1] if len(item) > 2: newInput.sequence = bytearray(item[2].decode('hex')) else: newInput.sequence = bytearray([0xff, 0xff, 0xff, 0xff]) transaction.inputs.append(newInput) result = transaction.serialize(True) result.extend(dongleOutputData) writeUint32LE(lockTime, result) return bytearray(result) def get_regular_input_script(sigHashtype, publicKey): if len(sigHashtype) >= 0x4c: raise BTChipException("Invalid sigHashtype") if len(publicKey) >= 0x4c: raise BTChipException("Invalid publicKey") result = [ len(sigHashtype) ] result.extend(sigHashtype) result.append(len(publicKey)) result.extend(publicKey) return bytearray(result) def write_pushed_data_size(data, buffer): if (len(data) > 0xffff): raise BTChipException("unsupported encoding") if (len(data) < 0x4c): buffer.append(len(data)) elif (len(data) > 255): buffer.append(0x4d) buffer.append(len(data) & 0xff) buffer.append((len(data) >> 8) & 0xff) else: buffer.append(0x4c) buffer.append(len(data)) return buffer def get_p2sh_input_script(redeemScript, sigHashtypeList): result = [ 0x00 ] for sigHashtype in sigHashtypeList: write_pushed_data_size(sigHashtype, result) result.extend(sigHashtype) write_pushed_data_size(redeemScript, result) result.extend(redeemScript) return bytearray(result) def get_p2pk_input_script(sigHashtype): if len(sigHashtype) >= 0x4c: raise BTChipException("Invalid sigHashtype") result = [ len(sigHashtype) ] result.extend(sigHashtype) return bytearray(result) def get_output_script(amountScriptArray): result = [ len(amountScriptArray) ] for amountScript in amountScriptArray: writeHexAmount(btc_to_satoshi(str(amountScript[0])), result) writeVarint(len(amountScript[1]), result) result.extend(amountScript[1]) return bytearray(result) PK!]4 &hwilib/devices/btchip/ledgerWrapper.py""" ******************************************************************************* * BTChip Bitcoin Hardware Wallet Python API * (c) 2014 BTChip - 1BTChip7VfTnrPra5jqci7ejnMguuHogTn * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************** """ import struct from .btchipException import BTChipException def wrapCommandAPDU(channel, command, packetSize): if packetSize < 3: raise BTChipException("Can't handle Ledger framing with less than 3 bytes for the report") sequenceIdx = 0 offset = 0 result = struct.pack(">HBHH", channel, 0x05, sequenceIdx, len(command)) sequenceIdx = sequenceIdx + 1 if len(command) > packetSize - 7: blockSize = packetSize - 7 else: blockSize = len(command) result += command[offset : offset + blockSize] offset = offset + blockSize while offset != len(command): result += struct.pack(">HBH", channel, 0x05, sequenceIdx) sequenceIdx = sequenceIdx + 1 if (len(command) - offset) > packetSize - 5: blockSize = packetSize - 5 else: blockSize = len(command) - offset result += command[offset : offset + blockSize] offset = offset + blockSize while (len(result) % packetSize) != 0: result += b"\x00" return bytearray(result) def unwrapResponseAPDU(channel, data, packetSize): sequenceIdx = 0 offset = 0 if ((data is None) or (len(data) < 7 + 5)): return None if struct.unpack(">H", data[offset : offset + 2])[0] != channel: raise BTChipException("Invalid channel") offset += 2 if data[offset] != 0x05: raise BTChipException("Invalid tag") offset += 1 if struct.unpack(">H", data[offset : offset + 2])[0] != sequenceIdx: raise BTChipException("Invalid sequence") offset += 2 responseLength = struct.unpack(">H", data[offset : offset + 2])[0] offset += 2 if len(data) < 7 + responseLength: return None if responseLength > packetSize - 7: blockSize = packetSize - 7 else: blockSize = responseLength result = data[offset : offset + blockSize] offset += blockSize while (len(result) != responseLength): sequenceIdx = sequenceIdx + 1 if (offset == len(data)): return None if struct.unpack(">H", data[offset : offset + 2])[0] != channel: raise BTChipException("Invalid channel") offset += 2 if data[offset] != 0x05: raise BTChipException("Invalid tag") offset += 1 if struct.unpack(">H", data[offset : offset + 2])[0] != sequenceIdx: raise BTChipException("Invalid sequence") offset += 2 if (responseLength - len(result)) > packetSize - 5: blockSize = packetSize - 5 else: blockSize = responseLength - len(result) result += data[offset : offset + blockSize] offset += blockSize return bytearray(result) PK!uJJhwilib/devices/ckcc/__init__.py __version__ = '0.7.2' __all__ = [ "client", "protocol", "constants" ] PK!'B22hwilib/devices/ckcc/client.py# # client.py # # Implement the desktop side of our Coldcard USB protocol. # # If you would like to use a different EC/AES library, you may subclass # and override these member functions: # # - ec_mult, ec_setup, aes_setup, mitm_verify # import hid, sys, os, platform from binascii import b2a_hex, a2b_hex from hashlib import sha256 from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN, MAX_BLK_LEN from .utils import decode_xpub, get_pubkey_string # unofficial, unpermissioned... USB numbers COINKITE_VID = 0xd13e CKCC_PID = 0xcc10 # Unix domain socket used by the simulator CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock' class ColdcardDevice: def __init__(self, sn=None, dev=None, encrypt=True): # Establish connection via USB (HID) or Unix Pipe self.is_simulator = False if not dev and sn and '/' in sn: if platform.system() == 'Windows': raise RuntimeError("Cannot connect to simulator. Is it running?") dev = UnixSimulatorPipe(sn) found = 'simulator' self.is_simulator = True if not dev: for info in hid.enumerate(COINKITE_VID, CKCC_PID): found = info['serial_number'] if sn and sn != found: continue # only one interface per device, so only one 'path' dev = hid.device(serial=found) assert dev, "failed to open: "+found dev.open_path(info['path']) break if not dev: raise KeyError("Could not find Coldcard!" if not sn else ('Cannot find CC with serial: '+sn)) else: found = dev.get_serial_number_string() self.dev = dev self.serial = found # they will be defined after we've established a shared secret w/ device self.session_key = None self.encrypt_request = None self.decrypt_response = None self.master_xpub = None self.master_fingerprint = None self.resync() if encrypt: self.start_encryption() def close(self): # close underlying HID device if self.dev: self.dev.close() self.dev = None def resync(self): # flush anything already waiting on the EP while 1: junk = self.dev.read(64, timeout_ms=1) if not junk: break # write a special packet, that encodes zero-length data, and last packet in sequence # prefix with 0x00 for "report number" self.dev.write(b'\x00\x80' + (b'\xff'*63)) # flush any response (perhaps error) waiting on the EP while 1: junk = self.dev.read(64, timeout_ms=1) if not junk: break # check the above all worked err = self.dev.error() if err != '': raise RuntimeError('hidapi: '+err) assert self.dev.get_serial_number_string() == self.serial def send_recv(self, msg, expect_errors=False, verbose=0, timeout=1000, encrypt=True): # first byte of each 64-byte packet encodes length or packet-offset assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg) if not self.encrypt_request: # disable encryption if not already enabled for this connection encrypt = False if encrypt: msg = self.encrypt_request(msg) left = len(msg) offset = 0 while left > 0: # Note: first byte always zero (HID report number), # [1] is framing header (length+flags) # [2:65] payload (63 bytes, perhaps including padding) here = min(63, left) buf = bytearray(65) buf[2:2+here] = msg[offset:offset+here] if here == left: # final one in sequence buf[1] = here | 0x80 | (0x40 if encrypt else 0x00) else: # more will be coming buf[1] = here assert len(buf) == 65 if verbose: print("Tx [%2d]: %s (0x%x)" % (here, b2a_hex(buf[1:]), buf[1])) rv = self.dev.write(buf) assert rv == len(buf) == 65, repr(rv) offset += here left -= here # collect response, framed in the same manner resp = b'' while 1: buf = self.dev.read(64, timeout_ms=(timeout or 0)) assert buf, "timeout reading USB EP" # (trusting more than usual here) flag = buf[0] resp += bytes(buf[1:1+(flag & 0x3f)]) if flag & 0x80: break if flag & 0x40: if verbose: print('Enc response: %s' % b2a_hex(resp)) resp = self.decrypt_response(resp) try: if verbose: print("Rx [%2d]: %r" % (len(resp), b2a_hex(bytes(resp)))) return CCProtocolUnpacker.decode(resp) except CCProtoError as e: if expect_errors: raise raise except: #print("Corrupt response: %r" % resp) raise def ec_setup(self): # Provides the ECSDA primatives in portable way. # Needed to do D-H session key aggreement and then AES. # - should be replaced in subclasses if you have other EC libraries # - curve is always secp256k1 # - values are binary strings # - write whatever you want onto self. # - setup: return 65 of public key, and 16 bytes of AES IV # - second call: give the pubkey of far side, calculate the shared pt on curve from ecdsa.curves import SECP256k1 from ecdsa import SigningKey self.my_key = SigningKey.generate(curve=SECP256k1, hashfunc=sha256) pubkey = self.my_key.get_verifying_key().to_string() assert len(pubkey) == 64 #print("my pubkey = %s" % b2a_hex(pubkey)) return pubkey def ec_mult(self, his_pubkey): # - second call: given the pubkey of far side, calculate the shared pt on curve # - creates session key based on that from ecdsa.curves import SECP256k1 from ecdsa import VerifyingKey from ecdsa.util import number_to_string # Validate his pubkey a little: this call will check it's on the curve. assert len(his_pubkey) == 64 his_pubkey = VerifyingKey.from_string(his_pubkey, curve=SECP256k1, hashfunc=sha256) #print("his pubkey = %s" % b2a_hex(his_pubkey.to_string())) # do the D-H thing pt = self.my_key.privkey.secret_multiplier * his_pubkey.pubkey.point # final key is sha256 of that point, serialized (64 bytes). order = SECP256k1.order kk = number_to_string(pt.x(), order) + number_to_string(pt.y(), order) del self.my_key return sha256(kk).digest() def aes_setup(self, session_key): # Load keys and define encrypt/decrypt functions # - for CTR mode, we have different counters in each direction, so need two instances # - count must start at zero, and increment in LSB for each block. import pyaes self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt def start_encryption(self): # setup encryption on the link # - pick our own key pair, IV for AES # - send IV and pubkey to device # - it replies with own pubkey # - determine what the session key was/is pubkey = self.ec_setup() msg = CCProtocolPacker.encrypt_start(pubkey) his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False) self.session_key = self.ec_mult(his_pubkey) # capture some public details of remote side's master key # - these can be empty/0x0 when no secrets on device yet self.master_xpub = str(xpub, 'ascii') self.master_fingerprint = fingerprint #print('sess key = %s' % b2a_hex(self.session_key)) self.aes_setup(self.session_key) def mitm_verify(self, sig, expected_xpub): # First try with Pycoin try: from pycoin.key.BIP32Node import BIP32Node from pycoin.contrib.msg_signing import verify_message from pycoin.encoding import from_bytes_32 from base64 import b64encode mk = BIP32Node.from_wallet_key(expected_xpub) return verify_message(mk, b64encode(sig), msg_hash=from_bytes_32(self.session_key)) except ImportError: pass # If Pycoin is not available, do it using ecdsa from ecdsa import BadSignatureError, SECP256k1, VerifyingKey pubkey, chaincode = decode_xpub(expected_xpub) vk = VerifyingKey.from_string(get_pubkey_string(pubkey), curve=SECP256k1) try: ok = vk.verify_digest(sig[1:], self.session_key) except BadSignatureError: ok = False return ok def check_mitm(self, expected_xpub=None, sig=None): # Optional? verification against MiTM attack: # Using the master xpub, check a signature over the session public key, to # verify we talking directly to the real Coldcard (no active MitM between us). # - message is just the session key itself; no digests or prefixes # - no need for this unless concerned about *active* mitm on USB bus # - passive attackers (snoopers) will get nothing anyway, thanks to diffie-helman sauce # - unfortunately might be too slow to do everytime? xp = expected_xpub or self.master_xpub assert xp, "device doesn't have any secrets yet" assert self.session_key, "connection not yet in encrypted mode" # this request is delibrately slow on the device side if not sig: sig = self.send_recv(CCProtocolPacker.check_mitm(), timeout=5000) assert len(sig) == 65 ok = self.mitm_verify(sig, xp) if ok != True: raise RuntimeError("Possible active MiTM attack in progress! Incorrect signature.") def upload_file(self, data, verify=True, blksize=1024): # upload a single file, up to 1MB? in size. Can check arrives ok. chk = sha256(data).digest() for i in range(0, len(data), blksize): here = data[i:i+blksize] pos = self.send_recv(CCProtocolPacker.upload(i, len(data), here)) assert pos == i if verify: rb = self.send_recv(CCProtocolPacker.sha256()) if rb != chk: raise RuntimeError('Checksum wrong during file upload') return len(data), chk def download_file(self, length, checksum, blksize=1024, file_number=1): # Download a single file, when you already know it's checksum. Will check arrives ok. data = b'' chk = sha256() pos = 0 while pos < length: here = self.send_recv(CCProtocolPacker.download(pos, min(blksize, length-pos), file_number)) data += here chk.update(here) pos += len(here) assert len(here) > 0 if chk.digest() != checksum: raise RuntimeError('Checksum wrong during file download') return data class UnixSimulatorPipe: # Use a UNIX pipe to the simulator instead of a real USB connection. # - emulates the API of hidapi device object. def __init__(self, path): import socket, atexit self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) try: self.pipe.connect(path) except FileNotFoundError: raise RuntimeError("Cannot connect to simulator. Is it running?") instance = 0 while instance < 10: pn = '/tmp/ckcc-client-%d-%d.sock' % (os.getpid(), instance) try: self.pipe.bind(pn) # just needs any name break except OSError: instance += 1 continue self.pipe_name = pn atexit.register(self.close) def read(self, max_count, timeout_ms=None): import socket if not timeout_ms: self.pipe.settimeout(None) else: self.pipe.settimeout(timeout_ms / 1000.0) try: return self.pipe.recv(max_count) except socket.timeout: return None def write(self, buf): assert len(buf) == 65 self.pipe.settimeout(10) rv = self.pipe.send(buf[1:]) return 65 if rv == 64 else rv def error(self): return '' def close(self): self.pipe.close() try: os.unlink(self.pipe_name) except: pass def get_serial_number_string(self): return 'simulator' # EOF PK!NU  hwilib/devices/ckcc/constants.py# # Constants and various "limits" shared between embedded and desktop USB protocol # try: from micropython import const except ImportError: const = int # For upload/download this is the max size of the data block. MAX_BLK_LEN = const(2048) # Max total message length, excluding framing overhead (1 byte per 64). # - includes args for upload command MAX_MSG_LEN = const(4+4+4+MAX_BLK_LEN) # Max PSBT txn we support (384k bytes as PSBT) # - the max on the wire for mainnet is 100k # - but a PSBT might contain a full txn for each input MAX_TXN_LEN = const(384*1024) # Max size of any upload (firmware.dfu files in particular) MAX_UPLOAD_LEN = const(2*MAX_TXN_LEN) # Max length of text messages for signing MSG_SIGNING_MAX_LENGTH = const(240) # Bit values for address types AFC_PUBKEY = const(0x01) # pay to hash of pubkey AFC_SEGWIT = const(0x02) # requires a witness to spend AFC_BECH32 = const(0x04) # just how we're encoding it? AFC_SCRIPT = const(0x08) # paying into a script AFC_WRAPPED = const(0x10) # for transition/compat types for segwit vs. old # Numeric codes for specific address types AF_CLASSIC = AFC_PUBKEY # 1addr AF_P2SH = AFC_SCRIPT # classic multisig / simple P2SH / 3hash AF_P2WPKH = AFC_PUBKEY | AFC_SEGWIT | AFC_BECH32 # bc1qsdklfj AF_P2WSH = AFC_SCRIPT | AFC_SEGWIT | AFC_BECH32 # segwit multisig AF_P2WPKH_P2SH = AFC_WRAPPED | AFC_PUBKEY | AFC_SEGWIT # looks classic P2SH, but p2wpkh inside AF_P2WSH_P2SH = AFC_WRAPPED | AFC_SCRIPT | AFC_SEGWIT # looks classic P2SH, segwit multisig SUPPORTED_ADDR_FORMATS = frozenset([ AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH, ]) # BIP-174 aka PSBT defined values # PSBT_GLOBAL_UNSIGNED_TX = const(0) PSBT_IN_NON_WITNESS_UTXO = const(0) PSBT_IN_WITNESS_UTXO = const(1) PSBT_IN_PARTIAL_SIG = const(2) PSBT_IN_SIGHASH_TYPE = const(3) PSBT_IN_REDEEM_SCRIPT = const(4) PSBT_IN_WITNESS_SCRIPT = const(5) PSBT_IN_BIP32_DERIVATION = const(6) PSBT_IN_FINAL_SCRIPTSIG = const(7) PSBT_IN_FINAL_SCRIPTWITNESS = const(8) PSBT_OUT_REDEEM_SCRIPT = const(0) PSBT_OUT_WITNESS_SCRIPT = const(1) PSBT_OUT_BIP32_DERIVATION = const(2) # EOF PK!}ڻhwilib/devices/ckcc/protocol.py# # Details of our USB level protocol. Shared file between desktop and embedded. # # - first 4 bytes of all messages is the command code or response code # - use H # from struct import pack, unpack_from from .constants import * class CCProtoError(RuntimeError): def __str__(self): return self.args[0] class CCUserRefused(RuntimeError): def __str__(self): return 'You refused permission to do the operation' class CCBusyError(RuntimeError): def __str__(self): return 'Coldcard is handling another request right now' class CCProtocolPacker: # returns a lamba that will take correct args # and then give you a binary string to encode the # request @staticmethod def logout(): return pack('4s', b'logo') @staticmethod def reboot(): return pack('4s', b'rebo') @staticmethod def version(): # returns a string, with newline separators return pack('4s', b'vers') @staticmethod def ping(msg): # returns whatever binary you give it return b'ping' + bytes(msg) @staticmethod def check_mitm(): return b'mitm' @staticmethod def start_backup(): # prompts user with password for encrytped backup return b'back' @staticmethod def encrypt_start(device_pubkey, version=0x1): assert len(device_pubkey) == 64, "want uncompressed 64-byte pubkey, no prefix byte" return pack('<4sI64s', b'ncry', version, device_pubkey) @staticmethod def upload(offset, total_size, data): # note: see MAX_MSG_LEN above assert len(data) <= MAX_MSG_LEN, 'badlen' return pack('<4sII', b'upld', offset, total_size) + data @staticmethod def download(offset, length, file_number=0): assert 0 <= file_number < 2 return pack('<4sIII', b'dwld', offset, length, file_number) @staticmethod def sha256(): return b'sha2' @staticmethod def sign_transaction(length, file_sha, finalize=False): # must have already uploaded binary, and give expected sha256 assert len(file_sha) == 32 return pack('<4sII32s', b'stxn', length, int(finalize), file_sha) @staticmethod def sign_message(raw_msg, subpath='m', addr_fmt=AF_CLASSIC): # only begins user interaction return pack('<4sIII', b'smsg', addr_fmt, len(subpath), len(raw_msg)) \ + subpath.encode('ascii') + raw_msg @staticmethod def get_signed_msg(): # poll completion/results of message signing return b'smok' @staticmethod def get_backup_file(): # poll completion/results of backup return b'bkok' @staticmethod def get_signed_txn(): # poll completion/results of transaction signing return b'stok' @staticmethod def get_xpub(subpath='m'): # takes a string, like: m/44'/0'/23/23 return b'xpub' + subpath.encode('ascii') @staticmethod def show_address(subpath, addr_fmt=AF_CLASSIC): # takes a string, like: m/44'/0'/23/23 # shows on screen, no feedback from user expected return pack('<4sI', b'show', addr_fmt) + subpath.encode('ascii') @staticmethod def sim_keypress(key): # Simulator ONLY: pretend a key is pressed return b'XKEY' + key @staticmethod def bag_number(new_number=b''): # one time only: put into bag, or readback bag return b'bagi' + bytes(new_number) class CCProtocolUnpacker: # Take a binary response, and turn it into a python object # - we support a number of signatures, and expand as needed # - some will be general-purpose, but others can be very specific to one command # - given full rx message to work from # - this is done after un-framing @classmethod def decode(cls, msg): assert len(msg) >= 4 sign = str(msg[0:4], 'utf8', 'ignore') d = getattr(cls, sign, cls) if d is cls: raise CCProtoError('Unknown response signature: ' + repr(sign)) return d(msg) # struct info for each response def okay(msg): # trivial response, w/ no content assert len(msg) == 4 return None # low-level errors def fram(msg): raise CCProtoError("Framing Error", str(msg[4:], 'utf8')) def err_(msg): raise CCProtoError("Remote Error: " + str(msg[4:], 'utf8', 'ignore'), msg[4:]) def refu(msg): # user didn't want to approve something raise CCUserRefused() def busy(msg): # user didn't want to approve something raise CCBusyError() def biny(msg): # binary string: length implied by msg framing return msg[4:] def int1(msg): return unpack_from('= 0x8008000, "Bad address?" yield fd.tell() yield elem.size # Adapted from https://github.com/petertodd/python-bitcoinlib/blob/master/bitcoin/base58.py def decode_xpub(s): assert s[1:].startswith('pub') b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' # Convert the string to an integer n = 0 for c in s: n *= 58 if c not in b58_digits: raise ValueError('Character %r is not a valid base58 character' % c) digit = b58_digits.index(c) n += digit # Convert the integer to bytes h = '%x' % n if len(h) % 2: h = '0' + h res = binascii.unhexlify(h.encode('utf8')) # Add padding back. pad = 0 for c in s[:-1]: if c == b58_digits[0]: pad += 1 else: break decoded = b'\x00' * pad + res # Get the pubkey and chaincode return decoded[-37:-4], decoded[-69:-37] def get_pubkey_string(b): p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F x = int.from_bytes(b[1:], byteorder="big") y = pow((x*x*x + 7) % p, (p + 1) // 4, p) if (y & 1 != b[0] & 1): y = p - y return x.to_bytes(32, byteorder="big") + y.to_bytes(32, byteorder="big") # EOF PK!7##hwilib/devices/coldcard.py# Coldcard interaction script from ..hwwclient import HardwareWalletClient from ..errors import ActionCanceledError, BadArgumentError, DeviceBusyError, UnavailableActionError, DeviceFailureError from .ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID from .ckcc.protocol import CCProtocolPacker, CCBusyError, CCProtoError, CCUserRefused from .ckcc.constants import MAX_BLK_LEN, AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH from ..base58 import xpub_main_2_test, get_xpub_fingerprint_hex from hashlib import sha256 import base64 import json import hid import io import sys import time CC_SIMULATOR_SOCK = '/tmp/ckcc-simulator.sock' # Using the simulator: https://github.com/Coldcard/firmware/blob/master/unix/README.md def coldcard_exception(f): def func(*args, **kwargs): try: return f(*args, **kwargs) except CCProtoError as e: raise BadArgumentError(str(e)) except CCUserRefused as e: raise ActionCanceledError('{} canceled'.format(f.__name__)) except CCBusyError as e: raise DeviceBusyError(str(e)) return func # This class extends the HardwareWalletClient for ColdCard specific things class ColdcardClient(HardwareWalletClient): def __init__(self, path, password=''): super(ColdcardClient, self).__init__(path, password) # Simulator hard coded pipe socket if path == CC_SIMULATOR_SOCK: self.device = ColdcardDevice(sn=path) else: device = hid.device() device.open_path(path.encode()) self.device = ColdcardDevice(dev=device) # Must return a dict with the xpub # Retrieves the public key at the specified BIP 32 derivation path @coldcard_exception def get_pubkey_at_path(self, path): self.device.check_mitm() path = path.replace('h', '\'') path = path.replace('H', '\'') xpub = self.device.send_recv(CCProtocolPacker.get_xpub(path), timeout=None) if self.is_testnet: return {'xpub':xpub_main_2_test(xpub)} else: return {'xpub':xpub} # Must return a hex string with the signed transaction # The tx must be in the combined unsigned transaction format @coldcard_exception def sign_tx(self, tx): self.device.check_mitm() # Get psbt in hex and then make binary fd = io.BytesIO(base64.b64decode(tx.serialize())) # learn size (portable way) offset = 0 sz = fd.seek(0, 2) fd.seek(0) left = sz chk = sha256() for pos in range(0, sz, MAX_BLK_LEN): here = fd.read(min(MAX_BLK_LEN, left)) if not here: break left -= len(here) result = self.device.send_recv(CCProtocolPacker.upload(pos, sz, here)) assert result == pos chk.update(here) # do a verify expect = chk.digest() result = self.device.send_recv(CCProtocolPacker.sha256()) assert len(result) == 32 if result != expect: raise DeviceFailureError("Wrong checksum:\nexpect: %s\n got: %s" % (b2a_hex(expect).decode('ascii'), b2a_hex(result).decode('ascii'))) # start the signing process ok = self.device.send_recv(CCProtocolPacker.sign_transaction(sz, expect), timeout=None) assert ok == None if self.device.is_simulator: self.device.send_recv(CCProtocolPacker.sim_keypress(b'y')) print("Waiting for OK on the Coldcard...", file=sys.stderr) while 1: time.sleep(0.250) done = self.device.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) if done == None: continue break if len(done) != 2: raise DeviceFailureError('Failed: %r' % done) result_len, result_sha = done result = self.device.download_file(result_len, result_sha, file_number=1) return {'psbt':base64.b64encode(result).decode()} # Must return a base64 encoded string with the signed message # The message can be any string. keypath is the bip 32 derivation path for the key to sign with @coldcard_exception def sign_message(self, message, keypath): self.device.check_mitm() keypath = keypath.replace('h', '\'') keypath = keypath.replace('H', '\'') ok = self.device.send_recv(CCProtocolPacker.sign_message(message.encode(), keypath, AF_CLASSIC), timeout=None) assert ok == None if self.device.is_simulator: self.device.send_recv(CCProtocolPacker.sim_keypress(b'y')) while 1: time.sleep(0.250) done = self.device.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) if done == None: continue break if len(done) != 2: raise DeviceFailureError('Failed: %r' % done) addr, raw = done sig = str(base64.b64encode(raw), 'ascii').replace('\n', '') return {"signature": sig} # Display address of specified type on the device. Only supports single-key based addresses. @coldcard_exception def display_address(self, keypath, p2sh_p2wpkh, bech32): self.device.check_mitm() keypath = keypath.replace('h', '\'') keypath = keypath.replace('H', '\'') if p2sh_p2wpkh: format = AF_P2WPKH_P2SH elif bech32: format = AF_P2WPKH else: format = AF_CLASSIC address = self.device.send_recv(CCProtocolPacker.show_address(keypath, format), timeout=None) if self.device.is_simulator: self.device.send_recv(CCProtocolPacker.sim_keypress(b'y')) return {'address': address} # Setup a new device def setup_device(self, label='', passphrase=''): raise UnavailableActionError('The Coldcard does not support software setup') # Wipe this device def wipe_device(self): raise UnavailableActionError('The Coldcard does not support wiping via software') # Restore device from mnemonic or xprv def restore_device(self, label=''): raise UnavailableActionError('The Coldcard does not support restoring via software') # Begin backup process @coldcard_exception def backup_device(self, label='', passphrase=''): self.device.check_mitm() ok = self.device.send_recv(CCProtocolPacker.start_backup()) assert ok == None if self.device.is_simulator: self.device.send_recv(CCProtocolPacker.sim_keypress(b'y')) while 1: if self.device.is_simulator: # For the simulator, work through the password quiz. Eventually pressing 1 will work self.device.send_recv(CCProtocolPacker.sim_keypress(b'1')) time.sleep(0.250) done = self.device.send_recv(CCProtocolPacker.get_backup_file(), timeout=None) if done == None: continue break if len(done) != 2: raise DeviceFailureError('Failed: %r' % done) result_len, result_sha = done result = self.device.download_file(result_len, result_sha, file_number=0) filename = time.strftime('backup-%Y%m%d-%H%M.7z') open(filename, 'wb').write(result) return {'success': True, 'message': 'The backup has been written to {}'.format(filename)} # Close the device def close(self): self.device.close() # Prompt pin def prompt_pin(self): raise UnavailableActionError('The Coldcard does not need a PIN sent from the host') # Send pin def send_pin(self, pin): raise UnavailableActionError('The Coldcard does not need a PIN sent from the host') def enumerate(password=''): results = [] for d in hid.enumerate(COINKITE_VID, CKCC_PID): d_data = {} path = d['path'].decode() d_data['type'] = 'coldcard' d_data['path'] = path client = None try: client = ColdcardClient(path) master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) except Exception as e: d_data['error'] = "Could not open client or get fingerprint information: " + str(e) if client: client.close() results.append(d_data) # Check if the simulator is there client = None try: client = ColdcardClient(CC_SIMULATOR_SOCK) master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data = {} d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) d_data['type'] = 'coldcard' d_data['path'] = CC_SIMULATOR_SOCK results.append(d_data) except RuntimeError as e: if str(e) == 'Cannot connect to simulator. Is it running?': pass else: raise e if client: client.close() return results PK!\ \ \hwilib/devices/digitalbitbox.py# Digital Bitbox interaction script import hid import struct import json import base64 import pyaes import hashlib import hmac import os import binascii import logging import socket import sys import time from ..hwwclient import HardwareWalletClient from ..errors import ActionCanceledError, BadArgumentError, DeviceFailureError, DeviceAlreadyInitError, DeviceNotReadyError, NoPasswordError, UnavailableActionError, DeviceFailureError from ..serializations import CTransaction, PSBT, hash256, hash160, ser_sig_der, ser_sig_compact, ser_compact_size from ..base58 import get_xpub_fingerprint, decode, to_address, xpub_main_2_test, get_xpub_fingerprint_hex applen = 225280 # flash size minus bootloader length chunksize = 8*512 usb_report_size = 64 # firmware > v2.0 report_buf_size = 4096 # firmware v2.0.0 boot_buf_size_send = 4098 boot_buf_size_reply = 256 HWW_CID = 0xFF000000 HWW_CMD = 0x80 + 0x40 + 0x01 DBB_VENDOR_ID = 0x03eb DBB_DEVICE_ID = 0x2402 # Errors codes from the device bad_args = [ 102, # The password length must be at least " STRINGIFY(PASSWORD_LEN_MIN) " characters. 103, # No input received. 104, # Invalid command. 105, # Only one command allowed at a time. 109, # JSON parse error. 204, # Invalid seed. 253, # Incorrect serialized pubkey length. A 33-byte hexadecimal value (66 characters) is expected. 254, # Incorrect serialized pubkey hash length. A 32-byte hexadecimal value (64 characters) is expected. 256, # Failed to pair with second factor, because the previously received hash of the public key does not match the computed hash of the public key. 300, # Incorrect pubkey length. A 33-byte hexadecimal value (66 characters) is expected. 301, # Incorrect hash length. A 32-byte hexadecimal value (64 characters) is expected. 304, # Incorrect TFA pin. 411, # Filenames limited to alphanumeric values, hyphens, and underscores. 412, # Please provide an encryption key. 112, # Device password matches reset password. Disabling reset password. 251, # Could not generate key. ] device_failures = [ 101, # Please set a password. 107, # Output buffer overflow. 200, # Seed creation requires an SD card for automatic encrypted backup of the seed. 250, # Master key not present. 252, # Could not generate ECDH secret. 303, # Could not sign. 400, # Please insert SD card. 401, # Could not mount the SD card. 402, # Could not open a file to write - it may already exist. 403, # Could not open the directory. 405, # Could not write the file. 407, # Could not read the file. 408, # May not have erased all files (or no file present). 410, # Backup file does not match wallet. 500, # Chip communication error. 501, # Could not read flash. 502, # Could not encrypt. 110, # Too many failed access attempts. Device reset. 111, # Device locked. Erase device to access this command. 113, # Due to many login attempts, the next login requires holding the touch button for 3 seconds. 900, # attempts remain before the device is reset. 901, # Ignored for non-embedded testing. 902, # Too many backup files to read. The list is truncated. 903, # attempts remain before the device is reset. The next login requires holding the touch button. ] cancels = [ 600, # Aborted by user. 601, # Touchbutton timed out. ] ERR_MEM_SETUP = 503 # Device initialization in progress. class DBBError(Exception): def __init__(self, error): Exception.__init__(self) self.error = error def get_error(self): return self.error['error']['message'] def get_code(self): return self.error['error']['code'] def __str__(self): return 'Error: {}, Code: {}'.format(self.error['error']['message'], self.error['error']['code']) def digitalbitbox_exception(f): def func(*args, **kwargs): try: return f(*args, **kwargs) except DBBError as e: if e.get_code() in bad_args: raise BadArgumentError(e.get_error()) elif e.get_code() in device_failures: raise DeviceFailureError(e.get_error()) elif e.get_code() in cancels: raise ActionCanceledError(e.get_error()) elif e.get_code() == ERR_MEM_SETUP: raise DeviceNotReadyError(e.get_error()) return func def aes_encrypt_with_iv(key, iv, data): aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv) aes = pyaes.Encrypter(aes_cbc) e = aes.feed(data) + aes.feed() # empty aes.feed() appends pkcs padding return e def aes_decrypt_with_iv(key, iv, data): aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv) aes = pyaes.Decrypter(aes_cbc) s = aes.feed(data) + aes.feed() # empty aes.feed() strips pkcs padding return s def encrypt_aes(secret, s): iv = bytes(os.urandom(16)) ct = aes_encrypt_with_iv(secret, iv, s) e = iv + ct return e def decrypt_aes(secret, e): iv, e = e[:16], e[16:] s = aes_decrypt_with_iv(secret, iv, e) return s def sha256(x): return hashlib.sha256(x).digest() def sha512(x): return hashlib.sha512(x).digest() def double_hash(x): if type(x) is not bytearray: x=x.encode('utf-8') return sha256(sha256(x)) def derive_keys(x): h = double_hash(x) h = sha512(h) return (h[:len(h)//2], h[len(h)//2:]) def to_string(x, enc): if isinstance(x, (bytes, bytearray)): return x.decode(enc) if isinstance(x, str): return x else: raise DeviceFailureError("Not a string or bytes like object") class BitboxSimulator(): def __init__(self, ip, port): self.ip = ip self.port = port self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.connect((self.ip, self.port)) self.socket.settimeout(1) def send_recv(self, msg): self.socket.sendall(msg) data = self.socket.recv(3584) return data def close(self): self.socket.close() def get_serial_number_string(self): return 'dbb_fw:v5.0.0' def send_frame(data, device): data = bytearray(data) data_len = len(data) seq = 0; idx = 0; write = [] while idx < data_len: if idx == 0: # INIT frame write = data[idx : idx + min(data_len, usb_report_size - 7)] device.write(b'\0' + struct.pack(">IBH",HWW_CID, HWW_CMD, data_len & 0xFFFF) + write + b'\xEE' * (usb_report_size - 7 - len(write))) else: # CONT frame write = data[idx : idx + min(data_len, usb_report_size - 5)] device.write(b'\0' + struct.pack(">IB", HWW_CID, seq) + write + b'\xEE' * (usb_report_size - 5 - len(write))) seq += 1 idx += len(write) def read_frame(device): # INIT response read = bytearray(device.read(usb_report_size)) cid = ((read[0] * 256 + read[1]) * 256 + read[2]) * 256 + read[3] cmd = read[4] data_len = read[5] * 256 + read[6] data = read[7:] idx = len(read) - 7; while idx < data_len: # CONT response read = bytearray(device.read(usb_report_size)) data += read[5:] idx += len(read) - 5 assert cid == HWW_CID, '- USB command ID mismatch' assert cmd == HWW_CMD, '- USB command frame mismatch' return data def get_firmware_version(device): serial_number = device.get_serial_number_string() split_serial = serial_number.split(':') firm_ver = split_serial[1][1:] # Version is vX.Y.Z, we just need X.Y.Z split_ver = firm_ver.split('.') return (int(split_ver[0]), int(split_ver[1]), int(split_ver[2])) # major, minor, revision def send_plain(msg, device): reply = "" try: if isinstance(device, BitboxSimulator): r = device.send_recv(msg) else: firm_ver = get_firmware_version(device) if (firm_ver[0] == 2 and firm_ver[1] == 0) or (firm_ver[0] == 1): hidBufSize = 4096 device.write('\0' + msg + '\0' * (hidBufSize - len(msg))) r = bytearray() while len(r) < hidBufSize: r += bytearray(self.dbb_hid.read(hidBufSize)) else: send_frame(msg, device) r = read_frame(device) r = r.rstrip(b' \t\r\n\0') r = r.replace(b"\0", b'') r = to_string(r, 'utf8') reply = json.loads(r) except Exception as e: reply = json.loads('{"error":"Exception caught while sending plaintext message to DigitalBitbox ' + str(e) + '"}') return reply def send_encrypt(msg, password, device): reply = "" try: firm_ver = get_firmware_version(device) if firm_ver[0] >= 5: encryption_key, authentication_key = derive_keys(password) msg = encrypt_aes(encryption_key, msg) hmac_digest = hmac.new(authentication_key, msg, digestmod=hashlib.sha256).digest() authenticated_msg = base64.b64encode(msg + hmac_digest) else: encryption_key = double_hash(password) authenticated_msg = base64.b64encode(encrypt_aes(encryption_key, msg)) reply = send_plain(authenticated_msg, device) if 'ciphertext' in reply: b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"]))) if firm_ver[0] >= 5: msg = b64_unencoded[:-32] reply_hmac = b64_unencoded[-32:] hmac_calculated = hmac.new(authentication_key, msg, digestmod=hashlib.sha256).digest() if not hmac.compare_digest(reply_hmac, hmac_calculated): raise Exception("Failed to validate HMAC") else: msg = b64_unencoded reply = decrypt_aes(encryption_key, msg) reply = json.loads(reply.decode("utf-8")) if 'error' in reply: password = None except Exception as e: reply = {'error':'Exception caught while sending encrypted message to DigitalBitbox ' + str(e)} return reply def stretch_backup_key(password): key = hashlib.pbkdf2_hmac('sha512', password.encode(), b'Digital Bitbox', 20480) return binascii.hexlify(key).decode() def format_backup_filename(name): return '{}-{}.pdf'.format(name, time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())) # This class extends the HardwareWalletClient for Digital Bitbox specific things class DigitalbitboxClient(HardwareWalletClient): def __init__(self, path, password): super(DigitalbitboxClient, self).__init__(path, password) if not password: raise NoPasswordError('Password must be supplied for digital BitBox') if path.startswith('udp:'): split_path = path.split(':') ip = split_path[1] port = int(split_path[2]) self.device = BitboxSimulator(ip, port) else: self.device = hid.device() self.device.open_path(path.encode()) self.password = password # Must return a dict with the xpub # Retrieves the public key at the specified BIP 32 derivation path @digitalbitbox_exception def get_pubkey_at_path(self, path): if '\'' not in path and 'h' not in path and 'H' not in path: raise BadArgumentError('The digital bitbox requires one part of the derivation path to be derived using hardened keys') reply = send_encrypt('{"xpub":"' + path + '"}', self.password, self.device) if 'error' in reply: raise DBBError(reply) if self.is_testnet: return {'xpub':xpub_main_2_test(reply['xpub'])} else: return {'xpub':reply['xpub']} # Must return a hex string with the signed transaction # The tx must be in the PSBT format @digitalbitbox_exception def sign_tx(self, tx): # Create a transaction with all scriptsigs blanekd out blank_tx = CTransaction(tx.tx) # Get the master key fingerprint master_fp = get_xpub_fingerprint(self.get_pubkey_at_path('m/0h')['xpub']) # create sighashes sighash_tuples = [] for txin, psbt_in, i_num in zip(blank_tx.vin, tx.inputs, range(len(blank_tx.vin))): sighash = b"" pubkeys = [] if psbt_in.non_witness_utxo: utxo = psbt_in.non_witness_utxo.vout[txin.prevout.n] # Check if P2SH if utxo.is_p2sh(): # Look up redeemscript redeemscript = psbt_in.redeem_script # Add to blank_tx txin.scriptSig = redeemscript # Check if P2PKH elif utxo.is_p2pkh() or utxo.is_p2pk(): txin.scriptSig = psbt_in.non_witness_utxo.vout[txin.prevout.n].scriptPubKey # We don't know what this is, skip it else: continue # Serialize and add sighash ALL ser_tx = blank_tx.serialize_without_witness() ser_tx += b"\x01\x00\x00\x00" # Hash it sighash += hash256(ser_tx) txin.scriptSig = b"" elif psbt_in.witness_utxo: # Calculate hashPrevouts and hashSequence prevouts_preimage = b"" sequence_preimage = b"" for inputs in blank_tx.vin: prevouts_preimage += inputs.prevout.serialize() sequence_preimage += struct.pack("= 0x80000000: keypath_str += str(index - 0x80000000) + 'h' else: keypath_str += str(index) # Create tuples and add to List tup = (binascii.hexlify(sighash).decode(), keypath_str, i_num, pubkey) sighash_tuples.append(tup) # Return early if nothing to do if len(sighash_tuples) == 0: return {'psbt':tx.serialize()} # Sign the sighashes to_send = '{"sign":{"data":[' for tup in sighash_tuples: to_send += '{"hash":"' to_send += tup[0] to_send += '","keypath":"' to_send += tup[1] to_send += '"},' if to_send[-1] == ',': to_send = to_send[:-1] to_send += ']}}' logging.debug(to_send) reply = send_encrypt(to_send, self.password, self.device) logging.debug(reply) if 'error' in reply: raise DBBError(reply) print("Touch the device for 3 seconds to sign. Touch briefly to cancel", file=sys.stderr) reply = send_encrypt(to_send, self.password, self.device) logging.debug(reply) if 'error' in reply: raise DBBError(reply) # Extract sigs sigs = [] for item in reply['sign']: sigs.append(binascii.unhexlify(item['sig'])) # Make sigs der der_sigs = [] for sig in sigs: der_sigs.append(ser_sig_der(sig[0:32], sig[32:64])) # add sigs to tx for tup, sig in zip(sighash_tuples, der_sigs): tx.inputs[tup[2]].partial_sigs[tup[3]] = sig return {'psbt':tx.serialize()} # Must return a base64 encoded string with the signed message # The message can be any string @digitalbitbox_exception def sign_message(self, message, keypath): to_hash = b"" to_hash += self.message_magic to_hash += ser_compact_size(len(message)) to_hash += message.encode() hashed_message = hash256(to_hash) to_send = '{"sign":{"data":[{"hash":"' to_send += binascii.hexlify(hashed_message).decode() to_send += '","keypath":"' to_send += keypath to_send += '"}]}}' reply = send_encrypt(to_send, self.password, self.device) logging.debug(reply) if 'error' in reply: raise DBBError(reply) print("Touch the device for 3 seconds to sign. Touch briefly to cancel", file=sys.stderr) reply = send_encrypt(to_send, self.password, self.device) logging.debug(reply) if 'error' in reply: raise DBBError(reply) sig = binascii.unhexlify(reply['sign'][0]['sig']) r = sig[0:32] s = sig[32:64] recid = binascii.unhexlify(reply['sign'][0]['recid']) compact_sig = ser_sig_compact(r, s, recid) logging.debug(binascii.hexlify(compact_sig)) return {"signature":base64.b64encode(compact_sig).decode('utf-8')} # Display address of specified type on the device. Only supports single-key based addresses. def display_address(self, keypath, p2sh_p2wpkh, bech32): raise UnavailableActionError('The Digital Bitbox does not have a screen to display addresses on') # Setup a new device @digitalbitbox_exception def setup_device(self, label='', passphrase=''): # Make sure this is not initialized reply = send_encrypt('{"device" : "info"}', self.password, self.device) if 'error' not in reply or ('error' in reply and reply['error']['code'] != 101): raise DeviceAlreadyInitError('Device is already initialized. Use wipe first and try again') # Need a wallet name and backup passphrase if not label or not passphrase: raise BadArgumentError('The label and backup passphrase for a new Digital Bitbox wallet must be specified and cannot be empty') # Set password to_send = {'password': self.password} reply = send_plain(json.dumps(to_send).encode(), self.device) # Now make the wallet key = stretch_backup_key(passphrase) backup_filename = format_backup_filename(label) to_send = {'seed': {'source': 'create', 'key': key, 'filename': backup_filename}} reply = send_encrypt(json.dumps(to_send).encode(), self.password, self.device) if 'error' in reply: return {'success': False, 'error': reply['error']['message']} return {'success': True} # Wipe this device @digitalbitbox_exception def wipe_device(self): reply = send_encrypt('{"reset" : "__ERASE__"}', self.password, self.device) if 'error' in reply: return {'success': False, 'error': reply['error']['message']} return {'success': True} # Restore device from mnemonic or xprv def restore_device(self, label=''): raise UnavailableActionError('The Digital Bitbox does not support restoring via software') # Begin backup process @digitalbitbox_exception def backup_device(self, label='', passphrase=''): # Need a wallet name and backup passphrase if not label or not passphrase: raise BadArgumentError('The label and backup passphrase for a Digital Bitbox backup must be specified and cannot be empty') key = stretch_backup_key(passphrase) backup_filename = format_backup_filename(label) to_send = {'backup': {'source': 'all', 'key': key, 'filename': backup_filename}} reply = send_encrypt(json.dumps(to_send).encode(), self.password, self.device) if 'error' in reply: raise DBBError(reply) return {'success': True} # Close the device def close(self): self.device.close() # Prompt pin def prompt_pin(self): raise UnavailableActionError('The Digital Bitbox does not need a PIN sent from the host') # Send pin def send_pin(self, pin): raise UnavailableActionError('The Digital Bitbox does not need a PIN sent from the host') def enumerate(password=''): results = [] devices = hid.enumerate(DBB_VENDOR_ID, DBB_DEVICE_ID) # Try connecting to simulator try: dev = BitboxSimulator('127.0.0.1', 35345) res = dev.send_recv(b'{"device" : "info"}') devices.append({'path': b'udp:127.0.0.1:35345', 'interface_number': 0}) dev.close() except: pass for d in devices: if ('interface_number' in d and d['interface_number'] == 0 \ or ('usage_page' in d and d['usage_page'] == 0xffff)): d_data = {} path = d['path'].decode() d_data['type'] = 'digitalbitbox' d_data['path'] = path client = None try: client = DigitalbitboxClient(path, password) # Check initialized reply = send_encrypt('{"device" : "info"}', password, client.device) if 'error' in reply and reply['error']['code'] == 101: d_data['error'] = 'Not initialized' else: master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) except Exception as e: d_data['error'] = "Could not open client or get fingerprint information: " + str(e) if client: client.close() results.append(d_data) return results PK!8Ñ;;hwilib/devices/keepkey.py# KeepKey interaction script from .trezorlib.transport import enumerate_devices from .trezor import TrezorClient from ..base58 import get_xpub_fingerprint_hex py_enumerate = enumerate # Need to use the enumerate built-in but there's another function already named that class KeepkeyClient(TrezorClient): def __init__(self, path, password=''): super(KeepkeyClient, self).__init__(path, password) self.type = 'Keepkey' def enumerate(password=''): results = [] for dev in enumerate_devices(): d_data = {} d_data['type'] = 'keepkey' d_data['path'] = dev.get_path() client = None try: client = KeepkeyClient(d_data['path'], password) client.client.init_device() if not 'keepkey' in client.client.features.vendor: continue if client.client.features.initialized: master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) else: d_data['error'] = 'Not initialized' except Exception as e: d_data['error'] = "Could not open client or get fingerprint information: " + str(e) if client: client.close() results.append(d_data) return results PK!!::hwilib/devices/ledger.py# Ledger interaction script from ..hwwclient import HardwareWalletClient from ..errors import ActionCanceledError, BadArgumentError, DeviceConnectionError, DeviceFailureError, UnavailableActionError from .btchip.btchip import * from .btchip.btchipUtils import * import base64 import json import struct from .. import base58 from ..base58 import get_xpub_fingerprint_hex from ..serializations import hash256, hash160, ser_uint256, PSBT, CTransaction, HexToBase64 import binascii import logging import re LEDGER_VENDOR_ID = 0x2c97 LEDGER_DEVICE_ID = 0x0001 # minimal checking of string keypath def check_keypath(key_path): parts = re.split("/", key_path) if parts[0] != "m": return False # strip hardening chars for index in parts[1:]: index_int = re.sub('[hH\']', '', index) if not index_int.isdigit(): return False if int(index_int) > 0x80000000: return False return True bad_args = [ 0x6700, # BTCHIP_SW_INCORRECT_LENGTH 0x6A80, # BTCHIP_SW_INCORRECT_DATA 0x6B00, # BTCHIP_SW_INCORRECT_P1_P2 0x6D00, # BTCHIP_SW_INS_NOT_SUPPORTED ] cancels = [ 0x6982, # BTCHIP_SW_SECURITY_STATUS_NOT_SATISFIED 0x6985, # BTCHIP_SW_CONDITIONS_OF_USE_NOT_SATISFIED ] def ledger_exception(f): def func(*args, **kwargs): try: return f(*args, **kwargs) except ValueError as e: raise BadArgumentError(str(e)) except BTChipException as e: if e.sw in bad_args: raise BadArgumentError('Bad argument') elif e.sw == 0x6F00: # BTCHIP_SW_TECHNICAL_PROBLEM raise DeviceFailureError(e.message) elif e.sw == 0x6FAA: # BTCHIP_SW_HALTED raise DeviceConnectionError('Device is asleep') elif e.sw in cancels: raise ActionCanceledError('{} canceled'.format(f.__name__)) else: raise e return func # This class extends the HardwareWalletClient for Ledger Nano S specific things class LedgerClient(HardwareWalletClient): def __init__(self, path, password=''): super(LedgerClient, self).__init__(path, password) device = hid.device() device.open_path(path.encode()) device.set_nonblocking(True) self.dongle = HIDDongleHIDAPI(device, True, logging.getLogger().getEffectiveLevel() == logging.DEBUG) self.app = btchip(self.dongle) # Must return a dict with the xpub # Retrieves the public key at the specified BIP 32 derivation path @ledger_exception def get_pubkey_at_path(self, path): if not check_keypath(path): raise BadArgumentError("Invalid keypath") path = path[2:] path = path.replace('h', '\'') path = path.replace('H', '\'') # This call returns raw uncompressed pubkey, chaincode pubkey = self.app.getWalletPublicKey(path) if path != "": parent_path = "" for ind in path.split("/")[:-1]: parent_path += ind+"/" parent_path = parent_path[:-1] # Get parent key fingerprint parent = self.app.getWalletPublicKey(parent_path) fpr = hash160(compress_public_key(parent["publicKey"]))[:4] # Compute child info childstr = path.split("/")[-1] hard = 0 if childstr[-1] == "'" or childstr[-1] == "h" or childstr[-1] == "H": childstr = childstr[:-1] hard = 0x80000000 child = struct.pack(">I", int(childstr)+hard) # Special case for m else: child = bytearray.fromhex("00000000") fpr = child chainCode = pubkey["chainCode"] publicKey = compress_public_key(pubkey["publicKey"]) depth = len(path.split("/")) if len(path) > 0 else 0 depth = struct.pack("B", depth) if self.is_testnet: version = bytearray.fromhex("043587CF") else: version = bytearray.fromhex("0488B21E") extkey = version+depth+fpr+child+chainCode+publicKey checksum = hash256(extkey)[:4] return {"xpub":base58.encode(extkey+checksum)} # Must return a hex string with the signed transaction # The tx must be in the combined unsigned transaction format # Current only supports segwit signing @ledger_exception def sign_tx(self, tx): c_tx = CTransaction(tx.tx) tx_bytes = c_tx.serialize_with_witness() # Master key fingerprint master_fpr = hash160(compress_public_key(self.app.getWalletPublicKey('')["publicKey"]))[:4] # An entry per input, each with 0 to many keys to sign with all_signature_attempts = [[]]*len(c_tx.vin) # NOTE: We only support signing Segwit inputs, where we can skip over non-segwit # inputs, or non-segwit inputs, where *all* inputs are non-segwit. This is due # to Ledger's mutually exclusive signing steps for each type. segwit_inputs = [] # Legacy style inputs legacy_inputs = [] has_segwit = False has_legacy = False script_codes = [[]]*len(c_tx.vin) # Detect changepath, (p2sh-)p2(w)pkh only change_path = '' for txout, i_num in zip(c_tx.vout, range(len(c_tx.vout))): # Find which wallet key could be change based on hdsplit: m/.../1/k # Wallets shouldn't be sending to change address as user action # otherwise this will get confused for pubkey, path in tx.outputs[i_num].hd_keypaths.items(): if struct.pack(" 2 and path[-2] == 1: # For possible matches, check if pubkey matches possible template if hash160(pubkey) in txout.scriptPubKey or hash160(bytearray.fromhex("0014")+hash160(pubkey)) in txout.scriptPubKey: change_path = '' for index in path[1:]: change_path += str(index)+"/" change_path = change_path[:-1] for txin, psbt_in, i_num in zip(c_tx.vin, tx.inputs, range(len(c_tx.vin))): seq = format(txin.nSequence, 'x') seq = seq.zfill(8) seq = bytearray.fromhex(seq) seq.reverse() seq_hex = ''.join('{:02x}'.format(x) for x in seq) if psbt_in.non_witness_utxo: segwit_inputs.append({"value":txin.prevout.serialize()+struct.pack(" 0: # p2wpkh scriptCode += b"\x76\xa9\x14" scriptCode += witness_program[2:] scriptCode += b"\x88\xac" elif len(witness_program) == 0: if len(redeemscript) > 0: scriptCode = redeemscript else: scriptCode = psbt_in.non_witness_utxo.vout[txin.prevout.n].scriptPubKey # Save scriptcode for later signing script_codes[i_num] = scriptCode # Find which pubkeys could sign this input (should be all?) for pubkey in psbt_in.hd_keypaths.keys(): if hash160(pubkey) in scriptCode or pubkey in scriptCode: pubkeys.append(pubkey) # Figure out which keys in inputs are from our wallet for pubkey in pubkeys: keypath = psbt_in.hd_keypaths[pubkey] if master_fpr == struct.pack("')". This extracts the actual address to work around this. # Setup a new device def setup_device(self, label='', passphrase=''): raise UnavailableActionError('The Ledger Nano S does not support software setup') # Wipe this device def wipe_device(self): raise UnavailableActionError('The Ledger Nano S does not support wiping via software') # Restore device from mnemonic or xprv def restore_device(self, label=''): raise UnavailableActionError('The Ledger Nano S does not support restoring via software') # Begin backup process def backup_device(self, label='', passphrase=''): raise UnavailableActionError('The Ledger Nano S does not support creating a backup via software') # Close the device def close(self): self.dongle.close() # Prompt pin def prompt_pin(self): raise UnavailableActionError('The Ledger Nano S does not need a PIN sent from the host') # Send pin def send_pin(self, pin): raise UnavailableActionError('The Ledger Nano S does not need a PIN sent from the host') def enumerate(password=''): results = [] for d in hid.enumerate(LEDGER_VENDOR_ID, LEDGER_DEVICE_ID): if ('interface_number' in d and d['interface_number'] == 0 \ or ('usage_page' in d and d['usage_page'] == 0xffa0)): d_data = {} path = d['path'].decode() d_data['type'] = 'ledger' d_data['path'] = path client = None try: client = LedgerClient(path, password) master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) except Exception as e: d_data['error'] = "Could not open client or get fingerprint information: " + str(e) if client: client.close() results.append(d_data) return results PK!h}ݥ/D/Dhwilib/devices/trezor.py# Trezor interaction script from ..hwwclient import HardwareWalletClient from ..errors import ActionCanceledError, BadArgumentError, DeviceAlreadyInitError, DeviceAlreadyUnlockedError, DeviceConnectionError, UnavailableActionError, DeviceNotReadyError from .trezorlib.client import TrezorClient as Trezor from .trezorlib.debuglink import TrezorClientDebugLink, DebugUI from .trezorlib.exceptions import Cancelled from .trezorlib.transport import enumerate_devices, get_transport from .trezorlib.ui import echo, PassphraseUI, mnemonic_words, PIN_CURRENT, PIN_NEW, PIN_CONFIRM, PIN_MATRIX_DESCRIPTION, prompt from .trezorlib import protobuf, tools, btc, device from .trezorlib import messages as proto from ..base58 import get_xpub_fingerprint, decode, to_address, xpub_main_2_test, get_xpub_fingerprint_hex from ..serializations import ser_uint256, uint256_from_str from .. import bech32 from usb1 import USBErrorNoDevice from types import MethodType import base64 import binascii import json import logging import sys import os py_enumerate = enumerate # Need to use the enumerate built-in but there's another function already named that # Only handles up to 15 of 15 def parse_multisig(script): # Get m m = script[0] - 80 if m < 1 or m > 15: return (False, None) # Get pubkeys and build HDNodePathType pubkeys = [] offset = 1 while True: pubkey_len = script[offset] if pubkey_len != 33: break offset += 1 key = script[offset:offset + 33] offset += 33 hd_node = proto.HDNodeType(depth=0, fingerprint=0, child_num=0, chain_code=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', public_key=key) pubkeys.append(proto.HDNodePathType(node=hd_node, address_n=[])) # Check things at the end n = script[offset] - 80 if n != len(pubkeys): return (False, None) offset += 1 op_cms = script[offset] if op_cms != 174: return (False, None) # Build MultisigRedeemScriptType and return it multisig = proto.MultisigRedeemScriptType(m=m, signatures=[b''] * n, pubkeys=pubkeys) return (True, multisig) def trezor_exception(f): def func(*args, **kwargs): try: return f(*args, **kwargs) except ValueError as e: raise BadArgumentError(str(e)) except Cancelled as e: raise ActionCanceledError('{} canceled'.format(f.__name__)) except USBErrorNoDevice as e: raise DeviceConnectionError('Device disconnected') return func def interactive_get_pin(self, code=None): if code == PIN_CURRENT: desc = "current PIN" elif code == PIN_NEW: desc = "new PIN" elif code == PIN_CONFIRM: desc = "new PIN again" else: desc = "PIN" echo(PIN_MATRIX_DESCRIPTION) while True: pin = prompt("Please enter {}".format(desc), hide_input=True) if not pin.isdigit(): echo("Non-numerical PIN provided, please try again") else: return pin # This class extends the HardwareWalletClient for Trezor specific things class TrezorClient(HardwareWalletClient): def __init__(self, path, password=''): super(TrezorClient, self).__init__(path, password) self.simulator = False if path.startswith('udp'): logging.debug('Simulator found, using DebugLink') transport = get_transport(path) self.client = TrezorClientDebugLink(transport=transport) self.simulator = True else: self.client = Trezor(transport=get_transport(path), ui=PassphraseUI(password)) # if it wasn't able to find a client, throw an error if not self.client: raise IOError("no Device") self.password = password self.type = 'Trezor' def _check_unlocked(self): self.client.init_device() if self.client.features.pin_protection and not self.client.features.pin_cached: raise DeviceNotReadyError('{} is locked. Unlock by using \'promptpin\' and then \'sendpin\'.'.format(self.type)) # Must return a dict with the xpub # Retrieves the public key at the specified BIP 32 derivation path @trezor_exception def get_pubkey_at_path(self, path): self._check_unlocked() try: expanded_path = tools.parse_path(path) except ValueError as e: raise BadArgumentError(str(e)) output = btc.get_public_node(self.client, expanded_path) if self.is_testnet: return {'xpub':xpub_main_2_test(output.xpub)} else: return {'xpub':output.xpub} # Must return a hex string with the signed transaction # The tx must be in the psbt format @trezor_exception def sign_tx(self, tx): self._check_unlocked() # Get this devices master key fingerprint master_key = btc.get_public_node(self.client, [0]) master_fp = get_xpub_fingerprint(master_key.xpub) # Do multiple passes for multisig passes = 1 p = 0 while p < passes: # Prepare inputs inputs = [] to_ignore = [] # Note down which inputs whose signatures we're going to ignore for input_num, (psbt_in, txin) in py_enumerate(list(zip(tx.inputs, tx.tx.vin))): txinputtype = proto.TxInputType() # Set the input stuff txinputtype.prev_hash = ser_uint256(txin.prevout.hash)[::-1] txinputtype.prev_index = txin.prevout.n txinputtype.sequence = txin.nSequence # Detrermine spend type scriptcode = b'' if psbt_in.non_witness_utxo: utxo = psbt_in.non_witness_utxo.vout[txin.prevout.n] txinputtype.script_type = proto.InputScriptType.SPENDADDRESS scriptcode = utxo.scriptPubKey txinputtype.amount = psbt_in.non_witness_utxo.vout[txin.prevout.n].nValue elif psbt_in.witness_utxo: utxo = psbt_in.witness_utxo # Check if the output is p2sh if psbt_in.witness_utxo.is_p2sh(): txinputtype.script_type = proto.InputScriptType.SPENDP2SHWITNESS else: txinputtype.script_type = proto.InputScriptType.SPENDWITNESS scriptcode = psbt_in.witness_utxo.scriptPubKey txinputtype.amount = psbt_in.witness_utxo.nValue # Set the script if psbt_in.witness_script: scriptcode = psbt_in.witness_script elif psbt_in.redeem_script: scriptcode = psbt_in.redeem_script def ignore_input(): txinputtype.address_n = [0x80000000] txinputtype.multisig = None txinputtype.script_type = proto.InputScriptType.SPENDWITNESS inputs.append(txinputtype) to_ignore.append(input_num) # Check for multisig is_ms, multisig = parse_multisig(scriptcode) if is_ms: # Add to txinputtype txinputtype.multisig = multisig if psbt_in.non_witness_utxo: if utxo.is_p2sh: txinputtype.script_type = proto.InputScriptType.SPENDMULTISIG else: # Cannot sign bare multisig, ignore it ignore_input() continue elif not is_ms and psbt_in.non_witness_utxo and not utxo.is_p2pkh: # Cannot sign unknown spk, ignore it ignore_input() continue elif not is_ms and psbt_in.witness_utxo and psbt_in.witness_script: # Cannot sign unknown witness script, ignore it ignore_input() continue # Find key to sign with found = False our_keys = 0 for key in psbt_in.hd_keypaths.keys(): keypath = psbt_in.hd_keypaths[key] if keypath[0] == master_fp and key not in psbt_in.partial_sigs: if not found: txinputtype.address_n = keypath[1:] found = True our_keys += 1 # Determine if we need to do more passes to sign everything if our_keys > passes: passes = our_keys if not found: # This input is not one of ours ignore_input() continue # append to inputs inputs.append(txinputtype) # address version byte if self.is_testnet: p2pkh_version = b'\x6f' p2sh_version = b'\xc4' bech32_hrp = 'tb' else: p2pkh_version = b'\x00' p2sh_version = b'\x05' bech32_hrp = 'bc' # prepare outputs outputs = [] for out in tx.tx.vout: txoutput = proto.TxOutputType() txoutput.amount = out.nValue txoutput.script_type = proto.OutputScriptType.PAYTOADDRESS if out.is_p2pkh(): txoutput.address = to_address(out.scriptPubKey[3:23], p2pkh_version) elif out.is_p2sh(): txoutput.address = to_address(out.scriptPubKey[2:22], p2sh_version) else: wit, ver, prog = out.is_witness() if wit: txoutput.address = bech32.encode(bech32_hrp, ver, prog) else: raise BadArgumentError("Output is not an address") # append to outputs outputs.append(txoutput) # Prepare prev txs prevtxs = {} for psbt_in in tx.inputs: if psbt_in.non_witness_utxo: prev = psbt_in.non_witness_utxo t = proto.TransactionType() t.version = prev.nVersion t.lock_time = prev.nLockTime for vin in prev.vin: i = proto.TxInputType() i.prev_hash = ser_uint256(vin.prevout.hash)[::-1] i.prev_index = vin.prevout.n i.script_sig = vin.scriptSig i.sequence = vin.nSequence t.inputs.append(i) for vout in prev.vout: o = proto.TxOutputBinType() o.amount = vout.nValue o.script_pubkey = vout.scriptPubKey t.bin_outputs.append(o) logging.debug(psbt_in.non_witness_utxo.hash) prevtxs[ser_uint256(psbt_in.non_witness_utxo.sha256)[::-1]] = t # Sign the transaction tx_details = proto.SignTx() tx_details.version = tx.tx.nVersion tx_details.lock_time = tx.tx.nLockTime if self.is_testnet: signed_tx = btc.sign_tx(self.client, "Testnet", inputs, outputs, tx_details, prevtxs) else: signed_tx = btc.sign_tx(self.client, "Bitcoin", inputs, outputs, tx_details, prevtxs) # Each input has one signature for input_num, (psbt_in, sig) in py_enumerate(list(zip(tx.inputs, signed_tx[0]))): if input_num in to_ignore: continue for pubkey in psbt_in.hd_keypaths.keys(): fp = psbt_in.hd_keypaths[pubkey][0] if fp == master_fp and pubkey not in psbt_in.partial_sigs: psbt_in.partial_sigs[pubkey] = sig + b'\x01' break p += 1 return {'psbt':tx.serialize()} # Must return a base64 encoded string with the signed message # The message can be any string @trezor_exception def sign_message(self, message, keypath): self._check_unlocked() path = tools.parse_path(keypath) result = btc.sign_message(self.client, 'Bitcoin', path, message) return {'signature': base64.b64encode(result.signature).decode('utf-8')} # Display address of specified type on the device. Only supports single-key based addresses. @trezor_exception def display_address(self, keypath, p2sh_p2wpkh, bech32): self._check_unlocked() expanded_path = tools.parse_path(keypath) address = btc.get_address( self.client, "Testnet" if self.is_testnet else "Bitcoin", expanded_path, show_display=True, script_type=proto.InputScriptType.SPENDWITNESS if bech32 else (proto.InputScriptType.SPENDP2SHWITNESS if p2sh_p2wpkh else proto.InputScriptType.SPENDADDRESS) ) return {'address': address} # Setup a new device @trezor_exception def setup_device(self, label='', passphrase=''): self.client.init_device() if not self.simulator: # Use interactive_get_pin self.client.ui.get_pin = MethodType(interactive_get_pin, self.client.ui) if self.client.features.initialized: raise DeviceAlreadyInitError('Device is already initialized. Use wipe first and try again') passphrase_enabled = False if self.password: passphrase_enabled = True device.reset(self.client, passphrase_protection=bool(self.password)) return {'success': True} # Wipe this device @trezor_exception def wipe_device(self): self._check_unlocked() device.wipe(self.client) return {'success': True} # Restore device from mnemonic or xprv @trezor_exception def restore_device(self, label=''): self.client.init_device() if not self.simulator: # Use interactive_get_pin self.client.ui.get_pin = MethodType(interactive_get_pin, self.client.ui) passphrase_enabled = False device.recover(self.client, label=label, input_callback=mnemonic_words(), passphrase_protection=bool(self.password)) return {'success': True} # Begin backup process def backup_device(self, label='', passphrase=''): raise UnavailableActionError('The {} does not support creating a backup via software'.format(self.type)) # Close the device @trezor_exception def close(self): self.client.close() # Prompt for a pin on device @trezor_exception def prompt_pin(self): self.client.open() self.client.init_device() if not self.client.features.pin_protection: raise DeviceAlreadyUnlockedError('This device does not need a PIN') if self.client.features.pin_cached: raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device') print('Use \'sendpin\' to provide the number positions for the PIN as displayed on your device\'s screen', file=sys.stderr) print(PIN_MATRIX_DESCRIPTION, file=sys.stderr) self.client.call_raw(proto.Ping(message=b'ping', button_protection=False, pin_protection=True, passphrase_protection=False)) return {'success': True} # Send the pin @trezor_exception def send_pin(self, pin): self.client.open() if not pin.isdigit(): raise BadArgumentError("Non-numeric PIN provided") resp = self.client.call_raw(proto.PinMatrixAck(pin=pin)) if isinstance(resp, proto.Failure): self.client.features = self.client.call_raw(proto.GetFeatures()) if isinstance(self.client.features, proto.Features): if not self.client.features.pin_protection: raise DeviceAlreadyUnlockedError('This device does not need a PIN') if self.client.features.pin_cached: raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device') return {'success': False} return {'success': True} def enumerate(password=''): results = [] for dev in enumerate_devices(): d_data = {} d_data['type'] = 'trezor' d_data['path'] = dev.get_path() client = None try: client = TrezorClient(d_data['path'], password) client.client.init_device() if not 'trezor' in client.client.features.vendor: continue if client.client.features.initialized: master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub) else: d_data['error'] = 'Not initialized' except Exception as e: d_data['error'] = "Could not open client or get fingerprint information: " + str(e) if client: client.close() results.append(d_data) return results PK! $hwilib/devices/trezorlib/__init__.py__version__ = "0.11.1" # fmt: off MINIMUM_FIRMWARE_VERSION = { "1": (1, 6, 1), "T": (2, 0, 10), "K1-14AM": (0, 0, 0) } # fmt: on PK!.DDhwilib/devices/trezorlib/btc.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . from . import messages from .tools import CallException, expect, normalize_nfc, session @expect(messages.PublicKey) def get_public_node( client, n, ecdsa_curve_name=None, show_display=False, coin_name=None, script_type=messages.InputScriptType.SPENDADDRESS, ): return client.call( messages.GetPublicKey( address_n=n, ecdsa_curve_name=ecdsa_curve_name, show_display=show_display, coin_name=coin_name, script_type=script_type, ) ) @expect(messages.Address, field="address") def get_address( client, coin_name, n, show_display=False, multisig=None, script_type=messages.InputScriptType.SPENDADDRESS, ): return client.call( messages.GetAddress( address_n=n, coin_name=coin_name, show_display=show_display, multisig=multisig, script_type=script_type, ) ) @expect(messages.MessageSignature) def sign_message( client, coin_name, n, message, script_type=messages.InputScriptType.SPENDADDRESS ): message = normalize_nfc(message) return client.call( messages.SignMessage( coin_name=coin_name, address_n=n, message=message, script_type=script_type ) ) @session def sign_tx(client, coin_name, inputs, outputs, details=None, prev_txes=None): # set up a transactions dict txes = {None: messages.TransactionType(inputs=inputs, outputs=outputs)} # preload all relevant transactions ahead of time for inp in inputs: if inp.script_type not in ( messages.InputScriptType.SPENDP2SHWITNESS, messages.InputScriptType.SPENDWITNESS, messages.InputScriptType.EXTERNAL, ): try: prev_tx = prev_txes[inp.prev_hash] except Exception as e: raise ValueError("Could not retrieve prev_tx") from e if not isinstance(prev_tx, messages.TransactionType): raise ValueError("Invalid value for prev_tx") from None txes[inp.prev_hash] = prev_tx if details is None: signtx = messages.SignTx() else: signtx = details signtx.coin_name = coin_name signtx.inputs_count = len(inputs) signtx.outputs_count = len(outputs) res = client.call(signtx) # Prepare structure for signatures signatures = [None] * len(inputs) serialized_tx = b"" def copy_tx_meta(tx): tx_copy = messages.TransactionType() tx_copy.CopyFrom(tx) # clear fields tx_copy.inputs_cnt = len(tx.inputs) tx_copy.inputs = [] tx_copy.outputs_cnt = len(tx.bin_outputs or tx.outputs) tx_copy.outputs = [] tx_copy.bin_outputs = [] tx_copy.extra_data_len = len(tx.extra_data or b"") tx_copy.extra_data = None return tx_copy R = messages.RequestType while isinstance(res, messages.TxRequest): # If there's some part of signed transaction, let's add it if res.serialized: if res.serialized.serialized_tx: serialized_tx += res.serialized.serialized_tx if res.serialized.signature_index is not None: idx = res.serialized.signature_index sig = res.serialized.signature if signatures[idx] is not None: raise ValueError("Signature for index %d already filled" % idx) signatures[idx] = sig if res.request_type == R.TXFINISHED: break # Device asked for one more information, let's process it. current_tx = txes[res.details.tx_hash] if res.request_type == R.TXMETA: msg = copy_tx_meta(current_tx) res = client.call(messages.TxAck(tx=msg)) elif res.request_type == R.TXINPUT: msg = messages.TransactionType() msg.inputs = [current_tx.inputs[res.details.request_index]] res = client.call(messages.TxAck(tx=msg)) elif res.request_type == R.TXOUTPUT: msg = messages.TransactionType() if res.details.tx_hash: msg.bin_outputs = [current_tx.bin_outputs[res.details.request_index]] else: msg.outputs = [current_tx.outputs[res.details.request_index]] res = client.call(messages.TxAck(tx=msg)) elif res.request_type == R.TXEXTRADATA: o, l = res.details.extra_data_offset, res.details.extra_data_len msg = messages.TransactionType() msg.extra_data = current_tx.extra_data[o : o + l] res = client.call(messages.TxAck(tx=msg)) if isinstance(res, messages.Failure): raise CallException("Signing failed") if not isinstance(res, messages.TxRequest): raise CallException("Unexpected message") if None in signatures: raise RuntimeError("Some signatures are missing!") return signatures, serialized_tx PK!3<"<""hwilib/devices/trezorlib/client.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import logging import sys import warnings from mnemonic import Mnemonic from . import MINIMUM_FIRMWARE_VERSION, exceptions, messages, tools if sys.version_info.major < 3: raise Exception("Trezorlib does not support Python 2 anymore.") LOG = logging.getLogger(__name__) VENDORS = ("bitcointrezor.com", "trezor.io", "keepkey.com") MAX_PASSPHRASE_LENGTH = 50 DEPRECATION_ERROR = """ Incompatible Trezor library detected. (Original error: {}) """.strip() OUTDATED_FIRMWARE_ERROR = """ Your Trezor firmware is out of date. Update it with the following command: trezorctl firmware-update Or visit https://wallet.trezor.io/ """.strip() def get_buttonrequest_value(code): # Converts integer code to its string representation of ButtonRequestType return [ k for k in dir(messages.ButtonRequestType) if getattr(messages.ButtonRequestType, k) == code ][0] class TrezorClient: """Trezor client, a connection to a Trezor device. This class allows you to manage connection state, send and receive protobuf messages, handle user interactions, and perform some generic tasks (send a cancel message, initialize or clear a session, ping the device). You have to provide a transport, i.e., a raw connection to the device. You can use `trezorlib.transport.get_transport` to find one. You have to provide an UI implementation for the three kinds of interaction: - button request (notify the user that their interaction is needed) - PIN request (on T1, ask the user to input numbers for a PIN matrix) - passphrase request (ask the user to enter a passphrase) See `trezorlib.ui` for details. You can supply a `state` you saved in the previous session. If you do, the user might not need to enter their passphrase again. """ def __init__(self, transport, ui=None, state=None): LOG.info("creating client instance for device: {}".format(transport.get_path())) self.transport = transport self.ui = ui self.state = state if ui is None: warnings.warn("UI class not supplied. This will probably crash soon.") self.session_counter = 0 def open(self): if self.session_counter == 0: self.transport.begin_session() self.session_counter += 1 def close(self): if self.session_counter == 1: self.transport.end_session() self.session_counter -= 1 def cancel(self): self._raw_write(messages.Cancel()) def call_raw(self, msg): __tracebackhide__ = True # for pytest # pylint: disable=W0612 self._raw_write(msg) return self._raw_read() def _raw_write(self, msg): __tracebackhide__ = True # for pytest # pylint: disable=W0612 self.transport.write(msg) def _raw_read(self): __tracebackhide__ = True # for pytest # pylint: disable=W0612 return self.transport.read() def _callback_pin(self, msg): try: pin = self.ui.get_pin(msg.type) except exceptions.Cancelled: self.call_raw(messages.Cancel()) raise if not pin.isdigit(): self.call_raw(messages.Cancel()) raise ValueError("Non-numeric PIN provided") resp = self.call_raw(messages.PinMatrixAck(pin=pin)) if isinstance(resp, messages.Failure) and resp.code in ( messages.FailureType.PinInvalid, messages.FailureType.PinCancelled, messages.FailureType.PinExpected, ): raise exceptions.PinException(resp.code, resp.message) else: return resp def _callback_passphrase(self, msg): if msg.on_device: passphrase = None else: try: passphrase = self.ui.get_passphrase() except exceptions.Cancelled: self.call_raw(messages.Cancel()) raise passphrase = Mnemonic.normalize_string(passphrase) if len(passphrase) > MAX_PASSPHRASE_LENGTH: self.call_raw(messages.Cancel()) raise ValueError("Passphrase too long") resp = self.call_raw(messages.PassphraseAck(passphrase=passphrase)) if isinstance(resp, messages.PassphraseStateRequest): self.state = resp.state return self.call_raw(messages.PassphraseStateAck()) else: return resp def _callback_button(self, msg): __tracebackhide__ = True # for pytest # pylint: disable=W0612 # do this raw - send ButtonAck first, notify UI later self._raw_write(messages.ButtonAck()) self.ui.button_request(msg.code) return self._raw_read() @tools.session def call(self, msg): self.check_firmware_version() resp = self.call_raw(msg) while True: if isinstance(resp, messages.PinMatrixRequest): resp = self._callback_pin(resp) elif isinstance(resp, messages.PassphraseRequest): resp = self._callback_passphrase(resp) elif isinstance(resp, messages.ButtonRequest): resp = self._callback_button(resp) elif isinstance(resp, messages.Failure): if resp.code == messages.FailureType.ActionCancelled: raise exceptions.Cancelled raise exceptions.TrezorFailure(resp) else: return resp @tools.session def init_device(self): resp = self.call_raw(messages.Initialize(state=self.state)) if not isinstance(resp, messages.Features): raise exceptions.TrezorException("Unexpected initial response") else: self.features = resp if self.features.vendor not in VENDORS: raise RuntimeError("Unsupported device") # A side-effect of this is a sanity check for broken protobuf definitions. # If the `vendor` field doesn't exist, you probably have a mismatched # checkout of trezor-common. self.version = ( self.features.major_version, self.features.minor_version, self.features.patch_version, ) self.check_firmware_version(warn_only=True) def is_outdated(self): if self.features.bootloader_mode: return False model = self.features.model or "1" required_version = MINIMUM_FIRMWARE_VERSION[model] return self.version < required_version def check_firmware_version(self, warn_only=False): if self.is_outdated(): if warn_only: warnings.warn(OUTDATED_FIRMWARE_ERROR, stacklevel=2) else: raise exceptions.OutdatedFirmwareError(OUTDATED_FIRMWARE_ERROR) @tools.expect(messages.Success, field="message") def ping( self, msg, button_protection=False, pin_protection=False, passphrase_protection=False, ): # We would like ping to work on any valid TrezorClient instance, but # due to the protection modes, we need to go through self.call, and that will # raise an exception if the firmware is too old. # So we short-circuit the simplest variant of ping with call_raw. if not button_protection and not pin_protection and not passphrase_protection: # XXX this should be: `with self:` try: self.open() return self.call_raw(messages.Ping(message=msg)) finally: self.close() msg = messages.Ping( message=msg, button_protection=button_protection, pin_protection=pin_protection, passphrase_protection=passphrase_protection, ) return self.call(msg) def get_device_id(self): return self.features.device_id @tools.expect(messages.Success, field="message") @tools.session def clear_session(self): return self.call_raw(messages.ClearSession()) PK!RoA==%hwilib/devices/trezorlib/debuglink.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . from copy import deepcopy from mnemonic import Mnemonic from . import messages as proto, protobuf, tools from .client import TrezorClient from .tools import expect EXPECTED_RESPONSES_CONTEXT_LINES = 3 class DebugLink: def __init__(self, transport, auto_interact=True): self.transport = transport self.allow_interactions = auto_interact def open(self): self.transport.begin_session() def close(self): self.transport.end_session() def _call(self, msg, nowait=False): self.transport.write(msg) if nowait: return None ret = self.transport.read() return ret def state(self): return self._call(proto.DebugLinkGetState()) def read_pin(self): state = self.state() return state.pin, state.matrix def read_pin_encoded(self): return self.encode_pin(*self.read_pin()) def encode_pin(self, pin, matrix=None): """Transform correct PIN according to the displayed matrix.""" if matrix is None: _, matrix = self.read_pin() return "".join([str(matrix.index(p) + 1) for p in pin]) def read_layout(self): obj = self._call(proto.DebugLinkGetState()) return obj.layout def read_mnemonic(self): obj = self._call(proto.DebugLinkGetState()) return obj.mnemonic def read_recovery_word(self): obj = self._call(proto.DebugLinkGetState()) return (obj.recovery_fake_word, obj.recovery_word_pos) def read_reset_word(self): obj = self._call(proto.DebugLinkGetState()) return obj.reset_word def read_reset_word_pos(self): obj = self._call(proto.DebugLinkGetState()) return obj.reset_word_pos def read_reset_entropy(self): obj = self._call(proto.DebugLinkGetState()) return obj.reset_entropy def read_passphrase_protection(self): obj = self._call(proto.DebugLinkGetState()) return obj.passphrase_protection def input(self, word=None, button=None, swipe=None): if not self.allow_interactions: return decision = proto.DebugLinkDecision() if button is not None: decision.yes_no = button elif word is not None: decision.input = word elif swipe is not None: decision.up_down = swipe else: raise ValueError("You need to provide input data.") self._call(decision, nowait=True) def press_button(self, yes_no): self._call(proto.DebugLinkDecision(yes_no=yes_no), nowait=True) def press_yes(self): self.input(button=True) def press_no(self): self.input(button=False) def swipe_up(self): self.input(swipe=True) def swipe_down(self): self.input(swipe=False) def stop(self): self._call(proto.DebugLinkStop(), nowait=True) @expect(proto.DebugLinkMemory, field="memory") def memory_read(self, address, length): return self._call(proto.DebugLinkMemoryRead(address=address, length=length)) def memory_write(self, address, memory, flash=False): self._call( proto.DebugLinkMemoryWrite(address=address, memory=memory, flash=flash), nowait=True, ) def flash_erase(self, sector): self._call(proto.DebugLinkFlashErase(sector=sector), nowait=True) class NullDebugLink(DebugLink): def __init__(self): super().__init__(None) def open(self): pass def close(self): pass def _call(self, msg, nowait=False): if not nowait: if isinstance(msg, proto.DebugLinkGetState): return proto.DebugLinkState() else: raise RuntimeError("unexpected call to a fake debuglink") class DebugUI: INPUT_FLOW_DONE = object() def __init__(self, debuglink: DebugLink): self.debuglink = debuglink self.pin = None self.passphrase = "sphinx of black quartz, judge my wov" self.input_flow = None def button_request(self, code): if self.input_flow is None: self.debuglink.press_yes() elif self.input_flow is self.INPUT_FLOW_DONE: raise AssertionError("input flow ended prematurely") else: try: self.input_flow.send(code) except StopIteration: self.input_flow = self.INPUT_FLOW_DONE def get_pin(self, code=None): if self.pin: return self.pin else: return self.debuglink.read_pin_encoded() def get_passphrase(self): return self.passphrase class TrezorClientDebugLink(TrezorClient): # This class implements automatic responses # and other functionality for unit tests # for various callbacks, created in order # to automatically pass unit tests. # # This mixing should be used only for purposes # of unit testing, because it will fail to work # without special DebugLink interface provided # by the device. def __init__(self, transport, auto_interact=True): try: debug_transport = transport.find_debug() self.debug = DebugLink(debug_transport, auto_interact) except Exception: if not auto_interact: self.debug = NullDebugLink() else: raise self.ui = DebugUI(self.debug) self.in_with_statement = 0 self.screenshot_id = 0 self.filters = {} # Always press Yes and provide correct pin self.setup_debuglink(True, True) # Do not expect any specific response from device self.expected_responses = None self.current_response = None # Use blank passphrase self.set_passphrase("") super().__init__(transport, ui=self.ui) def open(self): super().open() self.debug.open() def close(self): self.debug.close() super().close() def set_filter(self, message_type, callback): self.filters[message_type] = callback def _filter_message(self, msg): message_type = msg.__class__ callback = self.filters.get(message_type) if callable(callback): return callback(deepcopy(msg)) else: return msg def set_input_flow(self, input_flow): if input_flow is None: self.ui.input_flow = None return if callable(input_flow): input_flow = input_flow() if not hasattr(input_flow, "send"): raise RuntimeError("input_flow should be a generator function") self.ui.input_flow = input_flow next(input_flow) # can't send before first yield def __enter__(self): # For usage in with/expected_responses self.in_with_statement += 1 return self def __exit__(self, _type, value, traceback): self.in_with_statement -= 1 if _type is not None: # Another exception raised return False if self.expected_responses is None: # no need to check anything else return False # return isinstance(value, TypeError) # Evaluate missed responses in 'with' statement if self.current_response < len(self.expected_responses): self._raise_unexpected_response(None) # Cleanup self.expected_responses = None self.current_response = None return False def set_expected_responses(self, expected): if not self.in_with_statement: raise RuntimeError("Must be called inside 'with' statement") self.expected_responses = expected self.current_response = 0 def setup_debuglink(self, button, pin_correct): # self.button = button # True -> YES button, False -> NO button if pin_correct: self.ui.pin = None else: self.ui.pin = "444222" def set_passphrase(self, passphrase): self.ui.passphrase = Mnemonic.normalize_string(passphrase) def set_mnemonic(self, mnemonic): self.mnemonic = Mnemonic.normalize_string(mnemonic).split(" ") def _raw_read(self): __tracebackhide__ = True # for pytest # pylint: disable=W0612 # if SCREENSHOT and self.debug: # from PIL import Image # layout = self.debug.state().layout # im = Image.new("RGB", (128, 64)) # pix = im.load() # for x in range(128): # for y in range(64): # rx, ry = 127 - x, 63 - y # if (ord(layout[rx + (ry / 8) * 128]) & (1 << (ry % 8))) > 0: # pix[x, y] = (255, 255, 255) # im.save("scr%05d.png" % self.screenshot_id) # self.screenshot_id += 1 resp = super()._raw_read() resp = self._filter_message(resp) self._check_request(resp) return resp def _raw_write(self, msg): return super()._raw_write(self._filter_message(msg)) def _raise_unexpected_response(self, msg): __tracebackhide__ = True # for pytest # pylint: disable=W0612 start_at = max(self.current_response - EXPECTED_RESPONSES_CONTEXT_LINES, 0) stop_at = min( self.current_response + EXPECTED_RESPONSES_CONTEXT_LINES + 1, len(self.expected_responses), ) output = [] output.append("Expected responses:") if start_at > 0: output.append(" (...{} previous responses omitted)".format(start_at)) for i in range(start_at, stop_at): exp = self.expected_responses[i] prefix = " " if i != self.current_response else ">>> " set_fields = { key: value for key, value in exp.__dict__.items() if value is not None and value != [] } oneline_str = ", ".join("{}={!r}".format(*i) for i in set_fields.items()) if len(oneline_str) < 60: output.append( "{}{}({})".format(prefix, exp.__class__.__name__, oneline_str) ) else: item = [] item.append("{}{}(".format(prefix, exp.__class__.__name__)) for key, value in set_fields.items(): item.append("{} {}={!r}".format(prefix, key, value)) item.append("{})".format(prefix)) output.append("\n".join(item)) if stop_at < len(self.expected_responses): omitted = len(self.expected_responses) - stop_at output.append(" (...{} following responses omitted)".format(omitted)) output.append("") if msg is not None: output.append("Actually received:") output.append(protobuf.format_message(msg)) else: output.append("This message was never received.") raise AssertionError("\n".join(output)) def _check_request(self, msg): __tracebackhide__ = True # for pytest # pylint: disable=W0612 if self.expected_responses is None: return if self.current_response >= len(self.expected_responses): raise AssertionError( "No more messages were expected, but we got:\n" + protobuf.format_message(msg) ) expected = self.expected_responses[self.current_response] if msg.__class__ != expected.__class__: self._raise_unexpected_response(msg) for field, value in expected.__dict__.items(): if value is None or value == []: continue if getattr(msg, field) != value: self._raise_unexpected_response(msg) self.current_response += 1 def mnemonic_callback(self, _): word, pos = self.debug.read_recovery_word() if word != "": return word if pos != 0: return self.mnemonic[pos - 1] raise RuntimeError("Unexpected call") @expect(proto.Success, field="message") def load_device_by_mnemonic( client, mnemonic, pin, passphrase_protection, label, language="english", skip_checksum=False, expand=False, ): # Convert mnemonic to UTF8 NKFD mnemonic = Mnemonic.normalize_string(mnemonic) # Convert mnemonic to ASCII stream mnemonic = mnemonic.encode() m = Mnemonic("english") if expand: mnemonic = m.expand(mnemonic) if not skip_checksum and not m.check(mnemonic): raise ValueError("Invalid mnemonic checksum") if client.features.initialized: raise RuntimeError( "Device is initialized already. Call device.wipe() and try again." ) resp = client.call( proto.LoadDevice( mnemonic=mnemonic, pin=pin, passphrase_protection=passphrase_protection, language=language, label=label, skip_checksum=skip_checksum, ) ) client.init_device() return resp @expect(proto.Success, field="message") def load_device_by_xprv(client, xprv, pin, passphrase_protection, label, language): if client.features.initialized: raise RuntimeError( "Device is initialized already. Call wipe_device() and try again." ) if xprv[0:4] not in ("xprv", "tprv"): raise ValueError("Unknown type of xprv") if not 100 < len(xprv) < 112: # yes this is correct in Python raise ValueError("Invalid length of xprv") node = proto.HDNodeType() data = tools.b58decode(xprv, None).hex() if data[90:92] != "00": raise ValueError("Contain invalid private key") checksum = (tools.btc_hash(bytes.fromhex(data[:156]))[:4]).hex() if checksum != data[156:]: raise ValueError("Checksum doesn't match") # version 0488ade4 # depth 00 # fingerprint 00000000 # child_num 00000000 # chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508 # privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35 # checksum e77e9d71 node.depth = int(data[8:10], 16) node.fingerprint = int(data[10:18], 16) node.child_num = int(data[18:26], 16) node.chain_code = bytes.fromhex(data[26:90]) node.private_key = bytes.fromhex(data[92:156]) # skip 0x00 indicating privkey resp = client.call( proto.LoadDevice( node=node, pin=pin, passphrase_protection=passphrase_protection, language=language, label=label, ) ) client.init_device() return resp @expect(proto.Success, field="message") def self_test(client): if client.features.bootloader_mode is not True: raise RuntimeError("Device must be in bootloader mode") return client.call( proto.SelfTest( payload=b"\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC" ) ) PK!#"hwilib/devices/trezorlib/device.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import os import time import warnings from . import messages as proto from .exceptions import Cancelled from .tools import expect, session from .transport import enumerate_devices, get_transport RECOVERY_BACK = "\x08" # backspace character, sent literally class TrezorDevice: """ This class is deprecated. (There is no reason for it to exist in the first place, it is nothing but a collection of two functions.) Instead, please use functions from the ``trezorlib.transport`` module. """ @classmethod def enumerate(cls): warnings.warn("TrezorDevice is deprecated.", DeprecationWarning) return enumerate_devices() @classmethod def find_by_path(cls, path): warnings.warn("TrezorDevice is deprecated.", DeprecationWarning) return get_transport(path, prefix_search=False) @expect(proto.Success, field="message") def apply_settings( client, label=None, language=None, use_passphrase=None, homescreen=None, passphrase_source=None, auto_lock_delay_ms=None, ): settings = proto.ApplySettings() if label is not None: settings.label = label if language: settings.language = language if use_passphrase is not None: settings.use_passphrase = use_passphrase if homescreen is not None: settings.homescreen = homescreen if passphrase_source is not None: settings.passphrase_source = passphrase_source if auto_lock_delay_ms is not None: settings.auto_lock_delay_ms = auto_lock_delay_ms out = client.call(settings) client.init_device() # Reload Features return out @expect(proto.Success, field="message") def apply_flags(client, flags): out = client.call(proto.ApplyFlags(flags=flags)) client.init_device() # Reload Features return out @expect(proto.Success, field="message") def change_pin(client, remove=False): ret = client.call(proto.ChangePin(remove=remove)) client.init_device() # Re-read features return ret @expect(proto.Success, field="message") def wipe(client): ret = client.call(proto.WipeDevice()) client.init_device() return ret @expect(proto.Success, field="message") def recover( client, word_count=24, passphrase_protection=False, pin_protection=True, label=None, language="english", input_callback=None, type=proto.RecoveryDeviceType.ScrambledWords, dry_run=False, u2f_counter=None, ): if client.features.model == "1" and input_callback is None: raise RuntimeError("Input callback required for Trezor One") if word_count not in (12, 18, 24): raise ValueError("Invalid word count. Use 12/18/24") if client.features.initialized and not dry_run: raise RuntimeError( "Device already initialized. Call device.wipe() and try again." ) if u2f_counter is None: u2f_counter = int(time.time()) res = client.call( proto.RecoveryDevice( word_count=word_count, passphrase_protection=bool(passphrase_protection), pin_protection=bool(pin_protection), label=label, language=language, enforce_wordlist=True, type=type, dry_run=dry_run, u2f_counter=u2f_counter, ) ) while isinstance(res, proto.WordRequest): try: inp = input_callback(res.type) res = client.call(proto.WordAck(word=inp)) except Cancelled: res = client.call(proto.Cancel()) client.init_device() return res @expect(proto.Success, field="message") @session def reset( client, display_random=False, strength=None, passphrase_protection=False, pin_protection=True, label=None, language="english", # u2f_counter=0, # skip_backup=False, # no_backup=False, ): if client.features.initialized: raise RuntimeError( "Device is initialized already. Call wipe_device() and try again." ) if strength is None: if client.features.model == "1": strength = 256 else: strength = 128 # Begin with device reset workflow msg = proto.ResetDevice( display_random=bool(display_random), strength=strength, passphrase_protection=bool(passphrase_protection), pin_protection=bool(pin_protection), language=language, label=label, # u2f_counter=u2f_counter, # skip_backup=bool(skip_backup), # no_backup=bool(no_backup), ) resp = client.call(msg) if not isinstance(resp, proto.EntropyRequest): raise RuntimeError("Invalid response, expected EntropyRequest") external_entropy = os.urandom(32) # LOG.debug("Computer generated entropy: " + external_entropy.hex()) ret = client.call(proto.EntropyAck(entropy=external_entropy)) client.init_device() return ret @expect(proto.Success, field="message") def backup(client): ret = client.call(proto.BackupDevice()) return ret PK!1o&hwilib/devices/trezorlib/exceptions.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . class TrezorException(Exception): pass class TrezorFailure(TrezorException): def __init__(self, failure): self.failure = failure # TODO: this is backwards compatibility with tests. it should be changed super().__init__(self.failure.code, self.failure.message) def __str__(self): from .messages import FailureType types = { getattr(FailureType, name): name for name in dir(FailureType) if not name.startswith("_") } if self.failure.message is not None: return "{}: {}".format(types[self.failure.code], self.failure.message) else: return types[self.failure.code] class PinException(TrezorException): pass class Cancelled(TrezorException): pass class OutdatedFirmwareError(TrezorException): pass PK!{L''$hwilib/devices/trezorlib/firmware.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import hashlib from enum import Enum from typing import NewType, Tuple import construct as c import ecdsa import pyblake2 from . import cosi, messages, tools V1_SIGNATURE_SLOTS = 3 V1_BOOTLOADER_KEYS = { 1: "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", 2: "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", 3: "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", 4: "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", 5: "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", } V2_BOOTLOADER_KEYS = [ bytes.fromhex("c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f"), bytes.fromhex("80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a"), bytes.fromhex("b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751"), ] V2_BOOTLOADER_M = 2 V2_BOOTLOADER_N = 3 V2_CHUNK_SIZE = 1024 * 128 def _transform_vendor_trust(data: bytes) -> bytes: """Byte-swap and bit-invert the VendorTrust field. Vendor trust is interpreted as a bitmask in a 16-bit little-endian integer, with the added twist that 0 means set and 1 means unset. We feed it to a `BitStruct` that expects a big-endian sequence where bits have the traditional meaning. We must therefore do a bitwise negation of each byte, and return them in reverse order. This is the same transformation both ways, fortunately, so we don't need two separate functions. """ return bytes(~b & 0xFF for b in data)[::-1] # fmt: off Toif = c.Struct( "magic" / c.Const(b"TOI"), "format" / c.Enum(c.Byte, full_color=b"f", grayscale=b"g"), "width" / c.Int16ul, "height" / c.Int16ul, "data" / c.Prefixed(c.Int32ul, c.GreedyBytes), ) VendorTrust = c.Transformed(c.BitStruct( "reserved" / c.Default(c.BitsInteger(9), 0), "show_vendor_string" / c.Flag, "require_user_click" / c.Flag, "red_background" / c.Flag, "delay" / c.BitsInteger(4), ), _transform_vendor_trust, 2, _transform_vendor_trust, 2) VendorHeader = c.Struct( "_start_offset" / c.Tell, "magic" / c.Const(b"TRZV"), "_header_len" / c.Padding(4), "expiry" / c.Int32ul, "version" / c.Struct( "major" / c.Int8ul, "minor" / c.Int8ul, ), "vendor_sigs_required" / c.Int8ul, "vendor_sigs_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), "vendor_trust" / VendorTrust, "reserved" / c.Padding(14), "pubkeys" / c.Bytes(32)[c.this.vendor_sigs_n], "vendor_string" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), "vendor_image" / Toif, "_data_end_offset" / c.Tell, c.Padding(-(c.this._data_end_offset + 65) % 512), "sigmask" / c.Byte, "signature" / c.Bytes(64), "_end_offset" / c.Tell, "header_len" / c.Pointer( c.this._start_offset + 4, c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) ), ) VersionLong = c.Struct( "major" / c.Int8ul, "minor" / c.Int8ul, "patch" / c.Int8ul, "build" / c.Int8ul, ) FirmwareHeader = c.Struct( "_start_offset" / c.Tell, "magic" / c.Const(b"TRZF"), "_header_len" / c.Padding(4), "expiry" / c.Int32ul, "code_length" / c.Rebuild( c.Int32ul, lambda this: len(this._.code) if "code" in this._ else (this.code_length or 0) ), "version" / VersionLong, "fix_version" / VersionLong, "reserved" / c.Padding(8), "hashes" / c.Bytes(32)[16], "reserved" / c.Padding(415), "sigmask" / c.Byte, "signature" / c.Bytes(64), "_end_offset" / c.Tell, "header_len" / c.Pointer( c.this._start_offset + 4, c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) ), ) Firmware = c.Struct( "vendor_header" / VendorHeader, "firmware_header" / FirmwareHeader, "_code_offset" / c.Tell, "code" / c.Bytes(c.this.firmware_header.code_length), c.Terminated, ) FirmwareV1 = c.Struct( "magic" / c.Const(b"TRZR"), "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), "key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 "flags" / c.BitStruct( c.Padding(7), "restore_storage" / c.Flag, ), "reserved" / c.Padding(52), "signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], "code" / c.Bytes(c.this.code_length), c.Terminated, ) # fmt: on class FirmwareFormat(Enum): TREZOR_ONE = 1 TREZOR_T = 2 FirmwareType = NewType("FirmwareType", c.Container) ParsedFirmware = Tuple[FirmwareFormat, FirmwareType] def parse(data: bytes) -> ParsedFirmware: if data[:4] == b"TRZR": version = FirmwareFormat.TREZOR_ONE cls = FirmwareV1 elif data[:4] == b"TRZV": version = FirmwareFormat.TREZOR_T cls = Firmware else: raise ValueError("Unrecognized firmware image type") try: fw = cls.parse(data) except Exception as e: raise ValueError("Invalid firmware image") from e return version, FirmwareType(fw) def digest_v1(fw: FirmwareType) -> bytes: return hashlib.sha256(fw.code).digest() def check_sig_v1(fw: FirmwareType, idx: int) -> bool: key_idx = fw.key_indexes[idx] signature = fw.signatures[idx] if key_idx == 0: # no signature = invalid signature return False if key_idx not in V1_BOOTLOADER_KEYS: # unknown pubkey return False pubkey = bytes.fromhex(V1_BOOTLOADER_KEYS[key_idx])[1:] verify = ecdsa.VerifyingKey.from_string( pubkey, curve=ecdsa.curves.SECP256k1, hashfunc=hashlib.sha256 ) try: verify.verify(signature, fw.code) return True except ecdsa.BadSignatureError: return False def _header_digest(header: c.Container, header_type: c.Construct) -> bytes: stripped_header = header.copy() stripped_header.sigmask = 0 stripped_header.signature = b"\0" * 64 header_bytes = header_type.build(stripped_header) return pyblake2.blake2s(header_bytes).digest() def digest(fw: FirmwareType) -> bytes: return _header_digest(fw.firmware_header, FirmwareHeader) def validate(fw: FirmwareType, skip_vendor_header=False) -> bool: vendor_fingerprint = _header_digest(fw.vendor_header, VendorHeader) fingerprint = digest(fw) if not skip_vendor_header: try: # if you want to validate a custom vendor header, you can modify # the global variables to match your keys and m-of-n scheme cosi.verify_m_of_n( fw.vendor_header.signature, vendor_fingerprint, V2_BOOTLOADER_M, V2_BOOTLOADER_N, fw.vendor_header.sigmask, V2_BOOTLOADER_KEYS, ) except Exception: raise ValueError("Invalid vendor header signature.") # XXX expiry is not used now # now = time.gmtime() # if time.gmtime(fw.vendor_header.expiry) < now: # raise ValueError("Vendor header expired.") try: cosi.verify_m_of_n( fw.firmware_header.signature, fingerprint, fw.vendor_header.vendor_sigs_required, fw.vendor_header.vendor_sigs_n, fw.firmware_header.sigmask, fw.vendor_header.pubkeys, ) except Exception: raise ValueError("Invalid firmware signature.") # XXX expiry is not used now # if time.gmtime(fw.firmware_header.expiry) < now: # raise ValueError("Firmware header expired.") for i, expected_hash in enumerate(fw.firmware_header.hashes): if i == 0: # Because first chunk is sent along with headers, there is less code in it. chunk = fw.code[: V2_CHUNK_SIZE - fw._code_offset] else: # Subsequent chunks are shifted by the "missing header" size. ptr = i * V2_CHUNK_SIZE - fw._code_offset chunk = fw.code[ptr : ptr + V2_CHUNK_SIZE] if not chunk and expected_hash == b"\0" * 32: continue chunk_hash = pyblake2.blake2s(chunk).digest() if chunk_hash != expected_hash: raise ValueError("Invalid firmware data.") return True # ====== Client functions ====== # @tools.session def update(client, data): if client.features.bootloader_mode is False: raise RuntimeError("Device must be in bootloader mode") resp = client.call(messages.FirmwareErase(length=len(data))) # TREZORv1 method if isinstance(resp, messages.Success): resp = client.call(messages.FirmwareUpload(payload=data)) if isinstance(resp, messages.Success): return else: raise RuntimeError("Unexpected result %s" % resp) # TREZORv2 method while isinstance(resp, messages.FirmwareRequest): payload = data[resp.offset : resp.offset + resp.length] digest = pyblake2.blake2s(payload).digest() resp = client.call(messages.FirmwareUpload(payload=payload, hash=digest)) if isinstance(resp, messages.Success): return else: raise RuntimeError("Unexpected message %s" % resp) PK! Cnjhwilib/devices/trezorlib/log.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import logging from typing import Optional, Set, Type from . import protobuf OMITTED_MESSAGES = set() # type: Set[Type[protobuf.MessageType]] class PrettyProtobufFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: time = self.formatTime(record) message = "[{time}] {source} {level}: {msg}".format( time=time, level=record.levelname.upper(), source=record.name, msg=super().format(record), ) if hasattr(record, "protobuf"): if type(record.protobuf) in OMITTED_MESSAGES: message += " ({} bytes)".format(record.protobuf.ByteSize()) else: message += "\n" + protobuf.format_message(record.protobuf) return message def enable_debug_output(handler: Optional[logging.Handler] = None): if handler is None: handler = logging.StreamHandler() formatter = PrettyProtobufFormatter() handler.setFormatter(formatter) logger = logging.getLogger("trezorlib") logger.setLevel(logging.DEBUG) logger.addHandler(handler) PK!|[[#hwilib/devices/trezorlib/mapping.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . from . import messages map_type_to_class = {} map_class_to_type = {} def build_map(): for msg_name in dir(messages.MessageType): if msg_name.startswith("__"): continue try: msg_class = getattr(messages, msg_name) except AttributeError: raise ValueError( "Implementation of protobuf message '%s' is missing" % msg_name ) if msg_class.MESSAGE_WIRE_TYPE != getattr(messages.MessageType, msg_name): raise ValueError( "Inconsistent wire type and MessageType record for '%s'" % msg_class ) register_message(msg_class) def register_message(msg_class): if msg_class.MESSAGE_WIRE_TYPE in map_type_to_class: raise Exception( "Message for wire type %s is already registered by %s" % (msg_class.MESSAGE_WIRE_TYPE, get_class(msg_class.MESSAGE_WIRE_TYPE)) ) map_class_to_type[msg_class] = msg_class.MESSAGE_WIRE_TYPE map_type_to_class[msg_class.MESSAGE_WIRE_TYPE] = msg_class def get_type(msg): return map_class_to_type[msg.__class__] def get_class(t): return map_type_to_class[t] build_map() PK!wrr,hwilib/devices/trezorlib/messages/Address.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Address(p.MessageType): MESSAGE_WIRE_TYPE = 30 def __init__( self, address: str = None, ) -> None: self.address = address @classmethod def get_fields(cls): return { 1: ('address', p.UnicodeType, 0), # required } PK!G(waa/hwilib/devices/trezorlib/messages/ApplyFlags.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ApplyFlags(p.MessageType): MESSAGE_WIRE_TYPE = 28 def __init__( self, flags: int = None, ) -> None: self.flags = flags @classmethod def get_fields(cls): return { 1: ('flags', p.UVarintType, 0), } PK!N2hwilib/devices/trezorlib/messages/ApplySettings.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ApplySettings(p.MessageType): MESSAGE_WIRE_TYPE = 25 def __init__( self, language: str = None, label: str = None, use_passphrase: bool = None, homescreen: bytes = None, passphrase_source: int = None, auto_lock_delay_ms: int = None, ) -> None: self.language = language self.label = label self.use_passphrase = use_passphrase self.homescreen = homescreen self.passphrase_source = passphrase_source self.auto_lock_delay_ms = auto_lock_delay_ms @classmethod def get_fields(cls): return { 1: ('language', p.UnicodeType, 0), 2: ('label', p.UnicodeType, 0), 3: ('use_passphrase', p.BoolType, 0), 4: ('homescreen', p.BytesType, 0), 5: ('passphrase_source', p.UVarintType, 0), 6: ('auto_lock_delay_ms', p.UVarintType, 0), } PK![1hwilib/devices/trezorlib/messages/BackupDevice.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class BackupDevice(p.MessageType): MESSAGE_WIRE_TYPE = 34 PK!ݣY.hwilib/devices/trezorlib/messages/ButtonAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ButtonAck(p.MessageType): MESSAGE_WIRE_TYPE = 27 PK!2hwilib/devices/trezorlib/messages/ButtonRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ButtonRequest(p.MessageType): MESSAGE_WIRE_TYPE = 26 def __init__( self, code: int = None, data: str = None, ) -> None: self.code = code self.data = data @classmethod def get_fields(cls): return { 1: ('code', p.UVarintType, 0), 2: ('data', p.UnicodeType, 0), } PK!\h006hwilib/devices/trezorlib/messages/ButtonRequestType.py# Automatically generated by pb2py # fmt: off Other = 1 FeeOverThreshold = 2 ConfirmOutput = 3 ResetDevice = 4 ConfirmWord = 5 WipeDevice = 6 ProtectCall = 7 SignTx = 8 FirmwareCheck = 9 Address = 10 PublicKey = 11 MnemonicWordCount = 12 MnemonicInput = 13 PassphraseType = 14 UnknownDerivationPath = 15 PK!3+hwilib/devices/trezorlib/messages/Cancel.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Cancel(p.MessageType): MESSAGE_WIRE_TYPE = 20 PK! aa.hwilib/devices/trezorlib/messages/ChangePin.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ChangePin(p.MessageType): MESSAGE_WIRE_TYPE = 4 def __init__( self, remove: bool = None, ) -> None: self.remove = remove @classmethod def get_fields(cls): return { 1: ('remove', p.BoolType, 0), } PK!}1hwilib/devices/trezorlib/messages/ClearSession.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ClearSession(p.MessageType): MESSAGE_WIRE_TYPE = 24 PK!3556hwilib/devices/trezorlib/messages/DebugLinkDecision.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkDecision(p.MessageType): MESSAGE_WIRE_TYPE = 100 def __init__( self, yes_no: bool = None, up_down: bool = None, input: str = None, ) -> None: self.yes_no = yes_no self.up_down = up_down self.input = input @classmethod def get_fields(cls): return { 1: ('yes_no', p.BoolType, 0), 2: ('up_down', p.BoolType, 0), 3: ('input', p.UnicodeType, 0), } PK! Uoo8hwilib/devices/trezorlib/messages/DebugLinkFlashErase.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkFlashErase(p.MessageType): MESSAGE_WIRE_TYPE = 113 def __init__( self, sector: int = None, ) -> None: self.sector = sector @classmethod def get_fields(cls): return { 1: ('sector', p.UVarintType, 0), } PK!]~6hwilib/devices/trezorlib/messages/DebugLinkGetState.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkGetState(p.MessageType): MESSAGE_WIRE_TYPE = 101 PK!J((1hwilib/devices/trezorlib/messages/DebugLinkLog.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkLog(p.MessageType): MESSAGE_WIRE_TYPE = 104 def __init__( self, level: int = None, bucket: str = None, text: str = None, ) -> None: self.level = level self.bucket = bucket self.text = text @classmethod def get_fields(cls): return { 1: ('level', p.UVarintType, 0), 2: ('bucket', p.UnicodeType, 0), 3: ('text', p.UnicodeType, 0), } PK!kk4hwilib/devices/trezorlib/messages/DebugLinkMemory.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkMemory(p.MessageType): MESSAGE_WIRE_TYPE = 111 def __init__( self, memory: bytes = None, ) -> None: self.memory = memory @classmethod def get_fields(cls): return { 1: ('memory', p.BytesType, 0), } PK! 8hwilib/devices/trezorlib/messages/DebugLinkMemoryRead.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkMemoryRead(p.MessageType): MESSAGE_WIRE_TYPE = 110 def __init__( self, address: int = None, length: int = None, ) -> None: self.address = address self.length = length @classmethod def get_fields(cls): return { 1: ('address', p.UVarintType, 0), 2: ('length', p.UVarintType, 0), } PK!lP::9hwilib/devices/trezorlib/messages/DebugLinkMemoryWrite.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkMemoryWrite(p.MessageType): MESSAGE_WIRE_TYPE = 112 def __init__( self, address: int = None, memory: bytes = None, flash: bool = None, ) -> None: self.address = address self.memory = memory self.flash = flash @classmethod def get_fields(cls): return { 1: ('address', p.UVarintType, 0), 2: ('memory', p.BytesType, 0), 3: ('flash', p.BoolType, 0), } PK!ll3hwilib/devices/trezorlib/messages/DebugLinkState.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .HDNodeType import HDNodeType class DebugLinkState(p.MessageType): MESSAGE_WIRE_TYPE = 102 def __init__( self, layout: bytes = None, pin: str = None, matrix: str = None, mnemonic: str = None, node: HDNodeType = None, passphrase_protection: bool = None, reset_word: str = None, reset_entropy: bytes = None, recovery_fake_word: str = None, recovery_word_pos: int = None, # reset_word_pos: int = None, ) -> None: self.layout = layout self.pin = pin self.matrix = matrix self.mnemonic = mnemonic self.node = node self.passphrase_protection = passphrase_protection self.reset_word = reset_word self.reset_entropy = reset_entropy self.recovery_fake_word = recovery_fake_word self.recovery_word_pos = recovery_word_pos # self.reset_word_pos = reset_word_pos @classmethod def get_fields(cls): return { 1: ('layout', p.BytesType, 0), 2: ('pin', p.UnicodeType, 0), 3: ('matrix', p.UnicodeType, 0), 4: ('mnemonic', p.UnicodeType, 0), 5: ('node', HDNodeType, 0), 6: ('passphrase_protection', p.BoolType, 0), 7: ('reset_word', p.UnicodeType, 0), 8: ('reset_entropy', p.BytesType, 0), 9: ('recovery_fake_word', p.UnicodeType, 0), 10: ('recovery_word_pos', p.UVarintType, 0), # 11: ('reset_word_pos', p.UVarintType, 0), } PK!~xL2hwilib/devices/trezorlib/messages/DebugLinkStop.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class DebugLinkStop(p.MessageType): MESSAGE_WIRE_TYPE = 103 PK!9drr,hwilib/devices/trezorlib/messages/Entropy.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Entropy(p.MessageType): MESSAGE_WIRE_TYPE = 10 def __init__( self, entropy: bytes = None, ) -> None: self.entropy = entropy @classmethod def get_fields(cls): return { 1: ('entropy', p.BytesType, 0), # required } PK!ii/hwilib/devices/trezorlib/messages/EntropyAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class EntropyAck(p.MessageType): MESSAGE_WIRE_TYPE = 36 def __init__( self, entropy: bytes = None, ) -> None: self.entropy = entropy @classmethod def get_fields(cls): return { 1: ('entropy', p.BytesType, 0), } PK!&„3hwilib/devices/trezorlib/messages/EntropyRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class EntropyRequest(p.MessageType): MESSAGE_WIRE_TYPE = 35 PK!;,hwilib/devices/trezorlib/messages/Failure.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Failure(p.MessageType): MESSAGE_WIRE_TYPE = 3 def __init__( self, code: int = None, message: str = None, ) -> None: self.code = code self.message = message @classmethod def get_fields(cls): return { 1: ('code', p.UVarintType, 0), 2: ('message', p.UnicodeType, 0), } PK!>\0hwilib/devices/trezorlib/messages/FailureType.py# Automatically generated by pb2py # fmt: off UnexpectedMessage = 1 ButtonExpected = 2 DataError = 3 ActionCancelled = 4 PinExpected = 5 PinCancelled = 6 PinInvalid = 7 InvalidSignature = 8 ProcessError = 9 NotEnoughFunds = 10 NotInitialized = 11 PinMismatch = 12 FirmwareError = 99 PK!Ug -hwilib/devices/trezorlib/messages/Features.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Features(p.MessageType): MESSAGE_WIRE_TYPE = 17 def __init__( self, vendor: str = None, major_version: int = None, minor_version: int = None, patch_version: int = None, bootloader_mode: bool = None, device_id: str = None, pin_protection: bool = None, passphrase_protection: bool = None, language: str = None, label: str = None, initialized: bool = None, revision: bytes = None, bootloader_hash: bytes = None, imported: bool = None, pin_cached: bool = None, passphrase_cached: bool = None, firmware_present: bool = None, needs_backup: bool = None, flags: int = None, model: str = None, fw_major: int = None, fw_minor: int = None, fw_patch: int = None, fw_vendor: str = None, fw_vendor_keys: bytes = None, unfinished_backup: bool = None, no_backup: bool = None, ) -> None: self.vendor = vendor self.major_version = major_version self.minor_version = minor_version self.patch_version = patch_version self.bootloader_mode = bootloader_mode self.device_id = device_id self.pin_protection = pin_protection self.passphrase_protection = passphrase_protection self.language = language self.label = label self.initialized = initialized self.revision = revision self.bootloader_hash = bootloader_hash self.imported = imported self.pin_cached = pin_cached self.passphrase_cached = passphrase_cached self.firmware_present = firmware_present self.needs_backup = needs_backup self.flags = flags self.model = model self.fw_major = fw_major self.fw_minor = fw_minor self.fw_patch = fw_patch self.fw_vendor = fw_vendor self.fw_vendor_keys = fw_vendor_keys self.unfinished_backup = unfinished_backup self.no_backup = no_backup @classmethod def get_fields(cls): return { 1: ('vendor', p.UnicodeType, 0), 2: ('major_version', p.UVarintType, 0), 3: ('minor_version', p.UVarintType, 0), 4: ('patch_version', p.UVarintType, 0), 5: ('bootloader_mode', p.BoolType, 0), 6: ('device_id', p.UnicodeType, 0), 7: ('pin_protection', p.BoolType, 0), 8: ('passphrase_protection', p.BoolType, 0), 9: ('language', p.UnicodeType, 0), 10: ('label', p.UnicodeType, 0), 12: ('initialized', p.BoolType, 0), 13: ('revision', p.BytesType, 0), 14: ('bootloader_hash', p.BytesType, 0), 15: ('imported', p.BoolType, 0), 16: ('pin_cached', p.BoolType, 0), 17: ('passphrase_cached', p.BoolType, 0), # 18: ('firmware_present', p.BoolType, 0), # 19: ('needs_backup', p.BoolType, 0), # 20: ('flags', p.UVarintType, 0), 21: ('model', p.UnicodeType, 0), # 22: ('fw_major', p.UVarintType, 0), # 23: ('fw_minor', p.UVarintType, 0), # 24: ('fw_patch', p.UVarintType, 0), # 25: ('fw_vendor', p.UnicodeType, 0), # 26: ('fw_vendor_keys', p.BytesType, 0), # 27: ('unfinished_backup', p.BoolType, 0), # 28: ('no_backup', p.BoolType, 0), } PK!(/gg2hwilib/devices/trezorlib/messages/FirmwareErase.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class FirmwareErase(p.MessageType): MESSAGE_WIRE_TYPE = 6 def __init__( self, length: int = None, ) -> None: self.length = length @classmethod def get_fields(cls): return { 1: ('length', p.UVarintType, 0), } PK!% 4hwilib/devices/trezorlib/messages/FirmwareRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class FirmwareRequest(p.MessageType): MESSAGE_WIRE_TYPE = 8 def __init__( self, offset: int = None, length: int = None, ) -> None: self.offset = offset self.length = length @classmethod def get_fields(cls): return { 1: ('offset', p.UVarintType, 0), 2: ('length', p.UVarintType, 0), } PK!}3hwilib/devices/trezorlib/messages/FirmwareUpload.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class FirmwareUpload(p.MessageType): MESSAGE_WIRE_TYPE = 7 def __init__( self, payload: bytes = None, hash: bytes = None, ) -> None: self.payload = payload self.hash = hash @classmethod def get_fields(cls): return { 1: ('payload', p.BytesType, 0), # required 2: ('hash', p.BytesType, 0), } PK!cbY{{/hwilib/devices/trezorlib/messages/GetAddress.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .MultisigRedeemScriptType import MultisigRedeemScriptType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class GetAddress(p.MessageType): MESSAGE_WIRE_TYPE = 29 def __init__( self, address_n: List[int] = None, coin_name: str = None, show_display: bool = None, multisig: MultisigRedeemScriptType = None, script_type: int = None, ) -> None: self.address_n = address_n if address_n is not None else [] self.coin_name = coin_name self.show_display = show_display self.multisig = multisig self.script_type = script_type @classmethod def get_fields(cls): return { 1: ('address_n', p.UVarintType, p.FLAG_REPEATED), 2: ('coin_name', p.UnicodeType, 0), # default=Bitcoin 3: ('show_display', p.BoolType, 0), 4: ('multisig', MultisigRedeemScriptType, 0), 5: ('script_type', p.UVarintType, 0), # default=SPENDADDRESS } PK! K hh/hwilib/devices/trezorlib/messages/GetEntropy.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class GetEntropy(p.MessageType): MESSAGE_WIRE_TYPE = 9 def __init__( self, size: int = None, ) -> None: self.size = size @classmethod def get_fields(cls): return { 1: ('size', p.UVarintType, 0), # required } PK!Pb0hwilib/devices/trezorlib/messages/GetFeatures.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class GetFeatures(p.MessageType): MESSAGE_WIRE_TYPE = 55 PK!?==1hwilib/devices/trezorlib/messages/GetPublicKey.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p if __debug__: try: from typing import List except ImportError: List = None # type: ignore class GetPublicKey(p.MessageType): MESSAGE_WIRE_TYPE = 11 def __init__( self, address_n: List[int] = None, ecdsa_curve_name: str = None, show_display: bool = None, coin_name: str = None, script_type: int = None, ) -> None: self.address_n = address_n if address_n is not None else [] self.ecdsa_curve_name = ecdsa_curve_name self.show_display = show_display self.coin_name = coin_name self.script_type = script_type @classmethod def get_fields(cls): return { 1: ('address_n', p.UVarintType, p.FLAG_REPEATED), 2: ('ecdsa_curve_name', p.UnicodeType, 0), 3: ('show_display', p.BoolType, 0), 4: ('coin_name', p.UnicodeType, 0), # default=Bitcoin 5: ('script_type', p.UVarintType, 0), # default=SPENDADDRESS } PK!S_3hwilib/devices/trezorlib/messages/HDNodePathType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .HDNodeType import HDNodeType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class HDNodePathType(p.MessageType): def __init__( self, node: HDNodeType = None, address_n: List[int] = None, ) -> None: self.node = node self.address_n = address_n if address_n is not None else [] @classmethod def get_fields(cls): return { 1: ('node', HDNodeType, 0), # required 2: ('address_n', p.UVarintType, p.FLAG_REPEATED), } PK!/hwilib/devices/trezorlib/messages/HDNodeType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class HDNodeType(p.MessageType): def __init__( self, depth: int = None, fingerprint: int = None, child_num: int = None, chain_code: bytes = None, private_key: bytes = None, public_key: bytes = None, ) -> None: self.depth = depth self.fingerprint = fingerprint self.child_num = child_num self.chain_code = chain_code self.private_key = private_key self.public_key = public_key @classmethod def get_fields(cls): return { 1: ('depth', p.UVarintType, 0), # required 2: ('fingerprint', p.UVarintType, 0), # required 3: ('child_num', p.UVarintType, 0), # required 4: ('chain_code', p.BytesType, 0), # required 5: ('private_key', p.BytesType, 0), 6: ('public_key', p.BytesType, 0), } PK!V//1hwilib/devices/trezorlib/messages/IdentityType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class IdentityType(p.MessageType): def __init__( self, proto: str = None, user: str = None, host: str = None, port: str = None, path: str = None, index: int = None, ) -> None: self.proto = proto self.user = user self.host = host self.port = port self.path = path self.index = index @classmethod def get_fields(cls): return { 1: ('proto', p.UnicodeType, 0), 2: ('user', p.UnicodeType, 0), 3: ('host', p.UnicodeType, 0), 4: ('port', p.UnicodeType, 0), 5: ('path', p.UnicodeType, 0), 6: ('index', p.UVarintType, 0), # default=0 } PK!˟/hwilib/devices/trezorlib/messages/Initialize.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Initialize(p.MessageType): MESSAGE_WIRE_TYPE = 0 def __init__( self, state: bytes = None, skip_passphrase: bool = None, ) -> None: self.state = state self.skip_passphrase = skip_passphrase @classmethod def get_fields(cls): return { 1: ('state', p.BytesType, 0), 2: ('skip_passphrase', p.BoolType, 0), } PK!.܄4hwilib/devices/trezorlib/messages/InputScriptType.py# Automatically generated by pb2py # fmt: off SPENDADDRESS = 0 SPENDMULTISIG = 1 EXTERNAL = 2 SPENDWITNESS = 3 SPENDP2SHWITNESS = 4 PK!/hwilib/devices/trezorlib/messages/LoadDevice.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .HDNodeType import HDNodeType class LoadDevice(p.MessageType): MESSAGE_WIRE_TYPE = 13 def __init__( self, mnemonic: str = None, node: HDNodeType = None, pin: str = None, passphrase_protection: bool = None, language: str = None, label: str = None, skip_checksum: bool = None, u2f_counter: int = None, ) -> None: self.mnemonic = mnemonic self.node = node self.pin = pin self.passphrase_protection = passphrase_protection self.language = language self.label = label self.skip_checksum = skip_checksum self.u2f_counter = u2f_counter @classmethod def get_fields(cls): return { 1: ('mnemonic', p.UnicodeType, 0), 2: ('node', HDNodeType, 0), 3: ('pin', p.UnicodeType, 0), 4: ('passphrase_protection', p.BoolType, 0), 5: ('language', p.UnicodeType, 0), # default=english 6: ('label', p.UnicodeType, 0), 7: ('skip_checksum', p.BoolType, 0), 8: ('u2f_counter', p.UVarintType, 0), } PK!5hwilib/devices/trezorlib/messages/MessageSignature.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class MessageSignature(p.MessageType): MESSAGE_WIRE_TYPE = 40 def __init__( self, address: str = None, signature: bytes = None, ) -> None: self.address = address self.signature = signature @classmethod def get_fields(cls): return { 1: ('address', p.UnicodeType, 0), 2: ('signature', p.BytesType, 0), } PK!;0hwilib/devices/trezorlib/messages/MessageType.py# Automatically generated by pb2py # fmt: off Initialize = 0 Ping = 1 Success = 2 Failure = 3 ChangePin = 4 WipeDevice = 5 GetEntropy = 9 Entropy = 10 LoadDevice = 13 ResetDevice = 14 Features = 17 PinMatrixRequest = 18 PinMatrixAck = 19 Cancel = 20 ClearSession = 24 ApplySettings = 25 ButtonRequest = 26 ButtonAck = 27 ApplyFlags = 28 BackupDevice = 34 EntropyRequest = 35 EntropyAck = 36 PassphraseRequest = 41 PassphraseAck = 42 PassphraseStateRequest = 77 PassphraseStateAck = 78 RecoveryDevice = 45 WordRequest = 46 WordAck = 47 GetFeatures = 55 FirmwareErase = 6 FirmwareUpload = 7 FirmwareRequest = 8 SelfTest = 32 GetPublicKey = 11 PublicKey = 12 SignTx = 15 TxRequest = 21 TxAck = 22 GetAddress = 29 Address = 30 SignMessage = 38 VerifyMessage = 39 MessageSignature = 40 SignIdentity = 53 SignedIdentity = 54 DebugLinkDecision = 100 DebugLinkGetState = 101 DebugLinkState = 102 DebugLinkStop = 103 DebugLinkLog = 104 DebugLinkMemoryRead = 110 DebugLinkMemory = 111 DebugLinkMemoryWrite = 112 DebugLinkFlashErase = 113 PK!n99=hwilib/devices/trezorlib/messages/MultisigRedeemScriptType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .HDNodePathType import HDNodePathType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class MultisigRedeemScriptType(p.MessageType): def __init__( self, pubkeys: List[HDNodePathType] = None, signatures: List[bytes] = None, m: int = None, ) -> None: self.pubkeys = pubkeys if pubkeys is not None else [] self.signatures = signatures if signatures is not None else [] self.m = m @classmethod def get_fields(cls): return { 1: ('pubkeys', HDNodePathType, p.FLAG_REPEATED), 2: ('signatures', p.BytesType, p.FLAG_REPEATED), 3: ('m', p.UVarintType, 0), } PK!7 {5hwilib/devices/trezorlib/messages/OutputScriptType.py# Automatically generated by pb2py # fmt: off PAYTOADDRESS = 0 PAYTOSCRIPTHASH = 1 PAYTOMULTISIG = 2 PAYTOOPRETURN = 3 PAYTOWITNESS = 4 PAYTOP2SHWITNESS = 5 PK!-2hwilib/devices/trezorlib/messages/PassphraseAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PassphraseAck(p.MessageType): MESSAGE_WIRE_TYPE = 42 def __init__( self, passphrase: str = None, state: bytes = None, ) -> None: self.passphrase = passphrase self.state = state @classmethod def get_fields(cls): return { 1: ('passphrase', p.UnicodeType, 0), 2: ('state', p.BytesType, 0), } PK!79vv6hwilib/devices/trezorlib/messages/PassphraseRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PassphraseRequest(p.MessageType): MESSAGE_WIRE_TYPE = 41 def __init__( self, on_device: bool = None, ) -> None: self.on_device = on_device @classmethod def get_fields(cls): return { 1: ('on_device', p.BoolType, 0), } PK! JJ9hwilib/devices/trezorlib/messages/PassphraseSourceType.py# Automatically generated by pb2py # fmt: off ASK = 0 DEVICE = 1 HOST = 2 PK!N7hwilib/devices/trezorlib/messages/PassphraseStateAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PassphraseStateAck(p.MessageType): MESSAGE_WIRE_TYPE = 78 PK!?jmm;hwilib/devices/trezorlib/messages/PassphraseStateRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PassphraseStateRequest(p.MessageType): MESSAGE_WIRE_TYPE = 77 def __init__( self, state: bytes = None, ) -> None: self.state = state @classmethod def get_fields(cls): return { 1: ('state', p.BytesType, 0), } PK!gg1hwilib/devices/trezorlib/messages/PinMatrixAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PinMatrixAck(p.MessageType): MESSAGE_WIRE_TYPE = 19 def __init__( self, pin: str = None, ) -> None: self.pin = pin @classmethod def get_fields(cls): return { 1: ('pin', p.UnicodeType, 0), # required } PK!&(cc5hwilib/devices/trezorlib/messages/PinMatrixRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class PinMatrixRequest(p.MessageType): MESSAGE_WIRE_TYPE = 18 def __init__( self, type: int = None, ) -> None: self.type = type @classmethod def get_fields(cls): return { 1: ('type', p.UVarintType, 0), } PK!14.UU9hwilib/devices/trezorlib/messages/PinMatrixRequestType.py# Automatically generated by pb2py # fmt: off Current = 1 NewFirst = 2 NewSecond = 3 PK!r)hwilib/devices/trezorlib/messages/Ping.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Ping(p.MessageType): MESSAGE_WIRE_TYPE = 1 def __init__( self, message: str = None, button_protection: bool = None, pin_protection: bool = None, passphrase_protection: bool = None, ) -> None: self.message = message self.button_protection = button_protection self.pin_protection = pin_protection self.passphrase_protection = passphrase_protection @classmethod def get_fields(cls): return { 1: ('message', p.UnicodeType, 0), 2: ('button_protection', p.BoolType, 0), 3: ('pin_protection', p.BoolType, 0), 4: ('passphrase_protection', p.BoolType, 0), } PK!Fڀ.hwilib/devices/trezorlib/messages/PublicKey.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .HDNodeType import HDNodeType class PublicKey(p.MessageType): MESSAGE_WIRE_TYPE = 12 def __init__( self, node: HDNodeType = None, xpub: str = None, ) -> None: self.node = node self.xpub = xpub @classmethod def get_fields(cls): return { 1: ('node', HDNodeType, 0), # required 2: ('xpub', p.UnicodeType, 0), } PK!?MM3hwilib/devices/trezorlib/messages/RecoveryDevice.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class RecoveryDevice(p.MessageType): MESSAGE_WIRE_TYPE = 45 def __init__( self, word_count: int = None, passphrase_protection: bool = None, pin_protection: bool = None, language: str = None, label: str = None, enforce_wordlist: bool = None, type: int = None, u2f_counter: int = None, dry_run: bool = None, ) -> None: self.word_count = word_count self.passphrase_protection = passphrase_protection self.pin_protection = pin_protection self.language = language self.label = label self.enforce_wordlist = enforce_wordlist self.type = type self.u2f_counter = u2f_counter self.dry_run = dry_run @classmethod def get_fields(cls): return { 1: ('word_count', p.UVarintType, 0), 2: ('passphrase_protection', p.BoolType, 0), 3: ('pin_protection', p.BoolType, 0), 4: ('language', p.UnicodeType, 0), # default=english 5: ('label', p.UnicodeType, 0), 6: ('enforce_wordlist', p.BoolType, 0), 8: ('type', p.UVarintType, 0), 9: ('u2f_counter', p.UVarintType, 0), 10: ('dry_run', p.BoolType, 0), } PK!׭2/LL7hwilib/devices/trezorlib/messages/RecoveryDeviceType.py# Automatically generated by pb2py # fmt: off ScrambledWords = 0 Matrix = 1 PK!-rqq0hwilib/devices/trezorlib/messages/RequestType.py# Automatically generated by pb2py # fmt: off TXINPUT = 0 TXOUTPUT = 1 TXMETA = 2 TXFINISHED = 3 TXEXTRADATA = 4 PK!ZH||0hwilib/devices/trezorlib/messages/ResetDevice.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class ResetDevice(p.MessageType): MESSAGE_WIRE_TYPE = 14 def __init__( self, display_random: bool = None, strength: int = None, passphrase_protection: bool = None, pin_protection: bool = None, language: str = None, label: str = None, # u2f_counter: int = None, # skip_backup: bool = None, # no_backup: bool = None, ) -> None: self.display_random = display_random self.strength = strength self.passphrase_protection = passphrase_protection self.pin_protection = pin_protection self.language = language self.label = label # self.u2f_counter = u2f_counter # self.skip_backup = skip_backup # self.no_backup = no_backup @classmethod def get_fields(cls): return { 1: ('display_random', p.BoolType, 0), 2: ('strength', p.UVarintType, 0), # default=256 3: ('passphrase_protection', p.BoolType, 0), 4: ('pin_protection', p.BoolType, 0), 5: ('language', p.UnicodeType, 0), # default=english 6: ('label', p.UnicodeType, 0), # 7: ('u2f_counter', p.UVarintType, 0), # 8: ('skip_backup', p.BoolType, 0), # 9: ('no_backup', p.BoolType, 0), } PK!OKgg-hwilib/devices/trezorlib/messages/SelfTest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class SelfTest(p.MessageType): MESSAGE_WIRE_TYPE = 32 def __init__( self, payload: bytes = None, ) -> None: self.payload = payload @classmethod def get_fields(cls): return { 1: ('payload', p.BytesType, 0), } PK!\ II1hwilib/devices/trezorlib/messages/SignIdentity.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .IdentityType import IdentityType class SignIdentity(p.MessageType): MESSAGE_WIRE_TYPE = 53 def __init__( self, identity: IdentityType = None, challenge_hidden: bytes = None, challenge_visual: str = None, ecdsa_curve_name: str = None, ) -> None: self.identity = identity self.challenge_hidden = challenge_hidden self.challenge_visual = challenge_visual self.ecdsa_curve_name = ecdsa_curve_name @classmethod def get_fields(cls): return { 1: ('identity', IdentityType, 0), 2: ('challenge_hidden', p.BytesType, 0), 3: ('challenge_visual', p.UnicodeType, 0), 4: ('ecdsa_curve_name', p.UnicodeType, 0), } PK!婨0hwilib/devices/trezorlib/messages/SignMessage.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p if __debug__: try: from typing import List except ImportError: List = None # type: ignore class SignMessage(p.MessageType): MESSAGE_WIRE_TYPE = 38 def __init__( self, address_n: List[int] = None, message: bytes = None, coin_name: str = None, script_type: int = None, ) -> None: self.address_n = address_n if address_n is not None else [] self.message = message self.coin_name = coin_name self.script_type = script_type @classmethod def get_fields(cls): return { 1: ('address_n', p.UVarintType, p.FLAG_REPEATED), 2: ('message', p.BytesType, 0), # required 3: ('coin_name', p.UnicodeType, 0), # default=Bitcoin 4: ('script_type', p.UVarintType, 0), # default=SPENDADDRESS } PK!$*4pp+hwilib/devices/trezorlib/messages/SignTx.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class SignTx(p.MessageType): MESSAGE_WIRE_TYPE = 15 def __init__( self, outputs_count: int = None, inputs_count: int = None, coin_name: str = None, version: int = None, lock_time: int = None, expiry: int = None, overwintered: bool = None, version_group_id: int = None, timestamp: int = None, ) -> None: self.outputs_count = outputs_count self.inputs_count = inputs_count self.coin_name = coin_name self.version = version self.lock_time = lock_time self.expiry = expiry self.overwintered = overwintered self.version_group_id = version_group_id self.timestamp = timestamp @classmethod def get_fields(cls): return { 1: ('outputs_count', p.UVarintType, 0), # required 2: ('inputs_count', p.UVarintType, 0), # required 3: ('coin_name', p.UnicodeType, 0), # default=Bitcoin 4: ('version', p.UVarintType, 0), # default=1 5: ('lock_time', p.UVarintType, 0), # default=0 6: ('expiry', p.UVarintType, 0), 7: ('overwintered', p.BoolType, 0), 8: ('version_group_id', p.UVarintType, 0), 9: ('timestamp', p.UVarintType, 0), } PK!̉UU3hwilib/devices/trezorlib/messages/SignedIdentity.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class SignedIdentity(p.MessageType): MESSAGE_WIRE_TYPE = 54 def __init__( self, address: str = None, public_key: bytes = None, signature: bytes = None, ) -> None: self.address = address self.public_key = public_key self.signature = signature @classmethod def get_fields(cls): return { 1: ('address', p.UnicodeType, 0), 2: ('public_key', p.BytesType, 0), 3: ('signature', p.BytesType, 0), } PK!A-ee,hwilib/devices/trezorlib/messages/Success.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class Success(p.MessageType): MESSAGE_WIRE_TYPE = 2 def __init__( self, message: str = None, ) -> None: self.message = message @classmethod def get_fields(cls): return { 1: ('message', p.UnicodeType, 0), } PK!LT4hwilib/devices/trezorlib/messages/TransactionType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .TxInputType import TxInputType from .TxOutputBinType import TxOutputBinType from .TxOutputType import TxOutputType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class TransactionType(p.MessageType): def __init__( self, version: int = None, inputs: List[TxInputType] = None, bin_outputs: List[TxOutputBinType] = None, lock_time: int = None, outputs: List[TxOutputType] = None, inputs_cnt: int = None, outputs_cnt: int = None, extra_data: bytes = None, extra_data_len: int = None, expiry: int = None, overwintered: bool = None, version_group_id: int = None, timestamp: int = None, ) -> None: self.version = version self.inputs = inputs if inputs is not None else [] self.bin_outputs = bin_outputs if bin_outputs is not None else [] self.lock_time = lock_time self.outputs = outputs if outputs is not None else [] self.inputs_cnt = inputs_cnt self.outputs_cnt = outputs_cnt self.extra_data = extra_data self.extra_data_len = extra_data_len self.expiry = expiry self.overwintered = overwintered self.version_group_id = version_group_id self.timestamp = timestamp @classmethod def get_fields(cls): return { 1: ('version', p.UVarintType, 0), 2: ('inputs', TxInputType, p.FLAG_REPEATED), 3: ('bin_outputs', TxOutputBinType, p.FLAG_REPEATED), 4: ('lock_time', p.UVarintType, 0), 5: ('outputs', TxOutputType, p.FLAG_REPEATED), 6: ('inputs_cnt', p.UVarintType, 0), 7: ('outputs_cnt', p.UVarintType, 0), 8: ('extra_data', p.BytesType, 0), 9: ('extra_data_len', p.UVarintType, 0), 10: ('expiry', p.UVarintType, 0), 11: ('overwintered', p.BoolType, 0), 12: ('version_group_id', p.UVarintType, 0), 13: ('timestamp', p.UVarintType, 0), } PK!*hwilib/devices/trezorlib/messages/TxAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .TransactionType import TransactionType class TxAck(p.MessageType): MESSAGE_WIRE_TYPE = 22 def __init__( self, tx: TransactionType = None, ) -> None: self.tx = tx @classmethod def get_fields(cls): return { 1: ('tx', TransactionType, 0), } PK!-6#330hwilib/devices/trezorlib/messages/TxInputType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .MultisigRedeemScriptType import MultisigRedeemScriptType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class TxInputType(p.MessageType): def __init__( self, address_n: List[int] = None, prev_hash: bytes = None, prev_index: int = None, script_sig: bytes = None, sequence: int = None, script_type: int = None, multisig: MultisigRedeemScriptType = None, amount: int = None, decred_tree: int = None, decred_script_version: int = None, prev_block_hash_bip115: bytes = None, prev_block_height_bip115: int = None, ) -> None: self.address_n = address_n if address_n is not None else [] self.prev_hash = prev_hash self.prev_index = prev_index self.script_sig = script_sig self.sequence = sequence self.script_type = script_type self.multisig = multisig self.amount = amount self.decred_tree = decred_tree self.decred_script_version = decred_script_version self.prev_block_hash_bip115 = prev_block_hash_bip115 self.prev_block_height_bip115 = prev_block_height_bip115 @classmethod def get_fields(cls): return { 1: ('address_n', p.UVarintType, p.FLAG_REPEATED), 2: ('prev_hash', p.BytesType, 0), # required 3: ('prev_index', p.UVarintType, 0), # required 4: ('script_sig', p.BytesType, 0), 5: ('sequence', p.UVarintType, 0), # default=4294967295 6: ('script_type', p.UVarintType, 0), # default=SPENDADDRESS 7: ('multisig', MultisigRedeemScriptType, 0), 8: ('amount', p.UVarintType, 0), 9: ('decred_tree', p.UVarintType, 0), 10: ('decred_script_version', p.UVarintType, 0), 11: ('prev_block_hash_bip115', p.BytesType, 0), 12: ('prev_block_height_bip115', p.UVarintType, 0), } PK!d4hwilib/devices/trezorlib/messages/TxOutputBinType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class TxOutputBinType(p.MessageType): def __init__( self, amount: int = None, script_pubkey: bytes = None, decred_script_version: int = None, ) -> None: self.amount = amount self.script_pubkey = script_pubkey self.decred_script_version = decred_script_version @classmethod def get_fields(cls): return { 1: ('amount', p.UVarintType, 0), # required 2: ('script_pubkey', p.BytesType, 0), # required 3: ('decred_script_version', p.UVarintType, 0), } PK!baoS1hwilib/devices/trezorlib/messages/TxOutputType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .MultisigRedeemScriptType import MultisigRedeemScriptType if __debug__: try: from typing import List except ImportError: List = None # type: ignore class TxOutputType(p.MessageType): def __init__( self, address: str = None, address_n: List[int] = None, amount: int = None, script_type: int = None, multisig: MultisigRedeemScriptType = None, op_return_data: bytes = None, decred_script_version: int = None, block_hash_bip115: bytes = None, block_height_bip115: int = None, ) -> None: self.address = address self.address_n = address_n if address_n is not None else [] self.amount = amount self.script_type = script_type self.multisig = multisig self.op_return_data = op_return_data self.decred_script_version = decred_script_version self.block_hash_bip115 = block_hash_bip115 self.block_height_bip115 = block_height_bip115 @classmethod def get_fields(cls): return { 1: ('address', p.UnicodeType, 0), 2: ('address_n', p.UVarintType, p.FLAG_REPEATED), 3: ('amount', p.UVarintType, 0), # required 4: ('script_type', p.UVarintType, 0), # required 5: ('multisig', MultisigRedeemScriptType, 0), 6: ('op_return_data', p.BytesType, 0), 7: ('decred_script_version', p.UVarintType, 0), 8: ('block_hash_bip115', p.BytesType, 0), 9: ('block_height_bip115', p.UVarintType, 0), } PK! N.hwilib/devices/trezorlib/messages/TxRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p from .TxRequestDetailsType import TxRequestDetailsType from .TxRequestSerializedType import TxRequestSerializedType class TxRequest(p.MessageType): MESSAGE_WIRE_TYPE = 21 def __init__( self, request_type: int = None, details: TxRequestDetailsType = None, serialized: TxRequestSerializedType = None, ) -> None: self.request_type = request_type self.details = details self.serialized = serialized @classmethod def get_fields(cls): return { 1: ('request_type', p.UVarintType, 0), 2: ('details', TxRequestDetailsType, 0), 3: ('serialized', TxRequestSerializedType, 0), } PK!:,/9hwilib/devices/trezorlib/messages/TxRequestDetailsType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class TxRequestDetailsType(p.MessageType): def __init__( self, request_index: int = None, tx_hash: bytes = None, extra_data_len: int = None, extra_data_offset: int = None, ) -> None: self.request_index = request_index self.tx_hash = tx_hash self.extra_data_len = extra_data_len self.extra_data_offset = extra_data_offset @classmethod def get_fields(cls): return { 1: ('request_index', p.UVarintType, 0), 2: ('tx_hash', p.BytesType, 0), 3: ('extra_data_len', p.UVarintType, 0), 4: ('extra_data_offset', p.UVarintType, 0), } PK! oo<hwilib/devices/trezorlib/messages/TxRequestSerializedType.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class TxRequestSerializedType(p.MessageType): def __init__( self, signature_index: int = None, signature: bytes = None, serialized_tx: bytes = None, ) -> None: self.signature_index = signature_index self.signature = signature self.serialized_tx = serialized_tx @classmethod def get_fields(cls): return { 1: ('signature_index', p.UVarintType, 0), 2: ('signature', p.BytesType, 0), 3: ('serialized_tx', p.BytesType, 0), } PK!2T2hwilib/devices/trezorlib/messages/VerifyMessage.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class VerifyMessage(p.MessageType): MESSAGE_WIRE_TYPE = 39 def __init__( self, address: str = None, signature: bytes = None, message: bytes = None, coin_name: str = None, ) -> None: self.address = address self.signature = signature self.message = message self.coin_name = coin_name @classmethod def get_fields(cls): return { 1: ('address', p.UnicodeType, 0), 2: ('signature', p.BytesType, 0), 3: ('message', p.BytesType, 0), 4: ('coin_name', p.UnicodeType, 0), # default=Bitcoin } PK!L{wc/hwilib/devices/trezorlib/messages/WipeDevice.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class WipeDevice(p.MessageType): MESSAGE_WIRE_TYPE = 5 PK!7ff,hwilib/devices/trezorlib/messages/WordAck.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class WordAck(p.MessageType): MESSAGE_WIRE_TYPE = 47 def __init__( self, word: str = None, ) -> None: self.word = word @classmethod def get_fields(cls): return { 1: ('word', p.UnicodeType, 0), # required } PK!ļ^^0hwilib/devices/trezorlib/messages/WordRequest.py# Automatically generated by pb2py # fmt: off from .. import protobuf as p class WordRequest(p.MessageType): MESSAGE_WIRE_TYPE = 46 def __init__( self, type: int = None, ) -> None: self.type = type @classmethod def get_fields(cls): return { 1: ('type', p.UVarintType, 0), } PK!W;PP4hwilib/devices/trezorlib/messages/WordRequestType.py# Automatically generated by pb2py # fmt: off Plain = 0 Matrix9 = 1 Matrix6 = 2 PK!' -hwilib/devices/trezorlib/messages/__init__.py# Automatically generated by pb2py # fmt: off from .Address import Address from .ApplyFlags import ApplyFlags from .ApplySettings import ApplySettings from .BackupDevice import BackupDevice from .ButtonAck import ButtonAck from .ButtonRequest import ButtonRequest from .Cancel import Cancel from .ChangePin import ChangePin from .ClearSession import ClearSession from .DebugLinkDecision import DebugLinkDecision from .DebugLinkFlashErase import DebugLinkFlashErase from .DebugLinkGetState import DebugLinkGetState from .DebugLinkLog import DebugLinkLog from .DebugLinkMemory import DebugLinkMemory from .DebugLinkMemoryRead import DebugLinkMemoryRead from .DebugLinkMemoryWrite import DebugLinkMemoryWrite from .DebugLinkState import DebugLinkState from .DebugLinkStop import DebugLinkStop from .Entropy import Entropy from .EntropyAck import EntropyAck from .EntropyRequest import EntropyRequest from .Failure import Failure from .Features import Features from .FirmwareErase import FirmwareErase from .FirmwareRequest import FirmwareRequest from .FirmwareUpload import FirmwareUpload from .GetAddress import GetAddress from .GetEntropy import GetEntropy from .GetFeatures import GetFeatures from .GetPublicKey import GetPublicKey from .HDNodePathType import HDNodePathType from .HDNodeType import HDNodeType from .IdentityType import IdentityType from .Initialize import Initialize from .LoadDevice import LoadDevice from .MessageSignature import MessageSignature from .MultisigRedeemScriptType import MultisigRedeemScriptType from .PassphraseAck import PassphraseAck from .PassphraseRequest import PassphraseRequest from .PassphraseStateAck import PassphraseStateAck from .PassphraseStateRequest import PassphraseStateRequest from .PinMatrixAck import PinMatrixAck from .PinMatrixRequest import PinMatrixRequest from .Ping import Ping from .PublicKey import PublicKey from .RecoveryDevice import RecoveryDevice from .ResetDevice import ResetDevice from .SelfTest import SelfTest from .SignIdentity import SignIdentity from .SignMessage import SignMessage from .SignTx import SignTx from .SignedIdentity import SignedIdentity from .Success import Success from .TransactionType import TransactionType from .TxAck import TxAck from .TxInputType import TxInputType from .TxOutputBinType import TxOutputBinType from .TxOutputType import TxOutputType from .TxRequest import TxRequest from .TxRequestDetailsType import TxRequestDetailsType from .TxRequestSerializedType import TxRequestSerializedType from .VerifyMessage import VerifyMessage from .WipeDevice import WipeDevice from .WordAck import WordAck from .WordRequest import WordRequest from . import ButtonRequestType from . import FailureType from . import InputScriptType from . import MessageType from . import OutputScriptType from . import PassphraseSourceType from . import PinMatrixRequestType from . import RecoveryDeviceType from . import RequestType from . import WordRequestType PK!R..$hwilib/devices/trezorlib/protobuf.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . ''' Extremely minimal streaming codec for a subset of protobuf. Supports uint32, bytes, string, embedded message and repeated fields. For de-sererializing (loading) protobuf types, object with `Reader` interface is required: >>> class Reader: >>> def readinto(self, buffer): >>> """ >>> Reads `len(buffer)` bytes into `buffer`, or raises `EOFError`. >>> """ For serializing (dumping) protobuf types, object with `Writer` interface is required: >>> class Writer: >>> def write(self, buffer): >>> """ >>> Writes all bytes from `buffer`, or raises `EOFError`. >>> """ ''' from io import BytesIO from typing import Any, Optional _UVARINT_BUFFER = bytearray(1) def load_uvarint(reader): buffer = _UVARINT_BUFFER result = 0 shift = 0 byte = 0x80 while byte & 0x80: if reader.readinto(buffer) == 0: raise EOFError byte = buffer[0] result += (byte & 0x7F) << shift shift += 7 return result def dump_uvarint(writer, n): if n < 0: raise ValueError("Cannot dump signed value, convert it to unsigned first.") buffer = _UVARINT_BUFFER shifted = True while shifted: shifted = n >> 7 buffer[0] = (n & 0x7F) | (0x80 if shifted else 0x00) writer.write(buffer) n = shifted # protobuf interleaved signed encoding: # https://developers.google.com/protocol-buffers/docs/encoding#structure # the idea is to save the sign in LSbit instead of twos-complement. # so counting up, you go: 0, -1, 1, -2, 2, ... (as the first bit changes, sign flips) # # To achieve this with a twos-complement number: # 1. shift left by 1, leaving LSbit free # 2. if the number is negative, do bitwise negation. # This keeps positive number the same, and converts negative from twos-complement # to the appropriate value, while setting the sign bit. # # The original algorithm makes use of the fact that arithmetic (signed) shift # keeps the sign bits, so for a n-bit number, (x >> n) gets us "all sign bits". # Then you can take "number XOR all-sign-bits", which is XOR 0 (identity) for positive # and XOR 1 (bitwise negation) for negative. Cute and efficient. # # But this is harder in Python because we don't natively know the bit size of the number. # So we have to branch on whether the number is negative. def sint_to_uint(sint): res = sint << 1 if sint < 0: res = ~res return res def uint_to_sint(uint): sign = uint & 1 res = uint >> 1 if sign: res = ~res return res class UVarintType: WIRE_TYPE = 0 class SVarintType: WIRE_TYPE = 0 class BoolType: WIRE_TYPE = 0 class BytesType: WIRE_TYPE = 2 class UnicodeType: WIRE_TYPE = 2 class MessageType: WIRE_TYPE = 2 @classmethod def get_fields(cls): return {} def __init__(self, **kwargs): for kw in kwargs: setattr(self, kw, kwargs[kw]) self._fill_missing() def __eq__(self, rhs): return self.__class__ is rhs.__class__ and self.__dict__ == rhs.__dict__ def __repr__(self): d = {} for key, value in self.__dict__.items(): if value is None or value == []: continue d[key] = value return "<%s: %s>" % (self.__class__.__name__, d) def __iter__(self): return iter(self.keys()) def keys(self): return (name for name, _, _ in self.get_fields().values()) def __getitem__(self, key): return getattr(self, key) def _fill_missing(self): # fill missing fields for fname, ftype, fflags in self.get_fields().values(): if not hasattr(self, fname): if fflags & FLAG_REPEATED: setattr(self, fname, []) else: setattr(self, fname, None) def CopyFrom(self, obj): self.__dict__ = obj.__dict__.copy() def ByteSize(self): data = BytesIO() dump_message(data, self) return len(data.getvalue()) class LimitedReader: def __init__(self, reader, limit): self.reader = reader self.limit = limit def readinto(self, buf): if self.limit < len(buf): raise EOFError else: nread = self.reader.readinto(buf) self.limit -= nread return nread class CountingWriter: def __init__(self): self.size = 0 def write(self, buf): nwritten = len(buf) self.size += nwritten return nwritten FLAG_REPEATED = 1 def load_message(reader, msg_type): fields = msg_type.get_fields() msg = msg_type() while True: try: fkey = load_uvarint(reader) except EOFError: break # no more fields to load ftag = fkey >> 3 wtype = fkey & 7 field = fields.get(ftag, None) if field is None: # unknown field, skip it if wtype == 0: load_uvarint(reader) elif wtype == 2: ivalue = load_uvarint(reader) reader.readinto(bytearray(ivalue)) else: raise ValueError continue fname, ftype, fflags = field if wtype != ftype.WIRE_TYPE: raise TypeError # parsed wire type differs from the schema ivalue = load_uvarint(reader) if ftype is UVarintType: fvalue = ivalue elif ftype is SVarintType: fvalue = uint_to_sint(ivalue) elif ftype is BoolType: fvalue = bool(ivalue) elif ftype is BytesType: buf = bytearray(ivalue) reader.readinto(buf) fvalue = bytes(buf) elif ftype is UnicodeType: buf = bytearray(ivalue) reader.readinto(buf) fvalue = buf.decode() elif issubclass(ftype, MessageType): fvalue = load_message(LimitedReader(reader, ivalue), ftype) else: raise TypeError # field type is unknown if fflags & FLAG_REPEATED: pvalue = getattr(msg, fname) pvalue.append(fvalue) fvalue = pvalue setattr(msg, fname, fvalue) return msg def dump_message(writer, msg): repvalue = [0] mtype = msg.__class__ fields = mtype.get_fields() for ftag in fields: fname, ftype, fflags = fields[ftag] fvalue = getattr(msg, fname, None) if fvalue is None: continue fkey = (ftag << 3) | ftype.WIRE_TYPE if not fflags & FLAG_REPEATED: repvalue[0] = fvalue fvalue = repvalue for svalue in fvalue: dump_uvarint(writer, fkey) if ftype is UVarintType: dump_uvarint(writer, svalue) elif ftype is SVarintType: dump_uvarint(writer, sint_to_uint(svalue)) elif ftype is BoolType: dump_uvarint(writer, int(svalue)) elif ftype is BytesType: dump_uvarint(writer, len(svalue)) writer.write(svalue) elif ftype is UnicodeType: if not isinstance(svalue, bytes): svalue = svalue.encode() dump_uvarint(writer, len(svalue)) writer.write(svalue) elif issubclass(ftype, MessageType): counter = CountingWriter() dump_message(counter, svalue) dump_uvarint(writer, counter.size) dump_message(writer, svalue) else: raise TypeError def format_message( pb: MessageType, indent: int = 0, sep: str = " " * 4, truncate_after: Optional[int] = 256, truncate_to: Optional[int] = 64, ) -> str: def mostly_printable(bytes): if not bytes: return True printable = sum(1 for byte in bytes if 0x20 <= byte <= 0x7E) return printable / len(bytes) > 0.8 def pformat_value(value: Any, indent: int) -> str: level = sep * indent leadin = sep * (indent + 1) if isinstance(value, MessageType): return format_message(value, indent, sep) if isinstance(value, list): # short list of simple values if not value or not isinstance(value[0], MessageType): return repr(value) # long list, one line per entry lines = ["[", level + "]"] lines[1:1] = [leadin + pformat_value(x, indent + 1) + "," for x in value] return "\n".join(lines) if isinstance(value, dict): lines = ["{"] for key, val in sorted(value.items()): if val is None or val == []: continue lines.append(leadin + key + ": " + pformat_value(val, indent + 1) + ",") lines.append(level + "}") return "\n".join(lines) if isinstance(value, (bytes, bytearray)): length = len(value) suffix = "" if truncate_after and length > truncate_after: suffix = "..." value = value[: truncate_to or 0] if mostly_printable(value): output = repr(value) else: output = "0x" + value.hex() return "{} bytes {}{}".format(length, output, suffix) return repr(value) return "{name} ({size} bytes) {content}".format( name=pb.__class__.__name__, size=pb.ByteSize(), content=pformat_value(pb.__dict__, indent), ) def value_to_proto(ftype, value): if issubclass(ftype, MessageType): raise TypeError("value_to_proto only converts simple values") if ftype in (UVarintType, SVarintType): return int(value) if ftype is BoolType: return bool(value) if ftype is UnicodeType: return str(value) if ftype is BytesType: if isinstance(value, str): return bytes.fromhex(value) elif isinstance(value, bytes): return value else: raise TypeError("can't convert {} value to bytes".format(type(value))) def dict_to_proto(message_type, d): params = {} for fname, ftype, fflags in message_type.get_fields().values(): repeated = fflags & FLAG_REPEATED value = d.get(fname) if value is None: continue if not repeated: value = [value] if issubclass(ftype, MessageType): function = dict_to_proto else: function = value_to_proto newvalue = [function(ftype, v) for v in value] if not repeated: newvalue = newvalue[0] params[fname] = newvalue return message_type(**params) def to_dict(msg): res = {} for key, value in msg.__dict__.items(): if value is None or value == []: continue if isinstance(value, MessageType): value = to_dict(value) res[key] = value return res PK!ɝv!hwilib/devices/trezorlib/tools.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import functools import hashlib import re import struct import unicodedata from typing import List, NewType from .exceptions import TrezorFailure CallException = TrezorFailure HARDENED_FLAG = 1 << 31 Address = NewType("Address", List[int]) def H_(x: int) -> int: """ Shortcut function that "hardens" a number in a BIP44 path. """ return x | HARDENED_FLAG def btc_hash(data): """ Double-SHA256 hash as used in BTC """ return hashlib.sha256(hashlib.sha256(data).digest()).digest() def hash_160(public_key): md = hashlib.new("ripemd160") md.update(hashlib.sha256(public_key).digest()) return md.digest() def hash_160_to_bc_address(h160, address_type): vh160 = struct.pack("= __b58base: div, mod = divmod(long_value, __b58base) result = __b58chars[mod] + result long_value = div result = __b58chars[long_value] + result # Bitcoin does a little leading-zero-compression: # leading 0-bytes in the input become leading-1s nPad = 0 for c in v: if c == 0: nPad += 1 else: break return (__b58chars[0] * nPad) + result def b58decode(v, length=None): """ decode v into a string of len bytes.""" if isinstance(v, bytes): v = v.decode() for c in v: if c not in __b58chars: raise ValueError("invalid Base58 string") long_value = 0 for (i, c) in enumerate(v[::-1]): long_value += __b58chars.find(c) * (__b58base ** i) result = b"" while long_value >= 256: div, mod = divmod(long_value, 256) result = struct.pack("B", mod) + result long_value = div result = struct.pack("B", long_value) + result nPad = 0 for c in v: if c == __b58chars[0]: nPad += 1 else: break result = b"\x00" * nPad + result if length is not None and len(result) != length: return None return result def b58check_encode(v): checksum = btc_hash(v)[:4] return b58encode(v + checksum) def b58check_decode(v, length=None): dec = b58decode(v, length) data, checksum = dec[:-4], dec[-4:] if btc_hash(data)[:4] != checksum: raise ValueError("invalid checksum") return data def parse_path(nstr: str) -> Address: """ Convert BIP32 path string to list of uint32 integers with hardened flags. Several conventions are supported to set the hardened flag: -1, 1', 1h e.g.: "0/1h/1" -> [0, 0x80000001, 1] :param nstr: path string :return: list of integers """ if not nstr: return [] n = nstr.split("/") # m/a/b/c => a/b/c if n[0] == "m": n = n[1:] def str_to_harden(x: str) -> int: if x.startswith("-"): return H_(abs(int(x))) elif x.endswith(("h", "'")): return H_(int(x[:-1])) else: return int(x) try: return [str_to_harden(x) for x in n] except Exception: raise ValueError("Invalid BIP32 path", nstr) def normalize_nfc(txt): """ Normalize message to NFC and return bytes suitable for protobuf. This seems to be bitcoin-qt standard of doing things. """ if isinstance(txt, bytes): txt = txt.decode() return unicodedata.normalize("NFC", txt).encode() class expect: # Decorator checks if the method # returned one of expected protobuf messages # or raises an exception def __init__(self, expected, field=None): self.expected = expected self.field = field def __call__(self, f): @functools.wraps(f) def wrapped_f(*args, **kwargs): __tracebackhide__ = True # for pytest # pylint: disable=W0612 ret = f(*args, **kwargs) if not isinstance(ret, self.expected): raise RuntimeError( "Got %s, expected %s" % (ret.__class__, self.expected) ) if self.field is not None: return getattr(ret, self.field) else: return ret return wrapped_f def session(f): # Decorator wraps a BaseClient method # with session activation / deactivation @functools.wraps(f) def wrapped_f(client, *args, **kwargs): __tracebackhide__ = True # for pytest # pylint: disable=W0612 client.open() try: return f(client, *args, **kwargs) finally: client.close() return wrapped_f # de-camelcasifier # https://stackoverflow.com/a/1176023/222189 FIRST_CAP_RE = re.compile("(.)([A-Z][a-z]+)") ALL_CAP_RE = re.compile("([a-z0-9])([A-Z])") def from_camelcase(s): s = FIRST_CAP_RE.sub(r"\1_\2", s) return ALL_CAP_RE.sub(r"\1_\2", s).lower() def dict_from_camelcase(d, renames=None): if not isinstance(d, dict): return d if renames is None: renames = {} res = {} for key, value in d.items(): newkey = from_camelcase(key) renamed_key = renames.get(newkey) or renames.get(key) if renamed_key: newkey = renamed_key if isinstance(value, list): res[newkey] = [dict_from_camelcase(v, renames) for v in value] else: res[newkey] = dict_from_camelcase(value, renames) return res PK!qAA.hwilib/devices/trezorlib/transport/__init__.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import logging from typing import Iterable, List, Type from ..exceptions import TrezorException from ..protobuf import MessageType LOG = logging.getLogger(__name__) # USB vendor/product IDs for Trezors DEV_TREZOR1 = (0x534C, 0x0001) DEV_TREZOR2 = (0x1209, 0x53C1) DEV_TREZOR2_BL = (0x1209, 0x53C0) DEV_KEEPKEY = (0x2B24, 0x0001) DEV_KEEPKEY_WEBUSB = (0x2B24, 0x0002) TREZORS = {DEV_TREZOR1, DEV_TREZOR2, DEV_TREZOR2_BL, DEV_KEEPKEY, DEV_KEEPKEY_WEBUSB} UDEV_RULES_STR = """ Do you have udev rules installed? https://github.com/trezor/trezor-common/blob/master/udev/51-trezor.rules """.strip() class TransportException(TrezorException): pass class Transport: """Raw connection to a Trezor device. Transport subclass represents a kind of communication link: WebUSB or USB-HID connection, or UDP socket of listening emulator(s). It can also enumerate devices available over this communication link, and return them as instances. Transport instance is a thing that: - can be identified and requested by a string URI-like path - can open and close sessions, which enclose related operations - can read and write protobuf messages You need to implement a new Transport subclass if you invent a new way to connect a Trezor device to a computer. """ PATH_PREFIX = None # type: str ENABLED = False def __str__(self) -> str: return self.get_path() def get_path(self) -> str: raise NotImplementedError def begin_session(self) -> None: raise NotImplementedError def end_session(self) -> None: raise NotImplementedError def read(self) -> MessageType: raise NotImplementedError def write(self, message: MessageType) -> None: raise NotImplementedError @classmethod def enumerate(cls) -> Iterable["Transport"]: raise NotImplementedError @classmethod def find_by_path(cls, path: str, prefix_search: bool = False) -> "Transport": for device in cls.enumerate(): if ( path is None or device.get_path() == path or (prefix_search and device.get_path().startswith(path)) ): return device raise TransportException( "{} device not found: {}".format(cls.PATH_PREFIX, path) ) def all_transports() -> Iterable[Type[Transport]]: from .hid import HidTransport from .udp import UdpTransport from .webusb import WebUsbTransport return set( cls for cls in (HidTransport, UdpTransport, WebUsbTransport) if cls.ENABLED ) def enumerate_devices() -> Iterable[Transport]: devices = [] # type: List[Transport] for transport in all_transports(): name = transport.__name__ try: found = list(transport.enumerate()) LOG.info("Enumerating {}: found {} devices".format(name, len(found))) devices.extend(found) except NotImplementedError: LOG.error("{} does not implement device enumeration".format(name)) except Exception as e: excname = e.__class__.__name__ LOG.error("Failed to enumerate {}. {}: {}".format(name, excname, e)) return devices def get_transport(path: str = None, prefix_search: bool = False) -> Transport: if path is None: try: return next(iter(enumerate_devices())) except StopIteration: raise TransportException("No TREZOR device found") from None # Find whether B is prefix of A (transport name is part of the path) # or A is prefix of B (path is a prefix, or a name, of transport). # This naively expects that no two transports have a common prefix. def match_prefix(a: str, b: str) -> bool: return a.startswith(b) or b.startswith(a) LOG.info( "looking for device by {}: {}".format( "prefix" if prefix_search else "full path", path ) ) transports = [t for t in all_transports() if match_prefix(path, t.PATH_PREFIX)] if transports: return transports[0].find_by_path(path, prefix_search=prefix_search) raise TransportException("Could not find device by path: {}".format(path)) PK!,)hwilib/devices/trezorlib/transport/hid.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import logging import sys import time from typing import Any, Dict, Iterable from . import DEV_TREZOR1, DEV_KEEPKEY, UDEV_RULES_STR, TransportException from .protocol import ProtocolBasedTransport, ProtocolV1 LOG = logging.getLogger(__name__) try: import hid except Exception as e: LOG.info("HID transport is disabled: {}".format(e)) hid = None HidDevice = Dict[str, Any] HidDeviceHandle = Any class HidHandle: def __init__( self, path: bytes, serial: str, probe_hid_version: bool = False ) -> None: self.path = path self.serial = serial self.handle = None # type: HidDeviceHandle self.hid_version = None if probe_hid_version else 2 def open(self) -> None: self.handle = hid.device() try: self.handle.open_path(self.path) except (IOError, OSError) as e: if sys.platform.startswith("linux"): e.args = e.args + (UDEV_RULES_STR,) raise e # On some platforms, HID path stays the same over device reconnects. # That means that someone could unplug a Trezor, plug a different one # and we wouldn't even know. # So we check that the serial matches what we expect. serial = self.handle.get_serial_number_string() if serial != self.serial: self.handle.close() self.handle = None raise TransportException( "Unexpected device {} on path {}".format(serial, self.path.decode()) ) self.handle.set_nonblocking(True) if self.hid_version is None: self.hid_version = self.probe_hid_version() def close(self) -> None: if self.handle is not None: # reload serial, because device.wipe() can reset it self.serial = self.handle.get_serial_number_string() self.handle.close() self.handle = None def write_chunk(self, chunk: bytes) -> None: if len(chunk) != 64: raise TransportException("Unexpected chunk size: %d" % len(chunk)) if self.hid_version == 2: self.handle.write(b"\0" + bytearray(chunk)) else: self.handle.write(chunk) def read_chunk(self) -> bytes: while True: chunk = self.handle.read(64) if chunk: break else: time.sleep(0.001) if len(chunk) != 64: raise TransportException("Unexpected chunk size: %d" % len(chunk)) return bytes(chunk) def probe_hid_version(self) -> int: n = self.handle.write([0, 63] + [0xFF] * 63) if n == 65: return 2 n = self.handle.write([63] + [0xFF] * 63) if n == 64: return 1 raise TransportException("Unknown HID version") class HidTransport(ProtocolBasedTransport): """ HidTransport implements transport over USB HID interface. """ PATH_PREFIX = "hid" ENABLED = hid is not None def __init__(self, device: HidDevice) -> None: self.device = device self.handle = HidHandle(device["path"], device["serial_number"]) protocol = ProtocolV1(self.handle) super().__init__(protocol=protocol) def get_path(self) -> str: return "%s:%s" % (self.PATH_PREFIX, self.device["path"].decode()) @classmethod def enumerate(cls, debug: bool = False) -> Iterable["HidTransport"]: devices = [] for dev in hid.enumerate(0, 0): usb_id = (dev["vendor_id"], dev["product_id"]) if usb_id != DEV_TREZOR1 and usb_id != DEV_KEEPKEY: continue if debug: if not is_debuglink(dev): continue else: if not is_wirelink(dev): continue devices.append(HidTransport(dev)) return devices def find_debug(self) -> "HidTransport": if self.protocol.VERSION >= 2: # use the same device return self else: # For v1 protocol, find debug USB interface for the same serial number for debug in HidTransport.enumerate(debug=True): if debug.device["serial_number"] == self.device["serial_number"]: return debug raise TransportException("Debug HID device not found") def is_wirelink(dev: HidDevice) -> bool: return dev["usage_page"] == 0xFF00 or dev["interface_number"] == 0 def is_debuglink(dev: HidDevice) -> bool: return dev["usage_page"] == 0xFF01 or dev["interface_number"] == 1 PK!΁'.hwilib/devices/trezorlib/transport/protocol.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import logging import os import struct from io import BytesIO from typing import Tuple from typing_extensions import Protocol as StructuralType from . import Transport from .. import mapping, protobuf REPLEN = 64 V2_FIRST_CHUNK = 0x01 V2_NEXT_CHUNK = 0x02 V2_BEGIN_SESSION = 0x03 V2_END_SESSION = 0x04 LOG = logging.getLogger(__name__) class Handle(StructuralType): """PEP 544 structural type for Handle functionality. (called a "Protocol" in the proposed PEP, name which is impractical here) Handle is a "physical" layer for a protocol. It can open/close a connection and read/write bare data in 64-byte chunks. Functionally we gain nothing from making this an (abstract) base class for handle implementations, so this definition is for type hinting purposes only. You can, but don't have to, inherit from it. """ def open(self) -> None: ... def close(self) -> None: ... def read_chunk(self) -> bytes: ... def write_chunk(self, chunk: bytes) -> None: ... class Protocol: """Wire protocol that can communicate with a Trezor device, given a Handle. A Protocol implements the part of the Transport API that relates to communicating logical messages over a physical layer. It is a thing that can: - open and close sessions, - send and receive protobuf messages, given the ability to: - open and close physical connections, - and send and receive binary chunks. We declare a protocol version (we have implementations of v1 and v2). For now, the class also handles session counting and opening the underlying Handle. This will probably be removed in the future. We will need a new Protocol class if we change the way a Trezor device encapsulates its messages. """ VERSION = None # type: int def __init__(self, handle: Handle) -> None: self.handle = handle self.session_counter = 0 # XXX we might be able to remove this now that TrezorClient does session handling def begin_session(self) -> None: if self.session_counter == 0: self.handle.open() self.session_counter += 1 def end_session(self) -> None: if self.session_counter == 1: self.handle.close() self.session_counter -= 1 def read(self) -> protobuf.MessageType: raise NotImplementedError def write(self, message: protobuf.MessageType) -> None: raise NotImplementedError class ProtocolBasedTransport(Transport): """Transport that implements its communications through a Protocol. Intended as a base class for implementations that proxy their communication operations to a Protocol. """ def __init__(self, protocol: Protocol) -> None: self.protocol = protocol def write(self, message: protobuf.MessageType) -> None: self.protocol.write(message) def read(self) -> protobuf.MessageType: return self.protocol.read() def begin_session(self) -> None: self.protocol.begin_session() def end_session(self) -> None: self.protocol.end_session() class ProtocolV1(Protocol): """Protocol version 1. Currently (11/2018) in use on all Trezors. Does not understand sessions. """ VERSION = 1 def write(self, msg: protobuf.MessageType) -> None: LOG.debug( "sending message: {}".format(msg.__class__.__name__), extra={"protobuf": msg}, ) data = BytesIO() protobuf.dump_message(data, msg) ser = data.getvalue() header = struct.pack(">HL", mapping.get_type(msg), len(ser)) buffer = bytearray(b"##" + header + ser) while buffer: # Report ID, data padded to 63 bytes chunk = b"?" + buffer[: REPLEN - 1] chunk = chunk.ljust(REPLEN, b"\x00") self.handle.write_chunk(chunk) buffer = buffer[63:] def read(self) -> protobuf.MessageType: buffer = bytearray() # Read header with first part of message data msg_type, datalen, first_chunk = self.read_first() buffer.extend(first_chunk) # Read the rest of the message while len(buffer) < datalen: buffer.extend(self.read_next()) # Strip padding data = BytesIO(buffer[:datalen]) # Parse to protobuf msg = protobuf.load_message(data, mapping.get_class(msg_type)) LOG.debug( "received message: {}".format(msg.__class__.__name__), extra={"protobuf": msg}, ) return msg def read_first(self) -> Tuple[int, int, bytes]: chunk = self.handle.read_chunk() if chunk[:3] != b"?##": raise RuntimeError("Unexpected magic characters") try: headerlen = struct.calcsize(">HL") msg_type, datalen = struct.unpack(">HL", chunk[3 : 3 + headerlen]) except Exception: raise RuntimeError("Cannot parse header") data = chunk[3 + headerlen :] return msg_type, datalen, data def read_next(self) -> bytes: chunk = self.handle.read_chunk() if chunk[:1] != b"?": raise RuntimeError("Unexpected magic characters") return chunk[1:] def get_protocol(handle: Handle, want_v2: bool) -> Protocol: """Make a Protocol instance for the given handle. Each transport can have a preference for using a particular protocol version. This preference is overridable through `TREZOR_PROTOCOL_V1` environment variable, which forces the library to use V1 anyways. As of 11/2018, no devices support V2, so we enforce V1 here. It is still possible to set `TREZOR_PROTOCOL_V1=0` and thus enable V2 protocol for transports that ask for it (i.e., USB transports for Trezor T). """ return ProtocolV1(handle) PK!>^)hwilib/devices/trezorlib/transport/udp.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import socket from typing import Iterable, Optional, cast from . import TransportException from .protocol import ProtocolBasedTransport, get_protocol class UdpTransport(ProtocolBasedTransport): DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 21324 PATH_PREFIX = "udp" ENABLED = True def __init__(self, device: str = None) -> None: if not device: host = UdpTransport.DEFAULT_HOST port = UdpTransport.DEFAULT_PORT else: devparts = device.split(":") host = devparts[0] port = int(devparts[1]) if len(devparts) > 1 else UdpTransport.DEFAULT_PORT self.device = (host, port) self.socket = None # type: Optional[socket.socket] protocol = get_protocol(self, want_v2=False) super().__init__(protocol=protocol) def get_path(self) -> str: return "{}:{}:{}".format(self.PATH_PREFIX, *self.device) def find_debug(self) -> "UdpTransport": host, port = self.device return UdpTransport("{}:{}".format(host, port + 1)) @classmethod def _try_path(cls, path: str) -> "UdpTransport": d = cls(path) try: d.open() if d._ping(): return d else: raise TransportException( "No TREZOR device found at address {}".format(path) ) finally: d.close() @classmethod def enumerate(cls) -> Iterable["UdpTransport"]: default_path = "{}:{}".format(cls.DEFAULT_HOST, cls.DEFAULT_PORT) try: return [cls._try_path(default_path)] except TransportException: return [] @classmethod def find_by_path(cls, path: str, prefix_search: bool = False) -> "UdpTransport": if prefix_search: return cast(UdpTransport, super().find_by_path(path, prefix_search)) # This is *technically* type-able: mark `find_by_path` as returning # the same type from which `cls` comes from. # Mypy can't handle that though, so here we are. else: path = path.replace("{}:".format(cls.PATH_PREFIX), "") return cls._try_path(path) def open(self) -> None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.connect(self.device) self.socket.settimeout(10) def close(self) -> None: if self.socket is not None: self.socket.close() self.socket = None def _ping(self) -> bool: """Test if the device is listening.""" assert self.socket is not None resp = None try: self.socket.sendall(b"PINGPING") resp = self.socket.recv(8) except Exception: pass return resp == b"PONGPONG" def write_chunk(self, chunk: bytes) -> None: assert self.socket is not None if len(chunk) != 64: raise TransportException("Unexpected data length") self.socket.sendall(chunk) def read_chunk(self) -> bytes: assert self.socket is not None while True: try: chunk = self.socket.recv(64) break except socket.timeout: continue if len(chunk) != 64: raise TransportException("Unexpected chunk size: %d" % len(chunk)) return bytearray(chunk) PK!BB,hwilib/devices/trezorlib/transport/webusb.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import atexit import logging import sys import time from typing import Iterable, Optional from . import TREZORS, UDEV_RULES_STR, TransportException from .protocol import ProtocolBasedTransport, ProtocolV1 LOG = logging.getLogger(__name__) try: import usb1 except Exception as e: LOG.warning("WebUSB transport is disabled: {}".format(e)) usb1 = None INTERFACE = 0 ENDPOINT = 1 DEBUG_INTERFACE = 1 DEBUG_ENDPOINT = 2 class WebUsbHandle: def __init__(self, device: "usb1.USBDevice", debug: bool = False) -> None: self.device = device self.interface = DEBUG_INTERFACE if debug else INTERFACE self.endpoint = DEBUG_ENDPOINT if debug else ENDPOINT self.count = 0 self.handle = None # type: Optional[usb1.USBDeviceHandle] def open(self) -> None: self.handle = self.device.open() if self.handle is None: if sys.platform.startswith("linux"): args = (UDEV_RULES_STR,) else: args = () raise IOError("Cannot open device", *args) self.handle.claimInterface(self.interface) def close(self) -> None: if self.handle is not None: self.handle.releaseInterface(self.interface) self.handle.close() self.handle = None def write_chunk(self, chunk: bytes) -> None: assert self.handle is not None if len(chunk) != 64: raise TransportException("Unexpected chunk size: %d" % len(chunk)) self.handle.interruptWrite(self.endpoint, chunk) def read_chunk(self) -> bytes: assert self.handle is not None endpoint = 0x80 | self.endpoint while True: chunk = self.handle.interruptRead(endpoint, 64) if chunk: break else: time.sleep(0.001) if len(chunk) != 64: raise TransportException("Unexpected chunk size: %d" % len(chunk)) return chunk class WebUsbTransport(ProtocolBasedTransport): """ WebUsbTransport implements transport over WebUSB interface. """ PATH_PREFIX = "webusb" ENABLED = usb1 is not None context = None def __init__( self, device: str, handle: WebUsbHandle = None, debug: bool = False ) -> None: if handle is None: handle = WebUsbHandle(device, debug) self.device = device self.handle = handle self.debug = debug super().__init__(protocol=ProtocolV1(handle)) def get_path(self) -> str: return "%s:%s" % (self.PATH_PREFIX, dev_to_str(self.device)) @classmethod def enumerate(cls) -> Iterable["WebUsbTransport"]: if cls.context is None: cls.context = usb1.USBContext() cls.context.open() atexit.register(cls.context.close) devices = [] for dev in cls.context.getDeviceIterator(skip_on_error=True): usb_id = (dev.getVendorID(), dev.getProductID()) if usb_id not in TREZORS: continue if not is_vendor_class(dev): continue try: # workaround for issue #223: # on certain combinations of Windows USB drivers and libusb versions, # Trezor is returned twice (possibly because Windows know it as both # a HID and a WebUSB device), and one of the returned devices is # non-functional. dev.getProduct() devices.append(WebUsbTransport(dev)) except: pass return devices def find_debug(self) -> "WebUsbTransport": if self.protocol.VERSION >= 2: # TODO test this # XXX this is broken right now because sessions don't really work # For v2 protocol, use the same WebUSB interface with a different session return WebUsbTransport(self.device, self.handle) else: # For v1 protocol, find debug USB interface for the same serial number return WebUsbTransport(self.device, debug=True) def is_vendor_class(dev: "usb1.USBDevice") -> bool: configurationId = 0 altSettingId = 0 return ( dev[configurationId][INTERFACE][altSettingId].getClass() == usb1.libusb1.LIBUSB_CLASS_VENDOR_SPEC ) def dev_to_str(dev: "usb1.USBDevice") -> str: return ":".join( str(x) for x in ["%03i" % (dev.getBusNumber(),)] + dev.getPortNumberList() ) PK!gƥt t hwilib/devices/trezorlib/ui.py# This file is part of the Trezor project. # # Copyright (C) 2012-2018 SatoshiLabs and contributors # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 # as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the License along with this library. # If not, see . import os import sys from mnemonic import Mnemonic from . import device from .exceptions import Cancelled from .messages import PinMatrixRequestType, WordRequestType PIN_MATRIX_DESCRIPTION = """ Use the numeric keypad to describe number positions. The layout is: 7 8 9 4 5 6 1 2 3 """.strip() RECOVERY_MATRIX_DESCRIPTION = """ Use the numeric keypad to describe positions. For the word list use only left and right keys. Use backspace to correct an entry. The keypad layout is: 7 8 9 7 | 9 4 5 6 4 | 6 1 2 3 1 | 3 """.strip() PIN_GENERIC = None PIN_CURRENT = PinMatrixRequestType.Current PIN_NEW = PinMatrixRequestType.NewFirst PIN_CONFIRM = PinMatrixRequestType.NewSecond def echo(msg): print(msg, file=sys.stderr) def prompt(msg, hide_input=False): if hide_input: import getpass return getpass.getpass(msg + ' :\n') else: return input(msg + ':\n') class PassphraseUI: def __init__(self, passphrase): self.passphrase = passphrase self.pinmatrix_shown = False self.prompt_shown = False self.always_prompt = False def button_request(self, code): if not self.prompt_shown: echo("Please confirm action on your Trezor device") if not self.always_prompt: self.prompt_shown = True def get_pin(self, code=None): raise NotImplementedError('get_pin is not needed') def get_passphrase(self): return self.passphrase def mnemonic_words(expand=False, language="english"): if expand: wordlist = Mnemonic(language).wordlist else: wordlist = set() def expand_word(word): if not expand: return word if word in wordlist: return word matches = [w for w in wordlist if w.startswith(word)] if len(matches) == 1: return word echo("Choose one of: " + ", ".join(matches)) raise KeyError(word) def get_word(type): assert type == WordRequestType.Plain while True: try: word = prompt("Enter one word of mnemonic") return expand_word(word) except KeyError: pass return get_word PK!W)hwilib/errors.py# Defines errors and error codes # Error codes NO_DEVICE_PATH = -1 NO_DEVICE_TYPE = -2 DEVICE_CONN_ERROR = -3 UNKNWON_DEVICE_TYPE = -4 INVALID_TX = -5 NO_PASSWORD = -6 BAD_ARGUMENT = -7 NOT_IMPLEMENTED = -8 UNAVAILABLE_ACTION = -9 DEVICE_ALREADY_INIT = -10 DEVICE_ALREADY_UNLOCKED = -11 DEVICE_NOT_READY = -12 UNKNOWN_ERROR = -13 ACTION_CANCELED = -14 DEVICE_BUSY = -15 # Exceptions class HWWError(Exception): def __init__(self, msg, code): Exception.__init__(self) self.code = code self.msg = msg def get_code(self): return self.code def get_msg(self): return self.msg def __str__(self): return self.msg class NoPasswordError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, NO_PASSWORD) class UnavailableActionError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, UNAVAILABLE_ACTION) class DeviceAlreadyInitError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, DEVICE_ALREADY_INIT) class DeviceNotReadyError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, DEVICE_NOT_READY) class DeviceAlreadyUnlockedError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, DEVICE_ALREADY_UNLOCKED) class UnknownDeviceError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, UNKNWON_DEVICE_TYPE) class NotImplementedError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, NOT_IMPLEMENTED) class PSBTSerializationError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, INVALID_TX) class BadArgumentError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, BAD_ARGUMENT) class DeviceFailureError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, UNKNOWN_ERROR) class ActionCanceledError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, ACTION_CANCELED) class DeviceConnectionError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, DEVICE_CONN_ERROR) class DeviceBusyError(HWWError): def __init__(self, msg): HWWError.__init__(self, msg, DEVICE_BUSY) PK!wE E hwilib/hwwclient.py# This is an abstract class that defines all of the methods that each Hardware # wallet subclass must implement. class HardwareWalletClient(object): # device is an HID device that has already been opened. def __init__(self, path, password): self.path = path self.password = password self.message_magic = b"\x18Bitcoin Signed Message:\n" self.is_testnet = False self.fingerprint = None # Get the master BIP 44 pubkey def get_master_xpub(self): return self.get_pubkey_at_path('m/44\'/0\'/0\'') # Must return a dict with the xpub # Retrieves the public key at the specified BIP 32 derivation path def get_pubkey_at_path(self, path): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Must return a hex string with the signed transaction # The tx must be in the combined unsigned transaction format def sign_tx(self, tx): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Must return a base64 encoded string with the signed message # The message can be any string. keypath is the bip 32 derivation path for the key to sign with def sign_message(self, message, keypath): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Setup a new device def setup_device(self, label='', passphrase=''): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Wipe this device def wipe_device(self): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Restore device from mnemonic or xprv def restore_device(self, label=''): raise NotImplementedError('The HardwareWalletClient base class does not implement this method') # Begin backup process def backup_device(self, label='', passphrase=''): raise NotImplementedError('The HardwareWalletClient base class does not implement this method') # Close the device def close(self): raise NotImplementedError('The HardwareWalletClient base class does not ' 'implement this method') # Prompt pin def prompt_pin(self): raise NotImplementedError('The HardwareWalletClient base class does not implement this method') # Send pin def send_pin(self): raise NotImplementedError('The HardwareWalletClient base class does not implement this method') PK!(1MhMhhwilib/serializations.py#!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin Object Python Serializations Modified from the test/test_framework/mininode.py file from the Bitcoin repository CTransaction,CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives for transactions only ser_*, deser_*: functions that handle serialization/deserialization """ from io import BytesIO, BufferedReader from codecs import encode from .errors import PSBTSerializationError import struct import binascii import hashlib import copy import base64 def sha256(s): return hashlib.new('sha256', s).digest() def ripemd160(s): return hashlib.new('ripemd160', s).digest() def hash256(s): return sha256(sha256(s)) def hash160(s): return ripemd160(sha256(s)) # Serialization/deserialization tools def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack(" 42: return (False, None, None) if self.scriptPubKey[0] != 0 and (self.scriptPubKey[0] < 81 or self.scriptPubKey[0] > 96): return (False, None, None) if self.scriptPubKey[1] + 2 == len(self.scriptPubKey): return (True, self.scriptPubKey[0] - 0x50 if self.scriptPubKey[0] else 0, self.scriptPubKey[2:]) return (False, None, None) def __repr__(self): return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \ % (self.nValue, self.nValue, binascii.hexlify(self.scriptPubKey)) class CScriptWitness(object): def __init__(self): # stack is a vector of strings self.stack = [] def __repr__(self): return "CScriptWitness(%s)" % \ (",".join([bytes_to_hex_str(x) for x in self.stack])) def is_null(self): if self.stack: return False return True class CTxInWitness(object): def __init__(self): self.scriptWitness = CScriptWitness() def deserialize(self, f): self.scriptWitness.stack = deser_string_vector(f) def serialize(self): return ser_string_vector(self.scriptWitness.stack) def __repr__(self): return repr(self.scriptWitness) def is_null(self): return self.scriptWitness.is_null() class CTxWitness(object): def __init__(self): self.vtxinwit = [] def deserialize(self, f): for i in range(len(self.vtxinwit)): self.vtxinwit[i].deserialize(f) def serialize(self): r = b"" # This is different than the usual vector serialization -- # we omit the length of the vector, which is required to be # the same length as the transaction's vin vector. for x in self.vtxinwit: r += x.serialize() return r def __repr__(self): return "CTxWitness(%s)" % \ (';'.join([repr(x) for x in self.vtxinwit])) def is_null(self): for x in self.vtxinwit: if not x.is_null(): return False return True class CTransaction(object): def __init__(self, tx=None): if tx is None: self.nVersion = 1 self.vin = [] self.vout = [] self.wit = CTxWitness() self.nLockTime = 0 self.sha256 = None self.hash = None else: self.nVersion = tx.nVersion self.vin = copy.deepcopy(tx.vin) self.vout = copy.deepcopy(tx.vout) self.nLockTime = tx.nLockTime self.sha256 = tx.sha256 self.hash = tx.hash self.wit = copy.deepcopy(tx.wit) def deserialize(self, f): self.nVersion = struct.unpack(" 0: raise PSBTSerializationError("Duplicate key, input sighash type already provided") elif len(key) != 1: raise PSBTSerializationError("sighash key is more than one byte type") value = deser_string(f) self.sighash = struct.unpack(" 0: r += ser_string(b"\x03") r += ser_string(struct.pack(" 1: raise PSBTSerializationError("Global unsigned tx key is more than one byte type") # read in value value = BufferedReader(BytesIO(deser_string(f))) self.tx.deserialize(value) # Make sure that all scriptSigs and scriptWitnesses are empty for txin in self.tx.vin: if len(txin.scriptSig) != 0 or not self.tx.wit.is_null(): raise PSBTSerializationError("Unsigned tx does not have empty scriptSigs and scriptWitnesses") else: if key in self.unknown: raise PSBTSerializationError("Duplicate key, key for unknown value already provided") value = deser_string(f) self.unknown[key] = value # make sure that we got an unsigned tx if self.tx.is_null(): raise PSBTSerializationError("No unsigned trasaction was provided") # Read input data for txin in self.tx.vin: if f.tell() == end: break input = PartiallySignedInput() input.deserialize(f) self.inputs.append(input) if input.non_witness_utxo and input.non_witness_utxo.rehash() and input.non_witness_utxo.sha256 != txin.prevout.sha256: raise PSBTSerializationError("Non-witness UTXO does not match outpoint hash") if (len(self.inputs) != len(self.tx.vin)): raise PSBTSerializationError("Inputs provided does not match the number of inputs in transaction") # Read output data for txout in self.tx.vout: if f.tell() == end: break output = PartiallySignedOutput() output.deserialize(f) self.outputs.append(output) if len(self.outputs) != len(self.tx.vout): raise PSBTSerializationError("Outputs provided does not match the number of outputs in transaction") if not self.is_sane(): raise PSBTSerializationError("PSBT is not sane") def serialize(self): r = b"" # magic bytes r += b"psbt\xff" # unsigned tx flag r += b"\x01\x00" # write serialized tx tx = self.tx.serialize_with_witness() r += ser_compact_size(len(tx)) r += tx # separator r += b"\x00" # unknowns for key, value in sorted(self.unknown.items()): r += ser_string(key) r += ser_string(value) # inputs for input in self.inputs: r += input.serialize() # outputs for output in self.outputs: r += output.serialize() # return hex string return HexToBase64(binascii.hexlify(r)).decode() def is_sane(self): for input in self.inputs: if not input.is_sane(): return False return True PK!HDy+''$hwi-1.0.0.dist-info/entry_points.txtN+I/N.,()(ϴ$L<..PK!<,,hwi-1.0.0.dist-info/LICENSEMIT License Copyright (c) 2017 Andrew Chow Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTUhwi-1.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H+XK.hwi-1.0.0.dist-info/METADATAW[S6~8}(Nv/)r!lζS[$*JrB:~pYB|wnY,%lD]OE4H:I'TEK{#]+MSoT1 *!it6eӔ9&:RK6r֖nP TaPSyKEWٱҰ+3ͧt0Vz/y Kj0sO%47qQDrfvlУj3J_fEb>frT!('̸?pj&"C|Zͺȿ̄qb%fN'J:Ci="O>~tC+3p'd %EZPh9cwO3DvVNO'.~١7u <&rCDG{-27OmKTFXg+u!7l|$,*117(z;f8.pPZ<~uyFle\k\@aT$JygL;3kڊ17XӂcY)\HNVܳpz,sfxIԳT= dcbԵ VyyʒYEG RPiW3"kf|rw )PL.~o?PK!H L1hwi-1.0.0.dist-info/RECORDZIHoa_s@,/ ƾ/b_?dUw*CYGx <0y^$Tt BxAu7Eb!>Ds*eU'O&}P*ϻԞ+MstL2~J4uLryٖo{W;b{EA{fe/II0/awZfn$⴯  2&JB Ǐg=Y׎qnG:pT[ӓɢIY8-0I ݄z&}*˜N%^yF"7Zut-W]?Kt 7G3r聝x@u A|<38oZ _:#nF@8Jn'x(S˞ɊFTgMC!Ea_R,m81%vJzH-[!vɼ d[}wuQY Eb]Sq¶0䰴k,ߚdxh3t5).8;_۱ wBqQtNd2)fűܓ nyν]ʝ^k!O؏6 2w=DQF=DL1PJd%IjixV:%ŊcJ.큇T~>@zDwɶ(蹷YpvXQ J(.&RGEStGK`0h!샅t ƿh7Vr`}iR^=_t<|FWx6CqEUq_ "&yx}86*DF'2:)|BORDjI9)=sWT|=rIe& Ggh0ŐUؾEFR Mf{[P^:=g2խ9jV &ʈcs֏>:9"uᗎ>zoO]OܼPH-u<ԪuPn ||զO}R1, +X\m_݂vEl8q36g\MO7ZOP8S07V%Rv > &MDxtEȜVF >ԢX33ݙq7JuU~LSZBZVеH8\g' ұsMyAZg& ҇Tp& PDEHk )Ԅ_T?,|T6> 079WZnVf7g~"Ʊm9yCGsK s9b#_rȧui3^8K#ߦxx.M ڦ+tbR9Fp:/b_3YkdIK])1 &axz>a_l߄񳕙E%nz~+4=;ŒrKDr;bg/? IG{|"4g 6OY߹ 7-Ȱ>^Q.oJb\H0AUve wqy 'qbZ&OlK DG#dj(a3m<-Yަv%9mW_ٶ0dlzXSƭن4E!)v13Hx"3G?h̦ɣRX1{ On\tzd$["ju B!r¸Tb:ܣ"hΊ ˡVꦿ#]4Cє^b 4SI]nw<&R|"쨌(s1 UkĪ<C͏}Ї앖8@5 )a 9~v^Sߵ~R!5AG7`;鉫}>.4bгIϹ)IV8nZ|7`lzvl!h?EB8rYf8??lhFpɱWe>:F#m164b(=Oz!)m,Y9}M(sS=AP)7gct+3`" h4o_{֞$bSo/r=i^o=i}]A)f|at^u`T3!& (=Η2b`,Om\+;_UN7͋籛dJ$xrw$֙^-B:vɩZنiahڷӪ:Q%ލu؏cwZZZhf:iJ|I2}H亴hN|!<]C虶C`@{Q4z1|{<Ty(q"]t#SɻPMQ*^$="Il!❮3JV>w,ѫ TGW!%]Z~2@aoUTh e,8{f"uV;i:numzFfuIolBЧT!a4tOvGNh1L3&J)/gDzPs1|ҵT]֔f~ꤾY1Vu< <y`ߧ^PEyn^Ǭ);+o~(}(q,tdАRBq=jVwna{iP!]$H+HV:-?n*Qr} Jn@bWb 1qUh_8f/}%9j.q~r6Star+w_ g C`Ok%x`ܾƋ*oGr-|[gf\%֟id2HșPEM!3;"}3g?6{.yFw ftP_05d}j 4g Opַd 4f!۬N0 ,Vx?D%XZb(Z~OK@ӈ3®fѮ9i?;7Ow?Ǎf. gh֏*h0At9yQKaoȾ@Gry `*CJ=E>@ {oU_dDFN,U)0,! K;;baϫӺj Aܰ"Mʊ]k3_pjЌb4Տק} ýΈc_fBP.,E/ѺQ%kջ|Ĵd0<6jeZ KrRd擄wKO³l5^JOƠ:OpoUyNGE؇t v{(A.AWk; }GK;qP?̾VuXaA B㷞x> 6s!(\)9 G~XY]IɪXP}D5T;~o`YƇ! C1飔 +uleƂ示bs)vBzQ-o[{: ~NhdiZ^*Bͳz\1teP3?NW싙1C g}D=v:T DM$M]M(%T<C= #=oޡwn~3|F2IIhE4Gk). h` <~e asiJ'ի3`o5}Ʈw0NnAscEv#sLC<\*ypX a0p\| VVboj* R܌l/h^7i zA*毧- ~}Ɣ oK եZq@HNNu"'G/?(gSs^0< ~tx~.97A*y KƃV>S1PJ.7h)_d֢ʢ~cJ|i,CiFG6د"ƘS{5XZ5PK!cchwi.pyPK!(frhwilib/__init__.pyPK!;QYL hwilib/base58.pyPK! K hwilib/bech32.pyPK!,, /hwilib/cli.pyPK! 3||Ihwilib/commands.pyPK! Hbhwilib/descriptor.pyPK!i!]]shwilib/devices/__init__.pyPK!@@!$thwilib/devices/btchip/__init__.pyPK!ba  +whwilib/devices/btchip/bitcoinTransaction.pyPK!+&hwilib/devices/btchip/bitcoinVarint.pyPK!I99#hwilib/devices/btchip/btchip.pyPK!55#Yhwilib/devices/btchip/btchipComm.pyPK!d9(hwilib/devices/btchip/btchipException.pyPK!.~  &hwilib/devices/btchip/btchipHelpers.pyPK!Tb`)X X $hwilib/devices/btchip/btchipUtils.pyPK!]4 &hwilib/devices/btchip/ledgerWrapper.pyPK!uJJdhwilib/devices/ckcc/__init__.pyPK!'B22hwilib/devices/ckcc/client.pyPK!NU  9hwilib/devices/ckcc/constants.pyPK!}ڻLChwilib/devices/ckcc/protocol.pyPK!D -[hwilib/devices/ckcc/sigheader.pyPK!8k  zbhwilib/devices/ckcc/utils.pyPK!7##mhwilib/devices/coldcard.pyPK!\ \ \hwilib/devices/digitalbitbox.pyPK!8Ñ;;Yhwilib/devices/keepkey.pyPK!!::hwilib/devices/ledger.pyPK!h}ݥ/D/D-hwilib/devices/trezor.pyPK! $Krhwilib/devices/trezorlib/__init__.pyPK!.DDshwilib/devices/trezorlib/btc.pyPK!3<"<""hwilib/devices/trezorlib/client.pyPK!RoA==%hwilib/devices/trezorlib/debuglink.pyPK!#"hwilib/devices/trezorlib/device.pyPK!1o&hwilib/devices/trezorlib/exceptions.pyPK!{L''$hwilib/devices/trezorlib/firmware.pyPK! Cnj3/hwilib/devices/trezorlib/log.pyPK!|[[#w6hwilib/devices/trezorlib/mapping.pyPK!wrr,>hwilib/devices/trezorlib/messages/Address.pyPK!G(waa/?hwilib/devices/trezorlib/messages/ApplyFlags.pyPK!N2}Ahwilib/devices/trezorlib/messages/ApplySettings.pyPK![1Ehwilib/devices/trezorlib/messages/BackupDevice.pyPK!ݣY.Fhwilib/devices/trezorlib/messages/ButtonAck.pyPK!2oGhwilib/devices/trezorlib/messages/ButtonRequest.pyPK!\h006}Ihwilib/devices/trezorlib/messages/ButtonRequestType.pyPK!3+Khwilib/devices/trezorlib/messages/Cancel.pyPK! aa.Khwilib/devices/trezorlib/messages/ChangePin.pyPK!}1|Mhwilib/devices/trezorlib/messages/ClearSession.pyPK!3556VNhwilib/devices/trezorlib/messages/DebugLinkDecision.pyPK! Uoo8Phwilib/devices/trezorlib/messages/DebugLinkFlashErase.pyPK!]~6Rhwilib/devices/trezorlib/messages/DebugLinkGetState.pyPK!J((1Shwilib/devices/trezorlib/messages/DebugLinkLog.pyPK!kk4Vhwilib/devices/trezorlib/messages/DebugLinkMemory.pyPK! 8Whwilib/devices/trezorlib/messages/DebugLinkMemoryRead.pyPK!lP::9Yhwilib/devices/trezorlib/messages/DebugLinkMemoryWrite.pyPK!ll3}\hwilib/devices/trezorlib/messages/DebugLinkState.pyPK!~xL2:chwilib/devices/trezorlib/messages/DebugLinkStop.pyPK!9drr,dhwilib/devices/trezorlib/messages/Entropy.pyPK!ii/ehwilib/devices/trezorlib/messages/EntropyAck.pyPK!&„3ghwilib/devices/trezorlib/messages/EntropyRequest.pyPK!;,ghhwilib/devices/trezorlib/messages/Failure.pyPK!>\0tjhwilib/devices/trezorlib/messages/FailureType.pyPK!Ug -khwilib/devices/trezorlib/messages/Features.pyPK!(/gg2zhwilib/devices/trezorlib/messages/FirmwareErase.pyPK!% 4{hwilib/devices/trezorlib/messages/FirmwareRequest.pyPK!}3}hwilib/devices/trezorlib/messages/FirmwareUpload.pyPK!cbY{{/hwilib/devices/trezorlib/messages/GetAddress.pyPK! K hh/ڄhwilib/devices/trezorlib/messages/GetEntropy.pyPK!Pb0hwilib/devices/trezorlib/messages/GetFeatures.pyPK!?==1ghwilib/devices/trezorlib/messages/GetPublicKey.pyPK!S_3hwilib/devices/trezorlib/messages/HDNodePathType.pyPK!/َhwilib/devices/trezorlib/messages/HDNodeType.pyPK!V//1hwilib/devices/trezorlib/messages/IdentityType.pyPK!˟/lhwilib/devices/trezorlib/messages/Initialize.pyPK!.܄4hwilib/devices/trezorlib/messages/InputScriptType.pyPK!/whwilib/devices/trezorlib/messages/LoadDevice.pyPK!5hwilib/devices/trezorlib/messages/MessageSignature.pyPK!;0 hwilib/devices/trezorlib/messages/MessageType.pyPK!n99=hwilib/devices/trezorlib/messages/MultisigRedeemScriptType.pyPK!7 {5hwilib/devices/trezorlib/messages/OutputScriptType.pyPK!-2hwilib/devices/trezorlib/messages/PassphraseAck.pyPK!79vv6«hwilib/devices/trezorlib/messages/PassphraseRequest.pyPK! JJ9hwilib/devices/trezorlib/messages/PassphraseSourceType.pyPK!N7-hwilib/devices/trezorlib/messages/PassphraseStateAck.pyPK!?jmm;hwilib/devices/trezorlib/messages/PassphraseStateRequest.pyPK!gg1ٰhwilib/devices/trezorlib/messages/PinMatrixAck.pyPK!&(cc5hwilib/devices/trezorlib/messages/PinMatrixRequest.pyPK!14.UU9Ehwilib/devices/trezorlib/messages/PinMatrixRequestType.pyPK!r)hwilib/devices/trezorlib/messages/Ping.pyPK!Fڀ.Nhwilib/devices/trezorlib/messages/PublicKey.pyPK!?MM3hwilib/devices/trezorlib/messages/RecoveryDevice.pyPK!׭2/LL7&hwilib/devices/trezorlib/messages/RecoveryDeviceType.pyPK!-rqq0hwilib/devices/trezorlib/messages/RequestType.pyPK!ZH||0hwilib/devices/trezorlib/messages/ResetDevice.pyPK!OKgg-Phwilib/devices/trezorlib/messages/SelfTest.pyPK!\ II1hwilib/devices/trezorlib/messages/SignIdentity.pyPK!婨0hwilib/devices/trezorlib/messages/SignMessage.pyPK!$*4pp+hwilib/devices/trezorlib/messages/SignTx.pyPK!̉UU3Ihwilib/devices/trezorlib/messages/SignedIdentity.pyPK!A-ee,hwilib/devices/trezorlib/messages/Success.pyPK!LT4hwilib/devices/trezorlib/messages/TransactionType.pyPK!*whwilib/devices/trezorlib/messages/TxAck.pyPK!-6#330Khwilib/devices/trezorlib/messages/TxInputType.pyPK!d4hwilib/devices/trezorlib/messages/TxOutputBinType.pyPK!baoS1hwilib/devices/trezorlib/messages/TxOutputType.pyPK! N.}hwilib/devices/trezorlib/messages/TxRequest.pyPK!:,/9hwilib/devices/trezorlib/messages/TxRequestDetailsType.pyPK! oo<hwilib/devices/trezorlib/messages/TxRequestSerializedType.pyPK!2T2hwilib/devices/trezorlib/messages/VerifyMessage.pyPK!L{wc/hwilib/devices/trezorlib/messages/WipeDevice.pyPK!7ff,hwilib/devices/trezorlib/messages/WordAck.pyPK!ļ^^0hwilib/devices/trezorlib/messages/WordRequest.pyPK!W;PP40hwilib/devices/trezorlib/messages/WordRequestType.pyPK!' -hwilib/devices/trezorlib/messages/__init__.pyPK!R..$hwilib/devices/trezorlib/protobuf.pyPK!ɝv!Bhwilib/devices/trezorlib/tools.pyPK!qAA.]hwilib/devices/trezorlib/transport/__init__.pyPK!,)rqhwilib/devices/trezorlib/transport/hid.pyPK!΁'.mhwilib/devices/trezorlib/transport/protocol.pyPK!>^)^hwilib/devices/trezorlib/transport/udp.pyPK!BB,hwilib/devices/trezorlib/transport/webusb.pyPK!gƥt t @hwilib/devices/trezorlib/ui.pyPK!W)hwilib/errors.pyPK!wE E hwilib/hwwclient.pyPK!(1MhMhdhwilib/serializations.pyPK!HDy+''$Lhwi-1.0.0.dist-info/entry_points.txtPK!<,,PMhwi-1.0.0.dist-info/LICENSEPK!HڽTUQhwi-1.0.0.dist-info/WHEELPK!H+XK.@Rhwi-1.0.0.dist-info/METADATAPK!H L1Xhwi-1.0.0.dist-info/RECORDPK=,o