PK! j5/__init__.py"""j5 Robotics API.""" from .base_robot import BaseRobot from .boards import BoardGroup __all__ = ["BoardGroup", "BaseRobot", "__version__", "__version_short__"] __version__ = "0.7.7" __version_short__ = "0.7.7" PK!wdQj5/backends/__init__.py"""Backend classes.""" from .backend import Backend, BackendMeta, CommunicationError, Environment __all__ = ["Backend", "BackendMeta", "CommunicationError", "Environment"] PK!Ƥcj5/backends/backend.py"""The base classes for backends.""" import inspect import logging from abc import ABCMeta, abstractmethod from functools import wraps from typing import TYPE_CHECKING, Dict, Optional, Set, Type if TYPE_CHECKING: # pragma: nocover from j5.boards import Board # noqa class CommunicationError(Exception): """ A communication error occurred. This error is thrown when there is an error communicating with a board, if a more specific exception is available, then that may be thrown instead, but it should inherit from this one. """ def _wrap_method_with_logging( backend_class: Type['Backend'], method_name: str, logger: logging.Logger, ) -> None: old_method = getattr(backend_class, method_name) signature = inspect.signature(old_method) @wraps(old_method) def new_method(*args, **kwargs): # type: ignore retval = old_method(*args, **kwargs) arg_map = signature.bind(*args, **kwargs).arguments args_str = ", ".join( f"{name}={value!r}" for name, value in arg_map.items() if name != "self" ) retval_str = (f" -> {retval!r}" if retval is not None else "") message = f"{method_name}({args_str}){retval_str}" logger.debug(message) return retval setattr(backend_class, method_name, new_method) def _wrap_methods_with_logging(backend_class: Type['Backend']) -> None: component_classes = backend_class.board.supported_components() # type: ignore for component_class in component_classes: logger = logging.getLogger(component_class.__module__) interface_class = component_class.interface_class() for method_name in interface_class.__abstractmethods__: _wrap_method_with_logging(backend_class, method_name, logger) class BackendMeta(ABCMeta): """ The metaclass for a backend. Responsible for registering the board-backend mapping with an Environment. """ def __new__(mcs, name, bases, namespace, **kwargs): # type:ignore """Create a new class object.""" cls = super().__new__(mcs, name, bases, namespace, **kwargs) # type: ignore if cls.__name__ == "Backend": return cls # Check if this is an abstract Backend. if len(cls.__bases__) == 1 and cls.__base__.__name__ != "Backend": return cls if hasattr(cls, "environment"): if cls.environment is not None and cls.board is not None: mcs._check_compatibility(cls) mcs._check_component_interfaces(cls) cls.environment.register_backend(cls.board, cls) _wrap_methods_with_logging(cls) return cls # The following line should never run, as _check_compatibility should fail first. raise RuntimeError( # pragma: nocover f"The {str(cls)} has no environment attribute", ) def _check_compatibility(cls): # type: ignore """Check that the backend and environment are compatible.""" if type(cls.environment) != Environment: raise ValueError("The environment must be of type Environment.") if cls.board in cls.environment.supported_boards: raise RuntimeError( "You cannot register multiple backends for the same board in the same Environment.") # noqa: E501 def _check_component_interfaces(cls): # type: ignore """ Check that the backend has the right interfaces. Certain interfaces are required to support components, and we want to make sure that the Backend implements them. This is a run-time type check. """ for component in cls.board.supported_components(): if not issubclass(cls, component.interface_class()): raise TypeError("The backend class doesn't have a required interface.") # noqa: E501 class Backend(metaclass=BackendMeta): """ The base class for a backend. A backend is an implementation of a specific board for an environment. It can hold data about the actual board it is controlling. There should be a ratio of one instance of a Backend to one instance of a Board. The Backend object should not hold any references to the Board, instead having it's methods executed by the code for the individual Board. A Backend usually also implements a number of ComponentInterfaces which thus allow a physical component to be controlled by the abstract Component representation. """ @classmethod @abstractmethod def discover(cls) -> Set['Board']: """Discover boards that this backend can control.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def environment(self) -> 'Environment': """Environment the backend belongs too.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def board(self) -> Type['Board']: """Type of board this backend implements.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" raise NotImplementedError # pragma: no cover class Environment: """ A collection of board implementations that can work together. Auto-populated with board mappings using metaclass magic. """ def __init__(self, name: str): self.name = name self.board_backend_mapping: Dict[Type['Board'], Type[Backend]] = {} @property def supported_boards(self) -> Set[Type['Board']]: """The boards that are supported by this backend group.""" return set(self.board_backend_mapping.keys()) def __str__(self) -> str: """Get a string representation of this group.""" return self.name def register_backend(self, board: Type['Board'], backend: Type[Backend]) -> None: """Register a new backend with this Backend Group.""" self.board_backend_mapping[board] = backend def get_backend(self, board: Type['Board']) -> Type[Backend]: """Get the backend for a board.""" if board not in self.supported_boards: raise NotImplementedError(f"The {str(self)} does not support {str(board)}") return self.board_backend_mapping[board] PK!OPDj5/backends/console/__init__.py"""Backends for the Console Environment.""" from .env import Console, ConsoleEnvironment __all__ = ["Console", "ConsoleEnvironment"] PK!gc??j5/backends/console/env.py"""The Console Environment.""" from typing import Callable, Optional, Type, TypeVar from j5.backends import Environment ConsoleEnvironment = Environment("ConsoleEnvironment") T = TypeVar("T") class Console: """A helper class for the console environment.""" def __init__( self, descriptor: str, print_function: Callable = print, # type: ignore input_function: Callable = input, # type: ignore ) -> None: self._descriptor = descriptor self._print = print_function self._input = input_function def info(self, message: str) -> None: """Print information to the user.""" self._print(f"{self._descriptor}: {message}") def read( # type: ignore self, prompt: str, return_type: Optional[Type[T]] = str, ) -> T: """Get a value of type 'return_type' from the user.""" if return_type is not None: while True: response = self._input(f"{self._descriptor}: {prompt}: ") try: # We have to ignore the types on this function unfortunately, # as static type checking is not powerful enough to confirm # that it is correct at runtime. return return_type(response) # type: ignore except ValueError: self.info(f"Unable to construct a {return_type.__name__}" f" from '{response}'") else: self._input(f"{self._descriptor}: {prompt}: ") PK!iJ;AA"j5/backends/console/sb/__init__.py"""Backends for SourceBots boards in the Console Environment.""" PK!J!j5/backends/console/sb/arduino.py"""Console Backend for the SourceBots Arduino.""" from datetime import timedelta from typing import Mapping, Optional, Set, Type from j5.backends import Backend from j5.backends.console import Console, ConsoleEnvironment from j5.boards import Board from j5.boards.sb import SBArduinoBoard from j5.components import GPIOPinInterface, GPIOPinMode, LEDInterface from j5.components.derived import UltrasoundInterface class PinData: """Contains data about a pin.""" mode: GPIOPinMode digital_state: bool def __init__(self, *, mode: GPIOPinMode, digital_state: bool): self.mode = mode self.digital_state = digital_state class SBArduinoConsoleBackend( GPIOPinInterface, LEDInterface, UltrasoundInterface, Backend, ): """Console Backend for the SourceBots Arduino.""" environment = ConsoleEnvironment board = SBArduinoBoard @classmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError("The Console Backend cannot discover boards.") def __init__(self, serial: str, console_class: Type[Console] = Console) -> None: self._serial = serial self._pins: Mapping[int, PinData] = { i: PinData(mode=GPIOPinMode.DIGITAL_OUTPUT, digital_state=False) for i in range(2, 20) # Digital 2 - 13 # Analogue 14 - 19 } # Setup console helper self._console = console_class(f"{self.board.__name__}({self._serial})") @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return None # Console, so no firmware def set_gpio_pin_mode(self, identifier: int, pin_mode: GPIOPinMode) -> None: """Set the hardware mode of a GPIO pin.""" self._console.info(f"Set pin {identifier} to {pin_mode.name}") self._pins[identifier].mode = pin_mode def get_gpio_pin_mode(self, identifier: int) -> GPIOPinMode: """Get the hardware mode of a GPIO pin.""" return self._pins[identifier].mode def write_gpio_pin_digital_state(self, identifier: int, state: bool) -> None: """Write to the digital state of a GPIO pin.""" if self._pins[identifier].mode is not GPIOPinMode.DIGITAL_OUTPUT: raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_OUTPUT" f"in order to set the digital state.") self._console.info(f"Set pin {identifier} state to {state}") self._pins[identifier].digital_state = state def get_gpio_pin_digital_state(self, identifier: int) -> bool: """Get the last written state of the GPIO pin.""" if self._pins[identifier].mode is not GPIOPinMode.DIGITAL_OUTPUT: raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_OUTPUT" f"in order to read the digital state.") return self._pins[identifier].digital_state def read_gpio_pin_digital_state(self, identifier: int) -> bool: """Read the digital state of the GPIO pin.""" if self._pins[identifier].mode not in [ GPIOPinMode.DIGITAL_INPUT_PULLUP, GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLDOWN, ]: raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_INPUT_*" f"in order to read the digital state.") return self._console.read(f"Pin {identifier} digital state [true/false]", bool) def read_gpio_pin_analogue_value(self, identifier: int) -> float: """Read the scaled analogue value of the GPIO pin.""" if self._pins[identifier].mode is not GPIOPinMode.ANALOGUE_INPUT: raise ValueError(f"Pin {identifier} mode needs to be ANALOGUE_INPUT", f"in order to read the digital state.") return self._console.read(f"Pin {identifier} ADC state [float]", float) def write_gpio_pin_dac_value(self, identifier: int, scaled_value: float) -> None: """Write a scaled analogue value to the DAC on the GPIO pin.""" # Uno doesn't have any of these. raise NotImplementedError def write_gpio_pin_pwm_value(self, identifier: int, duty_cycle: float) -> None: """Write a scaled analogue value to the PWM on the GPIO pin.""" # Not implemented on SBArduinoBoard yet. raise NotImplementedError def get_led_state(self, identifier: int) -> bool: """Get the state of an LED.""" if identifier != 0: raise ValueError("Arduino Uno only has LED 0 (digital pin 13).") return self.get_gpio_pin_digital_state(13) def set_led_state(self, identifier: int, state: bool) -> None: """Set the state of an LED.""" if identifier != 0: raise ValueError("Arduino Uno only has LED 0 (digital pin 13)") self.write_gpio_pin_digital_state(13, state) def get_ultrasound_pulse( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[timedelta]: """ Get a timedelta for the ultrasound time. Returns None if the sensor times out. """ microseconds = self._console.read( f"Response time for ultrasound sensor on pins " f"{trigger_pin_identifier}/{echo_pin_identifier} [microseconds]", float, ) self._update_ultrasound_pin_modes(trigger_pin_identifier, echo_pin_identifier) return timedelta(microseconds=microseconds) def get_ultrasound_distance( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[float]: """Get a distance in metres.""" metres = self._console.read( f"Distance for ultrasound sensor on pins " f"{trigger_pin_identifier}/{echo_pin_identifier} [metres]", float, ) self._update_ultrasound_pin_modes(trigger_pin_identifier, echo_pin_identifier) return metres def _update_ultrasound_pin_modes( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> None: # Ultrasound functions force the pins into particular modes. self._pins[trigger_pin_identifier].mode = GPIOPinMode.DIGITAL_OUTPUT self._pins[trigger_pin_identifier].digital_state = False self._pins[echo_pin_identifier].mode = GPIOPinMode.DIGITAL_INPUT PK!ۭVGG"j5/backends/console/sr/__init__.py"""Backends for Student Robotics boards in the console environment.""" PK!F%j5/backends/console/sr/v4/__init__.py"""Backends for Student Robotics version 4 boards in the console environment.""" from .motor_board import SRV4MotorBoardConsoleBackend # noqa: F401 from .power_board import SRV4PowerBoardConsoleBackend # noqa: F401 PK!(j5/backends/console/sr/v4/motor_board.py"""Console Backend for the SR v4 Motor Board.""" from typing import List, Optional, Set, Type from j5.backends import Backend from j5.backends.console import Console, ConsoleEnvironment from j5.boards import Board from j5.boards.sr.v4.motor_board import MotorBoard from j5.components.motor import MotorInterface, MotorSpecialState, MotorState class SRV4MotorBoardConsoleBackend( MotorInterface, Backend, ): """The console implementation of the SR v4 motor board.""" environment = ConsoleEnvironment board = MotorBoard @classmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError("The Console Backend cannot discover boards.") def __init__(self, serial: str, console_class: Type[Console] = Console) -> None: self._serial = serial # Initialise our stored values for the state. self._state: List[MotorState] = [ MotorSpecialState.BRAKE for _ in range(0, 2) ] # Setup console helper self._console = console_class(f"{self.board.__name__}({self._serial})") @property def serial(self) -> str: """The serial number reported by the board.""" return self._serial @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return None # Console, so no firmware def get_motor_state(self, identifier: int) -> MotorState: """Get the current motor state.""" # We are unable to read the state from the motor board, in hardware # so instead of asking, we'll get the last set value. return self._state[identifier] def set_motor_state(self, identifier: int, power: MotorState) -> None: """Set the state of a motor.""" if identifier not in range(0, 2): raise ValueError( f"Invalid motor identifier: {identifier}, valid values are: 0, 1", ) self._state[identifier] = power if isinstance(power, MotorSpecialState): power_human_name = power.name else: power_human_name = str(power) self._console.info(f"Setting motor {identifier} to {power_human_name}.") PK! %%(j5/backends/console/sr/v4/power_board.py"""Console Backend for the SR V4 power board.""" from datetime import timedelta from typing import Dict, Optional, Set, Type from j5.backends import Backend from j5.backends.console.env import Console, ConsoleEnvironment from j5.boards import Board from j5.boards.sr.v4.power_board import PowerBoard, PowerOutputPosition from j5.components import ( BatterySensorInterface, ButtonInterface, LEDInterface, PiezoInterface, PowerOutputInterface, ) class SRV4PowerBoardConsoleBackend( PowerOutputInterface, PiezoInterface, ButtonInterface, BatterySensorInterface, LEDInterface, Backend, ): """The console implementation of the SR V4 power board.""" environment = ConsoleEnvironment board = PowerBoard @classmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError("The Console Backend cannot discover boards.") def __init__(self, serial: str, console_class: Type[Console] = Console) -> None: self._serial = serial self._output_states: Dict[int, bool] = { output.value: False for output in PowerOutputPosition } self._led_states: Dict[int, bool] = { i: False for i in range(2) } # Setup console helper self._console = console_class(f"{self.board.__name__}({self._serial})") @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return None # Console, so no firmware @property def serial(self) -> str: """The serial number reported by the board.""" return self._serial def get_power_output_enabled(self, identifier: int) -> bool: """Get whether a power output is enabled.""" try: return self._output_states[identifier] except KeyError: raise ValueError(f"Invalid power output identifier {identifier!r}; " f"valid identifiers are " f"{self._output_states.keys()}") from None def set_power_output_enabled( self, identifier: int, enabled: bool, ) -> None: """Set whether a power output is enabled.""" self._console.info(f"Setting output {identifier} to {enabled}") if identifier not in self._output_states.keys(): raise ValueError(f"Invalid power output identifier {identifier!r}; " f"valid identifiers are " f"{self._output_states.keys()}") self._output_states[identifier] = enabled def get_power_output_current(self, identifier: int) -> float: """Get the current being drawn on a power output, in amperes.""" if identifier in self._output_states: return self._console.read( f"Current for power output {identifier} [amps]", float, ) else: raise ValueError(f"Invalid power output identifier {identifier!r}; " f"valid identifiers are " f"{self._output_states.keys()}") from None def buzz(self, identifier: int, duration: timedelta, pitch: float) -> None: """Queue a pitch to be played.""" if identifier != 0: raise ValueError(f"invalid piezo identifier {identifier!r}; " f"the only valid identifier is 0") duration_ms = round(duration / timedelta(milliseconds=1)) if duration_ms > 65535: raise ValueError("Maximum piezo duration is 65535ms.") self._console.info(f"Buzzing at {pitch}Hz for {duration_ms}ms") def get_button_state(self, identifier: int) -> bool: """Get the state of a button.""" if identifier != 0: raise ValueError(f"invalid button identifier {identifier!r}; " f"the only valid identifier is 0") return self._console.read("Start button state [true/false]", bool) def wait_until_button_pressed(self, identifier: int) -> None: """Halt the program until this button is pushed.""" self._console.info("Waiting for start button press.") self._console.read("Hit return to press start button", None) def get_battery_sensor_voltage(self, identifier: int) -> float: """Get the voltage of a battery sensor.""" if identifier != 0: raise ValueError(f"invalid battery sensor identifier {identifier!r}; " f"the only valid identifier is 0") return self._console.read("Battery voltage [volts]", float) def get_battery_sensor_current(self, identifier: int) -> float: """Get the current of a battery sensor.""" if identifier != 0: raise ValueError(f"invalid battery sensor identifier {identifier!r}; " f"the only valid identifier is 0") return self._console.read("Battery current [amps]", float) def get_led_state(self, identifier: int) -> bool: """Get the state of an LED.""" return self._led_states[identifier] def set_led_state(self, identifier: int, state: bool) -> None: """Set the state of an LED.""" if identifier in self._led_states.keys(): self._console.info(f"Set LED {identifier} to {state}") self._led_states[identifier] = state else: raise ValueError(f"invalid LED identifier {identifier!r}; valid identifiers " f"are 0 (run LED) and 1 (error LED)") from None PK!9u(j5/backends/console/sr/v4/servo_board.py"""Console Backend for the SR v4 Servo Board.""" from typing import List, Optional, Set, Type from j5.backends import Backend from j5.backends.console import Console, ConsoleEnvironment from j5.boards import Board from j5.boards.sr.v4.servo_board import ServoBoard from j5.components.servo import ServoInterface, ServoPosition class SRV4ServoBoardConsoleBackend( ServoInterface, Backend, ): """The console implementation of the SR v4 Servo board.""" environment = ConsoleEnvironment board = ServoBoard @classmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError("The Console Backend cannot discover boards.") def __init__(self, serial: str, console_class: Type[Console] = Console) -> None: self._serial = serial # Initialise our stored values for the positions. self._positions: List[ServoPosition] = [None for _ in range(0, 12)] # Setup console helper self._console = console_class(f"{self.board.__name__}({self._serial})") @property def serial(self) -> str: """The serial number reported by the board.""" return self._serial @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return None # Console, so no firmware def get_servo_position(self, identifier: int) -> ServoPosition: """Get the servo position.""" # We are unable to read the state from the servo board, in hardware # so instead of asking, we'll get the last set value. return self._positions[identifier] def set_servo_position(self, identifier: int, position: ServoPosition) -> None: """Set the servo position.""" if identifier not in range(0, 12): raise ValueError( f"Invalid servo identifier: {identifier}, valid values are: 0 - 11", ) self._positions[identifier] = position if position is None: position_human_name = "unpowered" else: position_human_name = str(position) self._console.info(f"Setting servo {identifier} to {position_human_name}.") PK!Zd66&j5/backends/console/zoloto/__init__.py"""Backends for the Zoloto computer vision system.""" PK!c *j5/backends/console/zoloto/camera_board.py"""Console backend for the Zoloto Virtual Board.""" from pathlib import Path from typing import Optional, Set, Type from j5.backends import Backend from j5.backends.console import Console, ConsoleEnvironment from j5.boards import Board from j5.boards.zoloto import ZolotoCameraBoard from j5.components import MarkerCameraInterface from j5.vision import Coordinate, Marker, MarkerList, Orientation class ZolotoCameraBoardConsoleBackend( MarkerCameraInterface, Backend, ): """Console Backend for Zoloto Camera.""" environment = ConsoleEnvironment board = ZolotoCameraBoard @classmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError("The Console Backend cannot discover boards.") def __init__(self, serial: str, console_class: Type[Console] = Console) -> None: self._serial = serial # Setup console helper self._console = console_class(f"{self.board.__name__}({self._serial})") @property def serial(self) -> str: """The serial number reported by the board.""" return self._serial @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return None # Console, so no firmware def get_visible_markers(self, identifier: int) -> MarkerList: """Get markers that are visible to the camera.""" amount = self._console.read( f"How many markers are visible to camera {identifier}?", int, ) markers = MarkerList() for i in range(0, amount): marker_id = self._console.read( f"What is the id of marker {i}?", int, ) marker_x = self._console.read( f"What is the x coordinate of marker {i}?", float, ) marker_y = self._console.read( f"What is the y coordinate of marker {i}?", float, ) marker_z = self._console.read( f"What is the z coordinate of marker {i}?", float, ) orient = self._console.read( f"Do you know the orientation of marker {i}?", bool, ) orientation = None if orient: orient_x = self._console.read( f"What is the x rotation of marker {i}?", float, ) orient_y = self._console.read( f"What is the y rotation of marker {i}?", float, ) orient_z = self._console.read( f"What is the z rotation of marker {i}?", float, ) orientation = Orientation.from_cartesian( orient_x, orient_y, orient_z, ) markers.append( Marker( marker_id, Coordinate(marker_x, marker_y, marker_z), orientation=orientation, ), ) return markers def save_annotated_image(self, file: Path) -> None: """Save an annotated image to a file.""" self._console.info(f"Saved annotated image to {file}") self._console.info( "Image not actually saved due to backend limitations.", ) PK!+> j5/backends/hardware/__init__.py"""Backends for the hardware environment.""" from .env import HardwareEnvironment, NotSupportedByHardwareError __all__ = [ "HardwareEnvironment", "NotSupportedByHardwareError", ] PK!Fj5/backends/hardware/env.py"""The hardware Environment.""" from j5.backends import Environment HardwareEnvironment = Environment("HardwareEnvironment") class NotSupportedByHardwareError(Exception): """The hardware does not support that functionality.""" PK!q>>#j5/backends/hardware/j5/__init__.py"""Abstract hardware backend implementions provided by j5.""" PK!SsJJ"j5/backends/hardware/j5/raw_usb.py""" Abstract hardware backend implemention provided by j5 for Raw USB communication. This has been written to reduce code duplication between backends for boards that communicate very similarly. It has been written such that it could potentially be distributed separately in the future, to remove the PyUSB dependency from the j5 core. """ from abc import abstractmethod from functools import wraps from typing import Callable, NamedTuple, Optional, Set, TypeVar, Union import usb from j5.backends import BackendMeta, CommunicationError, Environment from j5.boards import Board # Stop the library from closing the USB connections before make_safe is called. usb._objfinalizer._AutoFinalizedObjectBase._do_finalize_object = ( # type: ignore lambda x: None ) class ReadCommand(NamedTuple): """ Models a command to read information from the power board using USB controlRead. code identifies the command in accordance with the definitions in usb.h in the firmware source. data_len is the number of bytes that will be returned by the command. """ code: int data_len: int class WriteCommand(NamedTuple): """ Models a command to write information to the power board using USB controlWrite. code identifies the command in accordance with the definitions in usb.h in the firmware source. """ code: int class USBCommunicationError(CommunicationError): """An error occurred during USB communication.""" def __init__(self, usb_error: usb.core.USBError) -> None: message = usb_error.strerror if usb_error.errno == 110: # "operation timed out" message += "; are you sure the servo board is being correctly powered?" elif usb_error.errno == 32: # pipe error message += "; are you sending buzz commands to the power board too quickly?" super().__init__(message) RT = TypeVar('RT') def handle_usb_error(func: Callable[..., RT]) -> Callable[..., RT]: # type: ignore """ Wrap functions that use usb1 and give friendly errors. The exceptions from PyUSB are hard to find in documentation or code and are confusing to users. This decorator catches the USBErrors and throws a friendlier exception that can also be caught more easily. """ @wraps(func) def catch_exceptions(*args, **kwargs): # type: ignore try: return func(*args, **kwargs) except usb.core.USBError as e: raise USBCommunicationError(e) from e return catch_exceptions class RawUSBHardwareBackend(metaclass=BackendMeta): """An abstract class for creating backends that use Raw USB communication.""" _usb_device: usb.core.Device @classmethod @abstractmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def environment(self) -> 'Environment': """Environment the backend belongs too.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" raise NotImplementedError # pragma: no cover @property # type: ignore # https://github.com/python/mypy/issues/1362 @handle_usb_error def serial(self) -> str: """The serial number reported by the board.""" # https://github.com/python/mypy/issues/1362 return self._usb_device.serial_number @handle_usb_error def __del__(self) -> None: """Clean up device on destruction of object.""" usb.util.dispose_resources(self._usb_device) @handle_usb_error def _read(self, command: ReadCommand) -> bytes: return self._usb_device.ctrl_transfer( 0x80, 64, wValue=0, wIndex=command.code, data_or_wLength=command.data_len, ) @handle_usb_error def _write(self, command: WriteCommand, param: Union[int, bytes]) -> None: req_val: int = 0 req_data: bytes = b"" if isinstance(param, int): req_val = param else: req_data = param self._usb_device.ctrl_transfer( 0x00, 64, wValue=req_val, wIndex=command.code, data_or_wLength=req_data, ) PK!-!j5/backends/hardware/j5/serial.py"""Abstract hardware backend implementation provided by j5 for serial comms.""" from abc import abstractmethod from datetime import timedelta from functools import wraps from typing import TYPE_CHECKING, Callable, Optional, Set, Type, TypeVar from serial import Serial, SerialException, SerialTimeoutException from j5.backends import BackendMeta, CommunicationError, Environment from j5.boards import Board RT = TypeVar("RT") # pragma: nocover if TYPE_CHECKING: from typing_extensions import Protocol else: class Protocol: """Dummy class since typing_extensions is not available at runtime.""" pass def handle_serial_error(func: Callable[..., RT]) -> Callable[..., RT]: # type: ignore """ Wrap functions that use the serial port, and rethrow the errors. This is a decorator that should be used to wrap any functions that call the serial interface. It will catch and rethrow the errors as a CommunicationError, so that it is more explicit what is going wrong. """ @wraps(func) def catch_exceptions(*args, **kwargs): # type: ignore try: return func(*args, **kwargs) except SerialTimeoutException as e: raise CommunicationError(f"Serial Timeout Error: {e}") except SerialException as e: raise CommunicationError(f"Serial Error: {e}") return catch_exceptions class Seriallike(Protocol): """ Something that walks like a Serial and quacks like a Serial. This is used instead of hardcoding the Serial class to allow it to be mocked out. """ def __init__(self, port: Optional[str] = None, baudrate: int = 9600, bytesize: int = 8, parity: str = 'N', stopbits: float = 1, timeout: Optional[float] = None): ... def close(self) -> None: """Close the connection.""" ... def flush(self) -> None: """Flush all pending write operations.""" ... def readline(self) -> bytes: """Read a line from the serial port.""" ... def write(self, data: bytes) -> int: """Write data to the serial port.""" ... class SerialHardwareBackend(metaclass=BackendMeta): """An abstract class for creating backends that use USB serial communication.""" @handle_serial_error def __init__( self, serial_port: str, serial_class: Type[Seriallike] = Serial, baud: int = 115200, timeout: timedelta = timedelta(milliseconds=250), ) -> None: timeout_secs = timeout / timedelta(seconds=1) self._serial = serial_class( port=serial_port, baudrate=baud, timeout=timeout_secs, ) @classmethod @abstractmethod def discover(cls) -> Set[Board]: """Discover boards that this backend can control.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def environment(self) -> Environment: """Environment the backend belongs too.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" raise NotImplementedError # pragma: no cover @handle_serial_error def read_serial_line(self, empty: bool = False) -> str: """Read a line from the serial interface.""" bdata = self._serial.readline() if len(bdata) == 0: if empty: return "" raise CommunicationError( "No response from board. " "Is it correctly powered?", ) ldata = bdata.decode('utf-8') return ldata.rstrip() PK!h|BB#j5/backends/hardware/sb/__init__.py"""Backends for SourceBots boards in the Hardware Environment.""" PK!33"j5/backends/hardware/sb/arduino.py"""SourceBots Arduino Hardware Implementation.""" from datetime import timedelta from typing import Callable, List, Mapping, Optional, Set, Tuple, Type from serial import Serial from serial.tools.list_ports import comports from serial.tools.list_ports_common import ListPortInfo from j5.backends import CommunicationError from j5.backends.hardware.env import ( HardwareEnvironment, NotSupportedByHardwareError, ) from j5.backends.hardware.j5.serial import ( SerialHardwareBackend, handle_serial_error, ) from j5.boards import Board from j5.boards.sb.arduino import SBArduinoBoard from j5.components import GPIOPinInterface, GPIOPinMode, LEDInterface from j5.components.derived import UltrasoundInterface USB_IDS: Set[Tuple[int, int]] = { (0x2341, 0x0043), # Fake Uno (0x2a03, 0x0043), # Fake Uno (0x1a86, 0x7523), # Real Uno } FIRST_ANALOGUE_PIN = 14 def is_arduino_uno(port: ListPortInfo) -> bool: """Check if a ListPortInfo represents an Arduino Uno.""" return (port.vid, port.pid) in USB_IDS class DigitalPinData: """Contains data about a digital pin.""" mode: GPIOPinMode state: bool def __init__(self, *, mode: GPIOPinMode, state: bool): self.mode = mode self.state = state class SBArduinoHardwareBackend( LEDInterface, GPIOPinInterface, UltrasoundInterface, SerialHardwareBackend, ): """ Hardware Backend for the Arduino Uno. Currently only for the SourceBots Arduino Firmware. """ environment = HardwareEnvironment board = SBArduinoBoard @classmethod def discover( cls, comports: Callable = comports, serial_class: Type[Serial] = Serial, ) -> Set[Board]: """Discover all connected motor boards.""" # Find all serial ports. ports: List[ListPortInfo] = comports() # Get a list of boards from the ports. boards: Set[Board] = set() for port in filter(is_arduino_uno, ports): boards.add( SBArduinoBoard( port.serial_number, cls(port.device, serial_class), ), ) return boards @handle_serial_error def __init__(self, serial_port: str, serial_class: Type[Serial] = Serial) -> None: super(SBArduinoHardwareBackend, self).__init__( serial_port=serial_port, serial_class=serial_class, baud=115200, timeout=timedelta(milliseconds=1250), ) self._digital_pins: Mapping[int, DigitalPinData] = { i: DigitalPinData(mode=GPIOPinMode.DIGITAL_INPUT, state=False) for i in range(2, FIRST_ANALOGUE_PIN) } count = 0 line = self.read_serial_line(empty=True) while len(line) == 0: line = self.read_serial_line(empty=True) count += 1 if count > 25: raise CommunicationError(f"Arduino ({serial_port}) is not responding.") if line != "# Booted": raise CommunicationError("Arduino Boot Error.") self._version_line = self.read_serial_line() if self.firmware_version is not None: version_ids = tuple(map(int, self.firmware_version.split("."))) else: version_ids = (0, 0, 0) if version_ids < (2019, 6, 0): raise CommunicationError( f"Unexpected firmware version: {self.firmware_version},", f" expected at least: \"2019.6.0\".", ) for pin_number in self._digital_pins.keys(): self.set_gpio_pin_mode(pin_number, GPIOPinMode.DIGITAL_INPUT) @property def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" return self._version_line.split("v")[1] @handle_serial_error def _command(self, command: str, *params: str) -> List[str]: """Send a command to the board.""" message = " ".join([command] + list(params)) + "\n" self._serial.write(message.encode("utf-8")) results: List[str] = [] while True: line = self.read_serial_line(empty=False) code, param = line.split(None, 1) if code == "+": return results elif code == "-": raise CommunicationError(f"Arduino error: {param}") elif code == ">": results.append(param) elif code == "#": pass # Ignore comment lines else: raise CommunicationError( f"Arduino returned unrecognised response line: {line}", ) def _update_digital_pin(self, identifier: int) -> None: if identifier >= FIRST_ANALOGUE_PIN: raise RuntimeError("this should be unreachable") pin = self._digital_pins[identifier] char: str if pin.mode == GPIOPinMode.DIGITAL_INPUT: char = "Z" elif pin.mode == GPIOPinMode.DIGITAL_INPUT_PULLUP: char = "P" elif pin.mode == GPIOPinMode.DIGITAL_OUTPUT: if pin.state: char = "H" else: char = "L" else: raise RuntimeError("this should be unreachable") self._command("W", str(identifier), char) def set_gpio_pin_mode(self, identifier: int, pin_mode: GPIOPinMode) -> None: """Set the hardware mode of a GPIO pin.""" digital_pin_modes = ( GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLUP, GPIOPinMode.DIGITAL_OUTPUT, ) if identifier < FIRST_ANALOGUE_PIN: # Digital pin if pin_mode in digital_pin_modes: self._digital_pins[identifier].mode = pin_mode self._update_digital_pin(identifier) return else: # Analogue pin if pin_mode is GPIOPinMode.ANALOGUE_INPUT: return raise NotSupportedByHardwareError( f"Arduino Uno does not support mode {pin_mode} on pin {identifier}", ) def get_gpio_pin_mode(self, identifier: int) -> GPIOPinMode: """Get the hardware mode of a GPIO pin.""" if identifier < FIRST_ANALOGUE_PIN: return self._digital_pins[identifier].mode else: return GPIOPinMode.ANALOGUE_INPUT def write_gpio_pin_digital_state(self, identifier: int, state: bool) -> None: """Write to the digital state of a GPIO pin.""" if identifier >= FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Digital functions not supported on analogue pins", ) if self._digital_pins[identifier].mode is not GPIOPinMode.DIGITAL_OUTPUT: raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_OUTPUT" f"in order to set the digital state.") self._digital_pins[identifier].state = state self._update_digital_pin(identifier) def get_gpio_pin_digital_state(self, identifier: int) -> bool: """Get the last written state of the GPIO pin.""" if identifier >= FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Digital functions not supported on analogue pins", ) if self._digital_pins[identifier].mode is not GPIOPinMode.DIGITAL_OUTPUT: raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_OUTPUT" f"in order to read the digital state.") return self._digital_pins[identifier].state def read_gpio_pin_digital_state(self, identifier: int) -> bool: """Read the digital state of the GPIO pin.""" if identifier >= FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Digital functions not supported on analogue pins", ) if self._digital_pins[identifier].mode not in ( GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLUP, ): raise ValueError(f"Pin {identifier} mode needs to be DIGITAL_INPUT_*" f"in order to read the digital state.") results = self._command("R", str(identifier)) if len(results) != 1: raise CommunicationError(f"Invalid response from Arduino: {results!r}") result = results[0] if result == "H": return True elif result == "L": return False else: raise CommunicationError(f"Invalid response from Arduino: {result!r}") def read_gpio_pin_analogue_value(self, identifier: int) -> float: """Read the analogue voltage of the GPIO pin.""" if identifier < FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Analogue functions not supported on digital pins", ) if identifier >= FIRST_ANALOGUE_PIN + 4: raise NotSupportedByHardwareError( f"Arduino Uno firmware only supports analogue pins 0-3 (IDs 14-17)", ) analogue_pin_num = identifier - 14 results = self._command("A") for result in results: pin_name, reading = result.split(None, 1) if pin_name == f"a{analogue_pin_num}": voltage = (int(reading) / 1024.0) * 5.0 return voltage raise CommunicationError(f"Invalid response from Arduino: {results!r}") def write_gpio_pin_dac_value(self, identifier: int, scaled_value: float) -> None: """Write a scaled analogue value to the DAC on the GPIO pin.""" raise NotSupportedByHardwareError("SB Arduino does not have a DAC") def write_gpio_pin_pwm_value(self, identifier: int, duty_cycle: float) -> None: """Write a scaled analogue value to the PWM on the GPIO pin.""" raise NotSupportedByHardwareError( "SB Arduino firmware does not implement PWM output", ) def get_led_state(self, identifier: int) -> bool: """Get the state of an LED.""" if identifier != 0: raise ValueError("Arduino Uno only has LED 0 (digital pin 13).") return self.get_gpio_pin_digital_state(13) def set_led_state(self, identifier: int, state: bool) -> None: """Set the state of an LED.""" if identifier != 0: raise ValueError("Arduino Uno only has LED 0 (digital pin 13)") self.write_gpio_pin_digital_state(13, state) def get_ultrasound_pulse( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[timedelta]: """ Get a timedelta for the ultrasound time. Returns None if the sensor times out. """ self._check_ultrasound_pins(trigger_pin_identifier, echo_pin_identifier) results = self._command("T", str(trigger_pin_identifier), str(echo_pin_identifier)) self._update_ultrasound_pin_modes(trigger_pin_identifier, echo_pin_identifier) if len(results) != 1: raise CommunicationError(f"Invalid response from Arduino: {results!r}") result = results[0] microseconds = float(result) if microseconds == 0: # arduino pulseIn() returned 0 which indicates a timeout. return None else: return timedelta(microseconds=microseconds) def get_ultrasound_distance( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[float]: """Get a distance in metres.""" self._check_ultrasound_pins(trigger_pin_identifier, echo_pin_identifier) results = self._command("U", str(trigger_pin_identifier), str(echo_pin_identifier)) self._update_ultrasound_pin_modes(trigger_pin_identifier, echo_pin_identifier) if len(results) != 1: raise CommunicationError(f"Invalid response from Arduino: {results!r}") result = results[0] millimetres = float(result) if millimetres == 0: # arduino pulseIn() returned 0 which indicates a timeout. return None else: return millimetres / 1000.0 def _check_ultrasound_pins( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> None: if trigger_pin_identifier >= FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Ultrasound functions not supported on analogue pins", ) if echo_pin_identifier >= FIRST_ANALOGUE_PIN: raise NotSupportedByHardwareError( "Ultrasound functions not supported on analogue pins", ) def _update_ultrasound_pin_modes( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> None: # Ultrasound functions force the pins into particular modes. self._digital_pins[trigger_pin_identifier].mode = GPIOPinMode.DIGITAL_OUTPUT self._digital_pins[trigger_pin_identifier].state = False self._digital_pins[echo_pin_identifier].mode = GPIOPinMode.DIGITAL_INPUT PK!AHH#j5/backends/hardware/sr/__init__.py"""Backends for Student Robotics boards in the hardware environment.""" PK!9a jvv&j5/backends/hardware/sr/v4/__init__.py"""Backends for Student Robotics version 4 boards in the hardware environment.""" from .motor_board import SRV4MotorBoardHardwareBackend from .power_board import SRV4PowerBoardHardwareBackend from .servo_board import SRV4ServoBoardHardwareBackend __all__ = [ "SRV4MotorBoardHardwareBackend", "SRV4PowerBoardHardwareBackend", "SRV4ServoBoardHardwareBackend", ] PK!)K)j5/backends/hardware/sr/v4/motor_board.py"""Hardware Backend for the SR v4 motor board.""" from typing import Callable, List, Optional, Set, Type, cast from serial import Serial from serial.tools.list_ports import comports from serial.tools.list_ports_common import ListPortInfo from j5.backends import CommunicationError from j5.backends.hardware.env import HardwareEnvironment from j5.backends.hardware.j5.serial import ( SerialHardwareBackend, Seriallike, handle_serial_error, ) from j5.boards import Board from j5.boards.sr.v4.motor_board import MotorBoard from j5.components.motor import MotorInterface, MotorSpecialState, MotorState CMD_RESET = 0 CMD_VERSION = 1 CMD_MOTOR = [2, 3] CMD_BOOTLOADER = 4 SPEED_COAST = 1 SPEED_BRAKE = 2 def is_motor_board(port: ListPortInfo) -> bool: """Check if a ListPortInfo represents a MCV4B.""" return port.manufacturer == "Student Robotics" and port.product == "MCV4B" \ and port.vid == 0x0403 and port.pid == 0x6001 class SRV4MotorBoardHardwareBackend( MotorInterface, SerialHardwareBackend, ): """The hardware implementation of the SR v4 motor board.""" environment = HardwareEnvironment board = MotorBoard @classmethod def discover( cls, find: Callable = comports, serial_class: Type[Seriallike] = Serial, ) -> Set[Board]: """Discover all connected motor boards.""" # Find all serial ports. ports: List[ListPortInfo] = find() # Get a list of boards from the ports. boards: Set[Board] = set() for port in filter(is_motor_board, ports): boards.add( MotorBoard( port.serial_number, SRV4MotorBoardHardwareBackend(port.device, serial_class), ), ) return boards @handle_serial_error def __init__(self, serial_port: str, serial_class: Type[Seriallike] = Serial) -> None: super(SRV4MotorBoardHardwareBackend, self).__init__( serial_port=serial_port, serial_class=serial_class, baud=1000000, ) # Initialise our stored values for the state. self._state: List[MotorState] = [ MotorSpecialState.BRAKE for _ in range(0, 2) ] # Check we have the correct firmware version. version = self.firmware_version if version != "3": raise CommunicationError( f"Unexpected firmware version: {version}, expected: \"3\".", ) # Brake both of the motors for i, val in enumerate(self._state): self.set_motor_state(i, val) def __del__(self) -> None: """Clean up device on destruction of object.""" # Brake both of the motors for safety for i, val in enumerate(self._state): self.set_motor_state(i, MotorSpecialState.BRAKE) self._serial.flush() self._serial.close() @handle_serial_error def send_command(self, command: int, data: Optional[int] = None) -> None: """Send a serial command to the board.""" message: List[int] = [command] if data is not None: message += [data] bytes_written = self._serial.write(bytes(message)) if len(message) != bytes_written: raise CommunicationError( "Mismatch in command bytes written to serial interface.", ) @property def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" self.send_command(CMD_VERSION) firmware_data = self.read_serial_line() model = firmware_data[:5] if model != "MCV4B": raise CommunicationError( f"Unexpected model string: {model}, expected MCV4B.", ) return firmware_data[6:] # Strip model and return version def get_motor_state(self, identifier: int) -> MotorState: """Get the state of a motor.""" # We are unable to read the state from the motor board, # so we'll get the last set value. return self._state[identifier] def set_motor_state(self, identifier: int, power: MotorState) -> None: """Set the state of a motor.""" if identifier not in range(0, 2): raise ValueError( f"Invalid motor identifier: {identifier}, valid values are: 0, 1", ) if power == MotorSpecialState.BRAKE: value = SPEED_BRAKE elif power == MotorSpecialState.COAST: value = SPEED_COAST else: ipower = cast(float, power) if not -1 <= ipower <= 1: raise ValueError( "Only motor powers between -1 and 1 are supported.", ) # We are using a range of -125 to 125 power so that it is equal in both # forward and reverse directions. This is due to BRAKE and COAST being # magic numbers. value = round(ipower * 125) + 128 self._state[identifier] = power self.send_command(CMD_MOTOR[identifier], value) PK!yy)j5/backends/hardware/sr/v4/power_board.py"""Hardware Backend for the SR V4 power board.""" import struct from datetime import timedelta from time import sleep from typing import Callable, Dict, Mapping, Set, cast import usb from j5.backends.hardware.env import ( HardwareEnvironment, NotSupportedByHardwareError, ) from j5.backends.hardware.j5.raw_usb import ( RawUSBHardwareBackend, ReadCommand, WriteCommand, handle_usb_error, ) from j5.boards import Board from j5.boards.sr.v4.power_board import PowerBoard, PowerOutputPosition from j5.components import ( BatterySensorInterface, ButtonInterface, LEDInterface, PiezoInterface, PowerOutputInterface, ) # The names and codes of these commands match the definitions in usb.h in the firmware # source. CMD_READ_OUTPUT: Mapping[int, ReadCommand] = { output.value: ReadCommand(output.value, 4) for output in PowerOutputPosition } CMD_READ_5VRAIL = ReadCommand(6, 4) CMD_READ_BATTERY = ReadCommand(7, 8) CMD_READ_BUTTON = ReadCommand(8, 4) CMD_READ_FWVER = ReadCommand(9, 4) CMD_WRITE_OUTPUT: Mapping[int, WriteCommand] = { output.value: WriteCommand(output.value) for output in PowerOutputPosition } CMD_WRITE_RUNLED = WriteCommand(6) CMD_WRITE_ERRORLED = WriteCommand(7) CMD_WRITE_PIEZO = WriteCommand(8) class SRV4PowerBoardHardwareBackend( PowerOutputInterface, PiezoInterface, ButtonInterface, BatterySensorInterface, LEDInterface, RawUSBHardwareBackend, ): """The hardware implementation of the SR V4 power board.""" environment = HardwareEnvironment board = PowerBoard @classmethod @handle_usb_error def discover(cls, find: Callable = usb.core.find) -> Set[Board]: """Discover boards that this backend can control.""" boards: Set[Board] = set() device_list = find(idVendor=0x1bda, idProduct=0x0010, find_all=True) for device in device_list: backend = cls(device) board = PowerBoard(backend.serial, backend) boards.add(cast(Board, board)) return boards @handle_usb_error def __init__(self, usb_device: usb.core.Device) -> None: self._usb_device = usb_device self._output_states: Dict[int, bool] = { output.value: False for output in PowerOutputPosition } self._led_states: Dict[int, bool] = { i: False for i in range(2) } self.check_firmware_version_supported() def check_firmware_version_supported(self) -> None: """Raises an exception if the firmware version is not supported.""" v = self.firmware_version if v != "3": raise NotImplementedError(f"this power board is running firmware " f"version {v}, but only version 3 is supported") @property def firmware_version(self) -> str: """The firmware version reported by the board.""" version, = struct.unpack(" bool: """Get whether a power output is enabled.""" try: return self._output_states[identifier] except KeyError: raise ValueError(f"Invalid power output identifier {identifier!r}; " f"valid identifiers are {CMD_WRITE_OUTPUT.keys()}") from None def set_power_output_enabled( self, identifier: int, enabled: bool, ) -> None: """Set whether a power output is enabled.""" try: cmd = CMD_WRITE_OUTPUT[identifier] except KeyError: raise ValueError(f"Invalid power output identifier {identifier!r}; " f"valid identifiers are {CMD_WRITE_OUTPUT.keys()}") from None self._write(cmd, int(enabled)) self._output_states[identifier] = enabled def get_power_output_current(self, identifier: int) -> float: """Get the current being drawn on a power output, in amperes.""" try: cmd = CMD_READ_OUTPUT[identifier] except KeyError: raise ValueError(f"invalid power output identifier {identifier!r}; " f"valid identifiers are {CMD_READ_OUTPUT.keys()}") from None current, = struct.unpack(" None: """Queue a pitch to be played.""" if identifier != 0: raise ValueError(f"invalid piezo identifier {identifier!r}; " f"the only valid identifier is 0") duration_ms = round(duration / timedelta(milliseconds=1)) if duration_ms > 65535: raise NotSupportedByHardwareError("Maximum piezo duration is 65535ms.") frequency_int = int(round(frequency)) if frequency_int > 65535: raise NotSupportedByHardwareError("Maximum piezo frequency is 65535Hz.") data = struct.pack(" bool: """Get the state of a button.""" if identifier != 0: raise ValueError(f"invalid button identifier {identifier!r}; " f"the only valid identifier is 0") state, = struct.unpack(" None: """Halt the program until this button is pushed.""" while not self.get_button_state(identifier): sleep(0.05) def get_battery_sensor_voltage(self, identifier: int) -> float: """Get the voltage of a battery sensor.""" if identifier != 0: raise ValueError(f"invalid battery sensor identifier {identifier!r}; " f"the only valid identifier is 0") current, voltage = struct.unpack(" float: """Get the current of a battery sensor.""" if identifier != 0: raise ValueError(f"invalid battery sensor identifier {identifier!r}; " f"the only valid identifier is 0") current, voltage = struct.unpack(" bool: """Get the state of an LED.""" return self._led_states[identifier] def set_led_state(self, identifier: int, state: bool) -> None: """Set the state of an LED.""" cmds = {0: CMD_WRITE_RUNLED, 1: CMD_WRITE_ERRORLED} try: cmd = cmds[identifier] except KeyError: raise ValueError(f"invalid LED identifier {identifier!r}; valid identifiers " f"are 0 (run LED) and 1 (error LED)") from None self._write(cmd, int(state)) self._led_states[identifier] = state PK!dX X )j5/backends/hardware/sr/v4/servo_board.py"""Hardware Backend for the SR V4 Servo Board.""" import struct from typing import Callable, List, Set, cast import usb from j5.backends.hardware.env import ( HardwareEnvironment, NotSupportedByHardwareError, ) from j5.backends.hardware.j5.raw_usb import ( RawUSBHardwareBackend, ReadCommand, WriteCommand, handle_usb_error, ) from j5.boards import Board from j5.boards.sr.v4 import ServoBoard from j5.components.servo import ServoInterface, ServoPosition CMD_READ_FWVER = ReadCommand(9, 4) CMD_WRITE_SET_SERVO = [ WriteCommand(i) for i in range(0, 12) ] CMD_WRITE_INIT = WriteCommand(12) class SRV4ServoBoardHardwareBackend( ServoInterface, RawUSBHardwareBackend, ): """The hardware implementaton of the SR v4 Servo Board.""" board = ServoBoard environment = HardwareEnvironment @classmethod @handle_usb_error def discover(cls, find: Callable = usb.core.find) -> Set[Board]: """Discover boards that this backend can control.""" boards: Set[Board] = set() device_list = find(idVendor=0x1bda, idProduct=0x0011, find_all=True) for device in device_list: backend = cls(device) board = ServoBoard(backend.serial, backend) boards.add(cast(Board, board)) return boards @handle_usb_error def __init__(self, usb_device: usb.core.Device) -> None: self._usb_device = usb_device self.check_firmware_version_supported() self._positions: List[float] = [ 0.0 for _ in range(0, 12) ] # Initialise servos. self._usb_device.ctrl_transfer(0, 64, 0, 12, b"") for s in range(0, 12): self.set_servo_position(s, 0.0) @property def firmware_version(self) -> str: """The firmware version reported by the board.""" version, = struct.unpack(" None: """Raises an exception if the firmware version is not supported.""" v = self.firmware_version if v != "2": raise NotImplementedError(f"Servo Board ({self.serial}) is running firmware " f"version {v}, but only version 2 is supported") def get_servo_position(self, identifier: int) -> ServoPosition: """ Get the position of a servo. Currently reads back the last known position as we cannot read from the hardware. """ return self._positions[identifier] def set_servo_position(self, identifier: int, position: ServoPosition) -> None: """Set the position of a servo.""" if identifier not in range(0, 12): raise ValueError("Only integers 0 - 12 are valid servo identifiers.") if position is None: raise NotSupportedByHardwareError( f"{self.board.name} does not support unpowered servos.", ) self._positions[identifier] = position value = round(position * 100) self._write(CMD_WRITE_SET_SERVO[identifier], value) PK!¶'j5/backends/hardware/zoloto/__init__.py"""Hardware implementations of Zoloto.""" from .camera_board import ZolotoCameraBoardHardwareBackend __all__ = [ "ZolotoCameraBoardHardwareBackend", ] PK!`~ +j5/backends/hardware/zoloto/camera_board.py"""Hardware implementation of the Zoloto Virtual Camera Board.""" from pathlib import Path from platform import system from typing import Optional, Set, Type, TypeVar from zoloto import __version__ as zoloto_version from zoloto.cameras.camera import Camera from j5.backends import Backend from j5.backends.hardware import HardwareEnvironment from j5.boards import Board from j5.boards.zoloto import ZolotoCameraBoard from j5.components import MarkerCameraInterface from j5.vision import Coordinate, Marker, MarkerList, Orientation CAMERA_PATH = Path("/dev/video0") CAMERA_SERIAL = "video0" T = TypeVar("T", bound=Camera) class DefaultCamera(Camera): """ A default camera that doesn't do much. Mostly here to ensure sound types. """ def get_marker_size(self, marker_id: int) -> int: """Get the size of a particular marker.""" return 10 class ZolotoCameraBoardHardwareBackend( MarkerCameraInterface, Backend, ): """Hardware Backend for Zoloto Camera.""" environment = HardwareEnvironment board = ZolotoCameraBoard @classmethod def discover(cls, camera_class: Type[Camera] = DefaultCamera) -> Set[Board]: """Discover boards that this backend can control.""" if system() != "Linux": # We currently only support Zoloto on Linux platforms as there is # no easy way to detect the presence of webcams on other platforms. return set() if not CAMERA_PATH.exists(): # We currently only support a hardcoded path. return set() return { ZolotoCameraBoard("video0", cls(CAMERA_PATH, camera_class)), } def __init__(self, device_path: Path, camera_class: Type[T]) -> None: self._device_path = device_path self._zcamera = camera_class(0) @property def firmware_version(self) -> Optional[str]: """The firmware version reported by the board.""" return zoloto_version def get_visible_markers(self, identifier: int) -> MarkerList: """Get markers that are visible to the camera.""" markers = MarkerList() marker_gen = self._zcamera.process_frame() for zmarker in marker_gen: position = Coordinate( # Convert to metres zmarker.cartesian.x / 100, zmarker.cartesian.y / 100, zmarker.cartesian.z / 100, ) orientation = Orientation.from_cartesian( zmarker.orientation.rot_x, zmarker.orientation.rot_y, zmarker.orientation.rot_z, ) pixel_corners = list( map(lambda x: (x.x, x.y), zmarker.pixel_corners), ) pixel_centre = (zmarker.pixel_centre.x, zmarker.pixel_centre.y) markers.append( Marker( zmarker.id, position, pixel_centre=pixel_centre, pixel_corners=pixel_corners, orientation=orientation, ), ) return markers def save_annotated_image(self, file: Path) -> None: """Save an annotated image to file.""" self._zcamera.save_frame(file, annotate=True) PK!U+kj5/base_robot.py"""A base class for robots.""" import socket from j5.boards import Board class UnableToObtainLock(OSError): """Unable to obtain lock.""" pass class BaseRobot: """A base robot.""" def __new__(cls, *args, **kwargs) -> 'BaseRobot': # type: ignore """Create a new instance of the class.""" obj: BaseRobot = super().__new__(cls) obj._obtain_lock() return obj def make_safe(self) -> None: """Make this robot safe.""" Board.make_all_safe() def _obtain_lock(self, lock_port: int = 10653) -> None: """ Obtain a lock. This ensures that there can only be one instance of Robot at any time, which is a safety feature. """ if not hasattr(self, '_lock'): self._lock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self._lock.bind(('localhost', lock_port)) except OSError: raise UnableToObtainLock( "Unable to obtain lock. \ Are you trying to create more than one Robot object?", ) from None # We have no need to listen on the socket - we just bind to claim the address # and prevent another process using it. lock_details = self._lock.getsockname() if lock_details[1] != lock_port: raise OSError("Socket for lock is on the wrong port.") PK! j5/boards/__init__.py"""This module contains the boards that we support.""" from .board import Board, BoardGroup __all__ = ["Board", "BoardGroup"] PK!A]j5/boards/board.py"""The base classes for boards and group of boards.""" import atexit import logging import os import signal from abc import ABCMeta, abstractmethod from collections import OrderedDict from types import FrameType from typing import ( TYPE_CHECKING, Dict, Generic, Iterator, List, Optional, Set, Type, TypeVar, cast, ) from j5.backends import Backend, CommunicationError if TYPE_CHECKING: # pragma: nocover from j5.components import Component # noqa from typing import Callable, Union SignalHandler = Union[ Callable[[signal.Signals, FrameType], None], int, signal.Handlers, None, ] class Board(metaclass=ABCMeta): """A collection of hardware that has an implementation.""" # BOARDS is a set of currently instantiated boards. # This is useful to know so that we can make them safe in a crash. BOARDS: Set['Board'] = set() def __str__(self) -> str: """A string representation of this board.""" return f"{self.name} - {self.serial}" def __new__(cls, *args, **kwargs): # type: ignore """Ensure any instantiated board is added to the boards list.""" instance = super().__new__(cls) Board.BOARDS.add(instance) return instance def __repr__(self) -> str: """A representation of this board.""" return f"<{self.__class__.__name__} serial={self.serial}>" @property @abstractmethod def name(self) -> str: """A human friendly name for this board.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def serial(self) -> str: """The serial number of the board.""" raise NotImplementedError # pragma: no cover @property @abstractmethod def firmware_version(self) -> Optional[str]: """The firmware version of the board.""" raise NotImplementedError # pragma: no cover @abstractmethod def make_safe(self) -> None: """Make all components on this board safe.""" raise NotImplementedError # pragma: no cover @staticmethod @abstractmethod def supported_components() -> Set[Type['Component']]: """The types of component supported by this board.""" raise NotImplementedError # pragma: no cover @staticmethod def make_all_safe() -> None: """Make all boards safe.""" for board in Board.BOARDS: board.make_safe() @staticmethod def _make_all_safe_at_exit() -> None: # Register make_all_safe to be called upon normal program termination. atexit.register(Board.make_all_safe) # Register make_all_safe to be called when a termination signal is received. old_signal_handlers: Dict[signal.Signals, SignalHandler] = {} def new_signal_handler(signal_type: signal.Signals, frame: FrameType) -> None: logging.getLogger(__name__).error("program terminated prematurely") Board.make_all_safe() # Do what the signal originally would have done. signal.signal(signal_type, old_signal_handlers[signal_type]) os.kill(0, signal_type) # 0 = current process for signal_type in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM): old_signal_handler = signal.signal(signal_type, new_signal_handler) old_signal_handlers[signal_type] = old_signal_handler Board._make_all_safe_at_exit() T = TypeVar('T', bound='Board') class BoardGroup(Generic[T]): """A group of boards that can be accessed.""" def __init__(self, backend_class: Type[Backend]): self._backend_class = backend_class self._boards: Dict[str, T] = OrderedDict() self.update_boards() def update_boards(self) -> None: """Update the boards in this group to see if new boards have been added.""" self._boards.clear() discovered_boards = self._backend_class.discover() for board in sorted(discovered_boards, key=lambda b: b.serial): self._boards.update({board.serial: cast(T, board)}) def singular(self) -> T: """If there is only a single board in the group, return that board.""" num = len(self) if num == 1: return list(self._boards.values())[0] else: name = self._backend_class.board.__name__ raise CommunicationError( f"expected exactly one {name} to be connected, but found {num}", ) def make_safe(self) -> None: """Make all of the boards safe.""" for board in self._boards.values(): board.make_safe() def __str__(self) -> str: """A string representation of the board group.""" list_str = ', '.join(map(str, self._boards.values())) return f"Group of Boards - [{list_str}]" def __repr__(self) -> str: """A representation of this board.""" return f"BoardGroup(backend_class={self._backend_class.__name__})" def __len__(self) -> int: """Get the number of boards in this group.""" return len(self._boards) def __contains__(self, serial: str) -> bool: """Check if a board is in this group.""" return serial in self._boards def __iter__(self) -> Iterator[T]: """ Iterate over the boards in the group. The boards are ordered lexiographically by serial number. """ return iter(self._boards.values()) def __getitem__(self, serial: str) -> T: """Get the board from serial.""" try: return self._boards[serial] except KeyError: if type(serial) != str: raise TypeError("Serial must be a string") raise KeyError(f"Could not find a board with the serial {serial}") @property def backend_class(self) -> Type[Backend]: """The Backend that this group uses for Boards.""" return self._backend_class @property def boards(self) -> List[T]: """Get an unordered list of boards in this group.""" return list(self._boards.values()) PK!Q"Kj5/boards/sb/__init__.py"""SourceBots Boards.""" from .arduino import AnaloguePin, SBArduinoBoard __all__ = [ 'AnaloguePin', 'SBArduinoBoard', ] PK!5wj5/boards/sb/arduino.py"""Classes for the SourceBots Arduino.""" from enum import IntEnum from typing import Mapping, Optional, Set, Tuple, Type, Union, cast from j5.backends import Backend from j5.boards import Board from j5.components import ( LED, Component, GPIOPin, GPIOPinInterface, GPIOPinMode, LEDInterface, ) from j5.components.derived import UltrasoundInterface, UltrasoundSensor class AnaloguePin(IntEnum): """Analogue Pins numbering.""" A0 = 14 A1 = 15 A2 = 16 A3 = 17 A4 = 18 A5 = 19 PinNumber = Union[int, AnaloguePin] class SBArduinoBoard(Board): """SourceBots Arduino Board.""" _led: LED _digital_pins: Mapping[int, GPIOPin] _analogue_pins: Mapping[AnaloguePin, GPIOPin] name: str = "Arduino Uno" def __init__(self, serial: str, backend: Backend): self._serial = serial self._backend = backend self._led = LED(0, cast(LEDInterface, self._backend)) # Digital Pins # Note that pins 0 and 1 are used for serial comms. self._digital_pins = { i: GPIOPin( i, cast(GPIOPinInterface, self._backend), initial_mode=GPIOPinMode.DIGITAL_INPUT, hardware_modes={ GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLUP, GPIOPinMode.DIGITAL_OUTPUT, }, firmware_modes={UltrasoundSensor}, ) for i in range(2, 14) } self._analogue_pins = { i: GPIOPin( i, cast(GPIOPinInterface, self._backend), initial_mode=GPIOPinMode.ANALOGUE_INPUT, hardware_modes={ GPIOPinMode.ANALOGUE_INPUT, GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLUP, GPIOPinMode.DIGITAL_OUTPUT, }, ) for i in AnaloguePin } self.ultrasound_sensors = UltrasoundSensors(self) @property def serial(self) -> str: """Get the serial number.""" return self._serial @property def firmware_version(self) -> Optional[str]: """Get the firmware version of the board.""" return self._backend.firmware_version @property def pins(self) -> Mapping[PinNumber, GPIOPin]: """Get the GPIO pins.""" pins: Mapping[PinNumber, GPIOPin] = { **cast(Mapping[PinNumber, GPIOPin], self._analogue_pins), **cast(Mapping[PinNumber, GPIOPin], self._digital_pins), } return pins def make_safe(self) -> None: """Make this board safe.""" pass @staticmethod def supported_components() -> Set[Type[Component]]: """List the types of components supported by this board.""" return { GPIOPin, LED, UltrasoundSensor, } class UltrasoundSensors: """ Helper class for constructing UltrasoundSensor objects on the fly. This exists so that arduino.ultrasound_sensors can be accessed using square bracket notation like a mapping, for consistency with how other types of component are accessed. """ def __init__(self, arduino: SBArduinoBoard): self._arduino = arduino def __getitem__(self, key: Tuple[PinNumber, PinNumber]) -> UltrasoundSensor: """Get an ultrasound sensor with the given pin configuration.""" trigger_pin, echo_pin = key return UltrasoundSensor( gpio_trigger=self._arduino.pins[trigger_pin], gpio_echo=self._arduino.pins[echo_pin], backend=cast(UltrasoundInterface, self._arduino._backend), ) PK!p''j5/boards/sr/__init__.py"""Boards made by Student Robotics.""" PK!c-;FFj5/boards/sr/v4/__init__.py"""Boards in the v4 series of Student Robotics boards.""" from .motor_board import MotorBoard from .power_board import PowerBoard, PowerOutputGroup, PowerOutputPosition from .servo_board import ServoBoard __all__ = [ 'MotorBoard', 'PowerBoard', 'PowerOutputGroup', 'PowerOutputPosition', 'ServoBoard', ] PK!Emwj5/boards/sr/v4/motor_board.py"""Classes for the SR v4 Motor Board.""" from typing import TYPE_CHECKING, List, Optional, Set, Type, cast from j5.backends import Backend from j5.boards import Board from j5.components.motor import Motor, MotorInterface, MotorSpecialState if TYPE_CHECKING: # pragma: no cover from j5.components import ( # noqa: F401 Component, ) class MotorBoard(Board): """Student Robotics v4 Motor Board.""" name: str = "Student Robotics v4 Motor Board" def __init__(self, serial: str, backend: Backend): self._serial = serial self._backend = backend self._outputs: List[Motor] = [ Motor(output, cast(MotorInterface, self._backend)) for output in range(0, 2) ] @property def serial(self) -> str: """Get the serial number.""" return self._serial @property def firmware_version(self) -> Optional[str]: """Get the firmware version of the board.""" return self._backend.firmware_version @property def motors(self) -> List[Motor]: """Get the motors on this board.""" return self._outputs def make_safe(self) -> None: """Make this board safe.""" for output in self._outputs: # Brake both motors. output.power = MotorSpecialState.BRAKE @staticmethod def supported_components() -> Set[Type['Component']]: """List the types of components supported by this board.""" return {Motor} PK!c j5/boards/sr/v4/power_board.py"""Classes for the SR v4 Power Board.""" from enum import Enum from time import sleep from typing import TYPE_CHECKING, Mapping, Optional, Set, cast from j5.backends import Backend from j5.boards import Board from j5.components import ( LED, BatterySensor, Button, Piezo, PowerOutput, PowerOutputGroup, ) if TYPE_CHECKING: # pragma: no cover from j5.components import ( # noqa: F401 Component, ButtonInterface, PowerOutputInterface, PiezoInterface, BatterySensorInterface, LEDInterface, ) from typing import Type # noqa: F401 class PowerOutputPosition(Enum): """ A mapping of name to number of the PowerBoard outputs. The numbers here are the same as used in wire communication with the PowerBoard. """ H0 = 0 H1 = 1 L0 = 2 L1 = 3 L2 = 4 L3 = 5 class PowerBoard(Board): """Student Robotics v4 Power Board.""" name: str = "Student Robotics v4 Power Board" def __init__(self, serial: str, backend: Backend): self._serial = serial self._backend = backend self._outputs: Mapping[PowerOutputPosition, PowerOutput] = { output: PowerOutput( output.value, cast("PowerOutputInterface", self._backend), ) for output in PowerOutputPosition # Note that in Python 3, Enums are ordered. } self._output_group = PowerOutputGroup(self._outputs) self._piezo = Piezo(0, cast("PiezoInterface", self._backend)) self._start_button = Button(0, cast("ButtonInterface", self._backend)) self._battery_sensor = BatterySensor( 0, cast("BatterySensorInterface", self._backend), ) self._run_led = LED(0, cast("LEDInterface", self._backend)) self._error_led = LED(1, cast("LEDInterface", self._backend)) @property def serial(self) -> str: """Get the serial number.""" return self._serial @property def firmware_version(self) -> Optional[str]: """Get the firmware version of the board.""" return self._backend.firmware_version @property def outputs(self) -> PowerOutputGroup: """Get the power outputs.""" return self._output_group @property def piezo(self) -> Piezo: """Get the piezo sounder.""" return self._piezo @property def start_button(self) -> Button: """Get the start button.""" return self._start_button @property def battery_sensor(self) -> BatterySensor: """Get the battery sensor.""" return self._battery_sensor def make_safe(self) -> None: """Make this board safe.""" self._output_group.power_off() def wait_for_start_flash(self) -> None: """ Wait for the start button to be pressed and flash. The LED will remain on once the start button has been pressed. """ counter = 0 led_state = False while not self.start_button.is_pressed: if counter % 6 == 0: led_state = not led_state self._run_led.state = led_state sleep(0.05) counter += 1 # Turn on the LED now the button has been pressed. self._run_led.state = True @staticmethod def supported_components() -> Set["Type[Component]"]: """List the types of components supported by this board.""" return {PowerOutput, Piezo, Button, BatterySensor, LED} PK!rj5/boards/sr/v4/servo_board.py"""Classes for the SR v4 Servo Board.""" from typing import TYPE_CHECKING, List, Optional, Set, Type, cast from j5.backends import Backend from j5.boards import Board from j5.components.servo import Servo, ServoInterface if TYPE_CHECKING: # pragma: no cover from j5.components import ( # noqa: F401 Component, ) class ServoBoard(Board): """Student Robotics v4 Servo Board.""" name: str = "Student Robotics v4 Servo Board" def __init__(self, serial: str, backend: Backend): self._serial = serial self._backend = backend self._servos: List[Servo] = [ Servo(servo, cast(ServoInterface, self._backend)) for servo in range(0, 12) ] @property def serial(self) -> str: """Get the serial number.""" return self._serial @property def firmware_version(self) -> Optional[str]: """Get the firmware version of the board.""" return self._backend.firmware_version def make_safe(self) -> None: """ Make this board safe. It is safest to leave the servos where they are, so do nothing. """ pass @staticmethod def supported_components() -> Set[Type['Component']]: """List the types of components supported by this board.""" return {Servo} @property def servos(self) -> List[Servo]: """Get the servos on this board.""" return self._servos PK!=j5/boards/zoloto/__init__.py""" Boards for the Zoloto CV system. These boards are not physical, they are virtual. """ from .camera_board import ZolotoCameraBoard __all__ = [ "ZolotoCameraBoard", ] PK!L7 j5/boards/zoloto/camera_board.py"""Virtual Camera Board for detecting Fiducial Markers.""" from typing import Optional, Set, Type, cast from j5.backends import Backend from j5.boards import Board from j5.components import Component, MarkerCamera, MarkerCameraInterface class ZolotoCameraBoard(Board): """Virtual Camera Board for detecting fiducial markers.""" name: str = "Zoloto Camera Board" def __init__(self, serial: str, backend: Backend): self._serial = serial self._backend = backend self._camera = MarkerCamera(0, cast(MarkerCameraInterface, backend)) @property def serial(self) -> str: """Get the serial number.""" return self._serial @property def firmware_version(self) -> Optional[str]: """Get the firmware version of the board.""" return self._backend.firmware_version @property def camera(self) -> MarkerCamera: """The camera on this board.""" return self._camera def make_safe(self) -> None: """Make this board safe.""" pass @staticmethod def supported_components() -> Set[Type[Component]]: """List the types of components supported by this board.""" return { MarkerCamera, } PK!0 j5/components/__init__.py"""This module contains components, which are the smallest logical element of hardware.""" from .battery_sensor import BatterySensor, BatterySensorInterface from .button import Button, ButtonInterface from .component import ( Component, DerivedComponent, Interface, NotSupportedByComponentError, ) from .gpio_pin import GPIOPin, GPIOPinInterface, GPIOPinMode from .led import LED, LEDInterface from .marker_camera import MarkerCamera, MarkerCameraInterface from .motor import Motor, MotorInterface, MotorSpecialState from .piezo import Piezo, PiezoInterface from .power_output import PowerOutput, PowerOutputGroup, PowerOutputInterface from .servo import Servo, ServoInterface __all__ = [ "BatterySensor", "BatterySensorInterface", "Button", "ButtonInterface", "Component", "DerivedComponent", "GPIOPin", "GPIOPinInterface", "GPIOPinMode", "Interface", "LED", "LEDInterface", "MarkerCamera", "MarkerCameraInterface", "Motor", "MotorInterface", "MotorSpecialState", "NotSupportedByComponentError", "Piezo", "PiezoInterface", "PowerOutput", "PowerOutputInterface", "PowerOutputGroup", "Servo", "ServoInterface", ] PK!<?TTj5/components/battery_sensor.py"""Classes for Battery Sensing Components.""" from abc import abstractmethod from typing import Type from j5.components.component import Component, Interface class BatterySensorInterface(Interface): """An interface containing the methods required to read data from a BatterySensor.""" @abstractmethod def get_battery_sensor_voltage(self, identifier: int) -> float: """Get the voltage of a battery sensor.""" raise NotImplementedError # pragma: no cover @abstractmethod def get_battery_sensor_current(self, identifier: int) -> float: """Get the current of a battery sensor.""" raise NotImplementedError # pragma: no cover class BatterySensor(Component): """A sensor capable of monitoring a battery.""" def __init__( self, identifier: int, backend: BatterySensorInterface, ) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[BatterySensorInterface]: """Get the interface class that is required to use this component.""" return BatterySensorInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def voltage(self) -> float: """Get the voltage of the battery sensor.""" return self._backend.get_battery_sensor_voltage(self._identifier) @property def current(self) -> float: """Get the current of the battery sensor.""" return self._backend.get_battery_sensor_current(self._identifier) PK!&j5/components/button.py"""Classes for Button.""" from abc import abstractmethod from typing import Type from j5.components.component import Component, Interface class ButtonInterface(Interface): """An interface containing the methods required for a button.""" @abstractmethod def get_button_state(self, identifier: int) -> bool: """Set the state of a button.""" raise NotImplementedError # pragma: no cover @abstractmethod def wait_until_button_pressed(self, identifier: int) -> None: """Halt the program until this button is pushed.""" raise NotImplementedError # pragma: no cover class Button(Component): """A button.""" def __init__(self, identifier: int, backend: ButtonInterface) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[ButtonInterface]: """Get the interface class that is required to use this component.""" return ButtonInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def is_pressed(self) -> bool: """Get the current pushed state of the button.""" return self._backend.get_button_state(self._identifier) def wait_until_pressed(self) -> None: """Halt the program until this button is pushed.""" self._backend.wait_until_button_pressed(self._identifier) PK!-0Oj5/components/component.py"""Base classes for components.""" from abc import ABCMeta, abstractmethod from typing import Type class Interface(metaclass=ABCMeta): """A base class for interfaces to inherit from.""" class Component(metaclass=ABCMeta): """A component is the smallest logical part of some hardware.""" @property @abstractmethod def identifier(self) -> int: """An integer to identify the component on a board.""" raise NotImplementedError # pragma: no cover @staticmethod @abstractmethod def interface_class() -> Type[Interface]: """Get the interface class that is required to use this component.""" raise NotImplementedError # pragma: no cover class DerivedComponent(Component): """ A derived component is a component that can take another component as a parameter. For example, a device may be attached to various pins on the board, and this could vary depending on what the user wants. We solve this by passing the pins to the derived component. >>> u = Ultrasound(pin_0, pin_1) """ @property def identifier(self) -> int: """An integer to identify the component on a board.""" raise NotSupportedByComponentError( "The identifier of a derived component is a ", "function of the components that it consists of", ) @staticmethod @abstractmethod def interface_class() -> Type[Interface]: """Get the interface class that is required to use this component.""" raise NotImplementedError # pragma: no cover class NotSupportedByComponentError(Exception): """This is thrown when hardware does not support the action that is attempted.""" pass PK!1O!j5/components/derived/__init__.py""" Derived components. A derived component is a component that can take another component as a parameter. For example, a device may be attached to various pins on the board, and this could vary depending on what the user wants. We solve this by passing the pins to the derived component. """ from .ultrasound import UltrasoundInterface, UltrasoundSensor __all__ = [ "UltrasoundInterface", "UltrasoundSensor", ] PK!yUdo o #j5/components/derived/ultrasound.py""" Ultrasonic distance sensor. A sensor that utilises the reflection of ultrasound to calculate the distance to a nearby object. """ from abc import abstractmethod from datetime import timedelta from typing import Optional, Type from j5.components import NotSupportedByComponentError from j5.components.component import DerivedComponent, Interface from j5.components.gpio_pin import GPIOPin class UltrasoundInterface(Interface): """An interface containing the methods required for an UltrasoundSensor.""" @abstractmethod def get_ultrasound_pulse( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[timedelta]: """ Get a timedelta for the ultrasound time. Returns None if the sensor times out. """ raise NotImplementedError # pragma: no cover @abstractmethod def get_ultrasound_distance( self, trigger_pin_identifier: int, echo_pin_identifier: int, ) -> Optional[float]: """Get a distance in metres.""" raise NotImplementedError # pragma: no cover class UltrasoundSensor(DerivedComponent): """ Ultrasonic distance sensor. A sensor that utilises the reflection of ultrasound to calculate the distance to a nearby object. """ def __init__( self, gpio_trigger: GPIOPin, gpio_echo: GPIOPin, backend: UltrasoundInterface, *, distance_mode: bool = True, ) -> None: if self.__class__ not in gpio_trigger.firmware_modes or \ self.__class__ not in gpio_echo.firmware_modes: raise NotSupportedByComponentError( f"Pins {gpio_trigger.identifier} and {gpio_echo.identifier}", f" must support Ultrasound.", ) self._gpio_trigger = gpio_trigger self._gpio_echo = gpio_echo self._backend = backend self._distance_mode = distance_mode @staticmethod def interface_class() -> Type[Interface]: """Get the interface class that is required to use this component.""" return UltrasoundInterface def pulse(self) -> Optional[timedelta]: """ Send a pulse and return the time taken. Returns None if timeout occurred. """ return self._backend.get_ultrasound_pulse( self._gpio_trigger.identifier, self._gpio_echo.identifier, ) def distance(self) -> Optional[float]: """ Send a pulse and return the distance to the object. Returns none if a timeout occurred. """ if not self._distance_mode: raise Exception("Distance mode is disabled. Use pulse() to get the time.") return self._backend.get_ultrasound_distance( self._gpio_trigger.identifier, self._gpio_echo.identifier, ) PK!I@66j5/components/gpio_pin.py"""Classes for GPIO Pins.""" from abc import abstractmethod from enum import IntEnum from typing import List, Set, Type, Union from j5.components.component import ( Component, DerivedComponent, Interface, NotSupportedByComponentError, ) class BadGPIOPinModeError(Exception): """The pin is not in the correct mode.""" pass class GPIOPinMode(IntEnum): """Hardware modes that a GPIO pin can be set to.""" DIGITAL_INPUT = 0 #: The digital state of the pin can be read DIGITAL_INPUT_PULLUP = 1 #: Same as DIGITAL_INPUT but internal pull-up is enabled DIGITAL_INPUT_PULLDOWN = 2 #: Same as DIGITAL_INPUT but internal pull-down is enabled DIGITAL_OUTPUT = 3 #: The digital state of the pin can be set. ANALOGUE_INPUT = 4 #: The analogue voltage of the pin can be read. ANALOGUE_OUTPUT = 5 #: The analogue voltage of the pin can be set using a DAC. PWM_OUTPUT = 6 #: A PWM output signal can be created on the pin. FirmwareMode = Type[DerivedComponent] PinMode = Union[FirmwareMode, GPIOPinMode] class GPIOPinInterface(Interface): """An interface containing the methods required for a GPIO Pin.""" @abstractmethod def set_gpio_pin_mode(self, identifier: int, pin_mode: GPIOPinMode, ) -> None: """Set the hardware mode of a GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def get_gpio_pin_mode(self, identifier: int) -> GPIOPinMode: """Get the hardware mode of a GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def write_gpio_pin_digital_state(self, identifier: int, state: bool, ) -> None: """Write to the digital state of a GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def get_gpio_pin_digital_state(self, identifier: int) -> bool: """Get the last written state of the GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def read_gpio_pin_digital_state(self, identifier: int) -> bool: """Read the digital state of the GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def read_gpio_pin_analogue_value(self, identifier: int) -> float: """Read the scaled analogue value of the GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def write_gpio_pin_dac_value( self, identifier: int, scaled_value: float, ) -> None: """Write a scaled analogue value to the DAC on the GPIO pin.""" raise NotImplementedError # pragma: nocover @abstractmethod def write_gpio_pin_pwm_value( self, identifier: int, duty_cycle: float, ) -> None: """Write a scaled analogue value to the PWM on the GPIO pin.""" raise NotImplementedError # pragma: nocover class GPIOPin(Component): """A GPIO Pin.""" def __init__( self, identifier: int, backend: GPIOPinInterface, *, initial_mode: PinMode, hardware_modes: Set[GPIOPinMode] = {GPIOPinMode.DIGITAL_OUTPUT}, firmware_modes: Set[FirmwareMode] = set(), ) -> None: self._backend = backend self._identifier = identifier self._supported_modes = hardware_modes self._firmware_modes = firmware_modes if len(hardware_modes) < 1: raise ValueError("A GPIO pin must support at least one hardware mode.") self.mode = initial_mode @staticmethod def interface_class() -> Type[GPIOPinInterface]: """Get the interface class that is required to use this component.""" return GPIOPinInterface def _require_pin_modes(self, pin_modes: List[PinMode]) -> None: """Ensure that this pin is in the specified hardware mode.""" if not any(self.mode == mode for mode in pin_modes) and not len(pin_modes) == 0: raise BadGPIOPinModeError( f"Pin {self._identifier} needs to be in one of {pin_modes}", ) @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def mode(self) -> PinMode: """Get the mode of this pin.""" return self._backend.get_gpio_pin_mode(self._identifier) @mode.setter def mode(self, pin_mode: PinMode) -> None: """Set the mode of this pin.""" if pin_mode not in self._supported_modes | self._firmware_modes: raise NotSupportedByComponentError( f"Pin {self._identifier} does not support {str(pin_mode)}.", ) if isinstance(pin_mode, GPIOPinMode): self._backend.set_gpio_pin_mode(self._identifier, pin_mode) @property def digital_state(self) -> bool: """Get the digital state of the pin.""" self._require_pin_modes([ GPIOPinMode.DIGITAL_OUTPUT, GPIOPinMode.DIGITAL_INPUT, GPIOPinMode.DIGITAL_INPUT_PULLUP, GPIOPinMode.DIGITAL_INPUT_PULLDOWN], ) # Behave differently depending on the hardware mode. if self.mode is GPIOPinMode.DIGITAL_OUTPUT: return self._backend.get_gpio_pin_digital_state(self._identifier) return self._backend.read_gpio_pin_digital_state(self._identifier) @digital_state.setter def digital_state(self, state: bool) -> None: """Set the digital state of the pin.""" self._require_pin_modes([GPIOPinMode.DIGITAL_OUTPUT]) self._backend.write_gpio_pin_digital_state(self._identifier, state) @property def analogue_value(self) -> float: """Get the scaled analogue reading of the pin.""" self._require_pin_modes([GPIOPinMode.ANALOGUE_INPUT]) return self._backend.read_gpio_pin_analogue_value(self._identifier) @analogue_value.setter def analogue_value(self, new_value: float) -> None: """Set the analogue value of the pin.""" self._require_pin_modes([ GPIOPinMode.ANALOGUE_OUTPUT, GPIOPinMode.PWM_OUTPUT, ]) if new_value < 0 or new_value > 1: raise ValueError("An analogue pin value must be between 0 and 1.") if self.mode is GPIOPinMode.ANALOGUE_OUTPUT: self._backend.write_gpio_pin_dac_value( self._identifier, new_value, ) else: # We must be a PWM_OUTPUT self._backend.write_gpio_pin_pwm_value( self._identifier, new_value, ) @property def firmware_modes(self) -> Set[FirmwareMode]: """Get the supported firmware modes.""" return self._firmware_modes @firmware_modes.setter def firmware_modes(self, modes: Set[FirmwareMode]) -> None: """Set the supported firmware modes.""" self.firmware_modes = modes PK!!j5/components/led.py"""Classes for the LED support.""" from abc import abstractmethod from typing import Type from j5.components.component import Component, Interface class LEDInterface(Interface): """An interface containing the methods required to control an LED.""" @abstractmethod def get_led_state(self, identifier: int) -> bool: """Get the state of an LED.""" raise NotImplementedError # pragma: no cover @abstractmethod def set_led_state(self, identifier: int, state: bool) -> None: """Set the state of an LED.""" raise NotImplementedError # pragma: no cover class LED(Component): """A standard Light Emitting Diode.""" def __init__(self, identifier: int, backend: LEDInterface) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[LEDInterface]: """Get the interface class that is required to use this component.""" return LEDInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def state(self) -> bool: """Get the current state of the LED.""" return self._backend.get_led_state(self._identifier) @state.setter def state(self, new_state: bool) -> None: """Set the state of the LED.""" self._backend.set_led_state(self._identifier, new_state) PK!}pj5/components/marker_camera.py"""Classes for Fiducial Marker Camera.""" from abc import abstractmethod from pathlib import Path from typing import Type, Union from j5.components.component import Component, Interface from j5.vision import MarkerList class MarkerCameraInterface(Interface): """An interface containing the methods required for a marker camera.""" @abstractmethod def get_visible_markers(self, identifier: int) -> MarkerList: """Get markers that the camera can see.""" raise NotImplementedError # pragma: nocover @abstractmethod def save_annotated_image(self, file: Path) -> None: """Save an annotated image to a file.""" raise NotImplementedError # pragma: nocover class MarkerCamera(Component): """ Camera that can identify fiducial markers. Additionally, it will do pose estimation, along with some calibration in order to determine the spatial positon and orientation of the markers that it has detected. This component can be used with systems such as ArUco, LibKoki, etc. """ def __init__( self, identifier: int, backend: MarkerCameraInterface, ) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[MarkerCameraInterface]: """Get the interface class that is required to use this component.""" return MarkerCameraInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier def see(self) -> MarkerList: """ Capture an image and identify fiducial markers. Returns a list of markers that it found. """ return self._backend.get_visible_markers(self._identifier) def save(self, path: Union[Path, str]) -> None: """Save an annotated image to a path.""" if isinstance(path, str): path = Path(path) self._backend.save_annotated_image(path) PK!!uuj5/components/motor.py"""Classes for Motor support.""" from abc import abstractmethod from enum import Enum from typing import Type, Union from j5.components.component import Component, Interface class MotorSpecialState(Enum): """An enum of the special states that a motor can be set to.""" COAST = 0 BRAKE = 1 MotorState = Union[float, MotorSpecialState] class MotorInterface(Interface): """An interface containing the methods required to control a motor board.""" @abstractmethod def get_motor_state(self, identifier: int) -> MotorState: """Get the motor state.""" raise NotImplementedError # pragma: no cover @abstractmethod def set_motor_state(self, identifier: int, power: MotorState) -> None: """Set the motor state.""" raise NotImplementedError # pragma: no cover class Motor(Component): """Brushed DC motor output.""" def __init__( self, identifier: int, backend: MotorInterface, ) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[Interface]: """Get the interface class that is required to use this component.""" return MotorInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def power(self) -> MotorState: """Get the current power of this output.""" return self._backend.get_motor_state(self._identifier) @power.setter def power(self, new_power: MotorState) -> None: """Set the current state of this output.""" if isinstance(new_power, float): if new_power < -1 or new_power > 1: raise ValueError("Motor power must be between 1 and -1.") self._backend.set_motor_state(self._identifier, new_power) PK!|{ { j5/components/piezo.py"""Classes for Piezo support.""" from abc import abstractmethod from datetime import timedelta from enum import Enum from typing import Generator, Type, Union from j5.components.component import Component, Interface class Note(float, Enum): """An enumeration of notes. An enumeration of notes from scientific pitch notation and their related frequencies in Hz. """ C6 = 1047.0 D6 = 1174.7 E6 = 1318.5 F6 = 1396.9 G6 = 1568.0 A6 = 1760.0 B6 = 1975.5 C7 = 2093.0 D7 = 2349.3 E7 = 2637.0 F7 = 2793.8 G7 = 3136.0 A7 = 3520.0 B7 = 3951.1 C8 = 4186.0 def __reverse__(self) -> Generator['Note', None, None]: # Type is ignored because of an open bug within mypy # https://github.com/python/typeshed/issues/1590 # https://github.com/python/typeshed/issues/1595 yield from reversed(self.__members__.items()) # type: ignore Pitch = Union[int, float, Note] class PiezoInterface(Interface): """An interface containing the methods required to control an piezo.""" @abstractmethod def buzz(self, identifier: int, duration: timedelta, frequency: float) -> None: """Queue a pitch to be played.""" raise NotImplementedError # pragma: no cover class Piezo(Component): """A standard piezo.""" def __init__(self, identifier: int, backend: PiezoInterface) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[PiezoInterface]: """Get the interface class that is required to use this component.""" return PiezoInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier def buzz(self, duration: Union[int, float, timedelta], pitch: Pitch) -> None: """ Queue a note to be played. Float and integer durations are measured in seconds. """ if isinstance(duration, float) or isinstance(duration, int): duration = timedelta(seconds=duration) if type(pitch) is int: pitch = float(pitch) self.verify_pitch(pitch) self.verify_duration(duration) self._backend.buzz(self._identifier, duration, pitch) @staticmethod def verify_pitch(pitch: Pitch) -> None: """Verify that a pitch is valid.""" # Verify that the type is correct. pitch_is_float = type(pitch) is float pitch_is_note = type(pitch) is Note if not (pitch_is_float or pitch_is_note): raise TypeError("Pitch must be float or Note") # Verify the length of the pitch is non-zero if pitch < 0: raise ValueError("Frequency must be greater than zero") @staticmethod def verify_duration(duration: timedelta) -> None: """Verify that a duration is valid.""" if not isinstance(duration, timedelta): raise TypeError("Duration must be of type datetime.timedelta") if duration < timedelta(seconds=0): raise ValueError("Duration must be greater than or equal to zero.") PK!A2g g j5/components/power_output.py"""Classes for supporting toggleable power output channels.""" from abc import abstractmethod from typing import Iterator, Mapping, Type, TypeVar from j5.components.component import Component, Interface class PowerOutputInterface(Interface): """An interface containing the methods required to control a power output channel.""" @abstractmethod def get_power_output_enabled(self, identifier: int) -> bool: """Get whether a power output is enabled.""" raise NotImplementedError # pragma: no cover @abstractmethod def set_power_output_enabled( self, identifier: int, enabled: bool, ) -> None: """Set whether a power output is enabled.""" raise NotImplementedError # pragma: no cover @abstractmethod def get_power_output_current(self, identifier: int) -> float: """Get the current being drawn on a power output, in amperes.""" raise NotImplementedError # pragma: no cover class PowerOutput(Component): """ A power output channel. It can be enabled/disabled, and the current being drawn on this channel can be measured. """ def __init__( self, identifier: int, backend: PowerOutputInterface, ) -> None: self._identifier = identifier self._backend = backend @staticmethod def interface_class() -> Type[PowerOutputInterface]: """Get the interface class that is required to use this component.""" return PowerOutputInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def is_enabled(self) -> bool: """Get whether the output is enabled.""" return self._backend.get_power_output_enabled(self._identifier) @is_enabled.setter def is_enabled(self, new_state: bool) -> None: """Set whether the output is enabled.""" self._backend.set_power_output_enabled(self._identifier, new_state) @property def current(self) -> float: """Get the current being drawn on this power output, in amperes.""" return self._backend.get_power_output_current(self._identifier) T = TypeVar('T') class PowerOutputGroup: """A group of PowerOutputs.""" def __init__(self, outputs: Mapping[T, PowerOutput]): self._outputs = outputs def power_on(self) -> None: """Enable all outputs in the group.""" for output in self._outputs.values(): output.is_enabled = True def power_off(self) -> None: """Disable all outputs in the group.""" for output in self._outputs.values(): output.is_enabled = False def __getitem__(self, index: T) -> PowerOutput: """Get an output using list notation.""" return self._outputs[index] def __iter__(self) -> Iterator[PowerOutput]: """ Iterate over the outputs in the group. The outputs are in no particular order. """ return iter(self._outputs.values()) def __len__(self) -> int: """Get the number of outputs in the group.""" return len(self._outputs) PK!3/j5/components/servo.py"""Classes for supporting Servomotors.""" from abc import abstractmethod from typing import Type, Union from j5.components.component import Component, Interface # A servo can be powered down by setting its position to None. ServoPosition = Union[float, None] class ServoInterface(Interface): """An interface containing the methods required to control a Servo.""" @abstractmethod def get_servo_position(self, identifier: int) -> ServoPosition: """Get the position of a Servo.""" raise NotImplementedError # pragma: no cover @abstractmethod def set_servo_position( self, identifier: int, position: ServoPosition, ) -> None: """Set the position of a Servo.""" raise NotImplementedError # pragma: no cover class Servo(Component): """A standard servomotor.""" def __init__(self, identifier: int, backend: ServoInterface) -> None: self._backend = backend self._identifier = identifier @staticmethod def interface_class() -> Type[ServoInterface]: """Get the interface class that is required to use this component.""" return ServoInterface @property def identifier(self) -> int: """An integer to identify the component on a board.""" return self._identifier @property def position(self) -> ServoPosition: """Get the current position of the Servo.""" return self._backend.get_servo_position(self._identifier) @position.setter def position(self, new_position: ServoPosition) -> None: """Set the position of the Servo.""" if new_position is not None: if not -1 <= new_position <= 1: raise ValueError self._backend.set_servo_position(self._identifier, new_position) PK!XX j5/py.typed# Marker file for PEP 561. j5 uses inline type hints. See PEP 484 for more information. PK!J<'j5/vision/__init__.py"""Vision Data Constructs.""" from .coordinates import Coordinate from .markers import Marker, MarkerList from .orientation import Orientation __all__ = [ "Coordinate", "Marker", "MarkerList", "Orientation", ] PK![aS S j5/vision/coordinates.py"""Coordinate Classes to represent position in space.""" from math import atan2, cos, isclose, sin, sqrt from typing import NamedTuple, Optional class Coordinate: """A position in space.""" _cart: 'Cartesian' _cyl: Optional['Cylindrical'] = None _sph: Optional['Spherical'] = None def __init__(self, x: float, y: float, z: float) -> None: self._cart = Cartesian(x, y, z) @classmethod def from_cartesian(cls, x: float, y: float, z: float) -> 'Coordinate': """Create a coordinate from a cartesian position.""" return cls(x, y, z) @classmethod def from_spherical(cls, r: float, theta: float, phi: float) -> 'Coordinate': """Create a coordinate from a spherical position.""" return cls( r * sin(phi) * cos(theta), r * sin(phi) * sin(theta), r * cos(phi), ) @classmethod def from_cylindrical(cls, p: float, theta: float, z: float) -> 'Coordinate': """Create a coordinate from a cylindrical position.""" return cls( p * cos(theta), p * sin(theta), z, ) @property def cartesian(self) -> 'Cartesian': """Cartesian representation.""" return self._cart @property def cylindrical(self) -> 'Cylindrical': """Cylindrical representation.""" if self._cyl is None: self._cyl = Cylindrical( p=sqrt(self._cart.x**2 + self._cart.y**2), phi=atan2(self._cart.y, self._cart.x), z=self._cart.z, ) return self._cyl @property def spherical(self) -> 'Spherical': """Spherical representation.""" if self._sph is None: self._sph = Spherical( sqrt(self._cart.x**2 + self._cart.y**2 + self._cart.z**2), atan2(self._cart.y, self._cart.x), atan2( sqrt(self._cart.x**2 + self._cart.y**2), self._cart.z, ), ) return self._sph def __repr__(self) -> str: return f"Coordinate(x={self._cart.x}, y={self._cart.y}, z={self._cart.z})" def isclose(self, other: 'Coordinate', tolerance: float = 1e-09) -> bool: """ Determine if close to another Coordinate. See math.isclose for more information on the behaviour. """ axes = [ isclose(self._cart.x, other._cart.x, rel_tol=tolerance), isclose(self._cart.y, other._cart.y, rel_tol=tolerance), isclose(self._cart.z, other._cart.z, rel_tol=tolerance), ] return all(axes) class Cartesian(NamedTuple): """ Position in the Cartesian 3D plane. x := Displacement from the origin in the x-axis y := Displacement from the origin in the y-axis z := Displacement from the origin in the z-axis """ x: float y: float z: float class Cylindrical(NamedTuple): """ Position in a cylindrical polar coordinate system. p := axial distance phi := azimuth angle (radians) z := height """ p: float phi: float z: float class Spherical(NamedTuple): """ A Spherical Coordinate. r := radial distance theta := azimuth angle (radians) phi := polar angle (radians) """ r: float theta: float phi: float PK!?ppj5/vision/markers.py"""Marker Class.""" from math import degrees from typing import List, Optional, Sequence, Tuple, overload from .coordinates import Coordinate from .orientation import Orientation PixelCoordinates = Tuple[float, float] class Marker: """ A fiducial marker. Specifically, this class represents a specific measurement of a marker, and its position, ID, and in the future, orientation in 3D space. """ _id: int _orientation: Optional[Orientation] _pixel_corners: Optional[Sequence[PixelCoordinates]] _pixel_centre: Optional[PixelCoordinates] _position: Coordinate def __init__(self, id: int, position: Coordinate, pixel_corners: Optional[Sequence[PixelCoordinates]] = None, pixel_centre: Optional[PixelCoordinates] = None, *, # We need to move this before pixel_corners. orientation: Optional[Orientation] = None, ): self._id = id self._orientation = orientation self._position = position self._pixel_corners = pixel_corners self._pixel_centre = pixel_centre @property def id(self) -> int: """The id of the marker.""" return self._id @property def bearing(self) -> float: """Bearing to the marker from the origin, in radians.""" return self.position.cylindrical.phi @property def distance(self) -> float: """Distance to the marker from the origin, in metres.""" return self.position.cylindrical.p @property def orientation(self) -> Optional[Orientation]: """Orientation of the Marker.""" return self._orientation @property def position(self) -> Coordinate: """Position of the marker.""" return self._position @property def pixel_corners(self) -> Optional[Sequence[PixelCoordinates]]: """ Pixel positions of the marker corners within the image. Specified in clockwise order, starting from the top left corner of the marker. Pixels are counted from the origin of the image, which conventionally is in the top left corner of the image. This is made available so that if users want to use their own pose estimation algorithms, they can! """ return self._pixel_corners @property def pixel_centre(self) -> Optional[PixelCoordinates]: """ Pixel positions of the centre of the marker within the image. Pixels are counted from the origin of the image, which conventionally is in the top left corner of the image. This is made available so that if users want to use their own pose estimation algorithms, they can! """ return self._pixel_centre def __str__(self) -> str: return "".format( self.id, abs(degrees(self.bearing)), "right" if self.bearing > 0 else "left", self.distance, ) class MarkerList(List[Marker]): """ A ``list`` class with nicer error messages. In particular, this class provides a slightly better error description when accessing indexes and the list is empty. This is to mitigate a common beginners issue where a list is indexed without checking that the list has any items. """ @overload def __getitem__(self, index: int) -> Marker: ... # pragma: nocover @overload # noqa: F811 (deliberate method replacement) def __getitem__(self, index: slice) -> List[Marker]: ... # pragma: nocover def __getitem__(self, index): # type:ignore # noqa: F811 try: return super().__getitem__(index) except IndexError: if not self: raise IndexError("Trying to index an empty list") from None else: raise PK!ffF$ $ j5/vision/orientation.py"""Orientation classes to represent rotations in space.""" from typing import List, Tuple from pyquaternion import Quaternion class Orientation: """ Represents an orientation in 3D space. Uses a unit quaternion as an internal representation. """ def __init__(self, orientation: Quaternion): self._quaternion = orientation @classmethod def from_cartesian(cls, x: float, y: float, z: float) -> 'Orientation': """Create a coordinate from a cartesian position.""" q = Quaternion(axis=(1, 0, 0), angle=x) q = q.rotate(Quaternion(axis=(0, 1, 0), angle=y)) q = q.rotate(Quaternion(axis=(0, 0, 1), angle=z)) return cls(q) @property def matrix(self) -> List[List[float]]: """Get a 3x3 rotation matrix representing the 3D rotation.""" return self._quaternion.rotation_matrix @property def yaw_pitch_roll(self) -> Tuple[float, float, float]: """ Get the equivalent yaw-pitch-roll angles in radians. Specifically, intrinsic Tait-Bryan angles following the z-y'-x'' convention. See pyquaternion for details. """ return self._quaternion.yaw_pitch_roll @property def yaw(self) -> float: """Rotation angle around the z-axis in radians.""" return self.yaw_pitch_roll[0] @property def pitch(self) -> float: """Rotation angle around the y'-axis in radians.""" return self.yaw_pitch_roll[1] @property def roll(self) -> float: """Rotation angle around the x''-axis in radians.""" return self.yaw_pitch_roll[2] @property def rot_x(self) -> float: """Returns the rotation around the x axis in radians.""" return self.roll @property def rot_y(self) -> float: """Returns the rotation around the y axis in radians.""" return self.pitch @property def rot_z(self) -> float: """Returns the rotation around the z axis in radians.""" return self.yaw @property def quaternion(self) -> Quaternion: """The quaternion representing the underlying rotation.""" return self._quaternion def __repr__(self) -> str: """ A string representation. Note that the actual parameters used to construct this are not used, because this is likely to confuse students. """ return f"Orientation(" \ f"x_radians={self.rot_x}, " \ f"y_radians={self.rot_y}, " \ f"z_radians={self.rot_z}" \ f")" PK!QW00j5-0.7.7.dist-info/LICENSEMIT License Copyright (c) 2019 j5 contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTUj5-0.7.7.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HAj5-0.7.7.dist-info/METADATAXmSH_1٭*,Z_8᫐P\>P)<Όh>=#g~z9I'{!cu|}% I~:oISҴCqTj8%jk\mIM7Rej|5n QI㴱RCaNWL9 (Cw x~;=9x{Q!U3ET躤ʉb8G'Ίz.wl;Xli2gum2O5TIkWjOv̯΍,Kv A%l#Qxqk򋋬I]zgn) tYK42W:k ښA e.T ^|sFş@}{ n(F:2މNA|8ϭ-% <=+^0jl, bAXv@{z>|tp|Nܜ2X]Uv+7o8%e'Us}zءpGW>fzYEܶݫ2iA׽U?IcYto:1Bpn gD Ⱏ,'?9L<9L%:<:;DOH? Fď1]IJ&PP~SOFÎN6ׁ"ywħ-4֧ʇq\cf1-mGj2,Rcت2\Qy8aQĨ|rCTmqoVH!V7&%;'ߋ+&0RQS=/ Qc Sm4/FK,̥ʢEd7rGHb+IUZ%px0%d^Ye:"baZ,iv؉"EF~$\HO` h+Xua%fKKђcªR/jkrHV_H"PM!lS׌.05t\-n|FTij}hx m,nH|I¯FJt,yҨ"ct}w"b,b x"!> t\&(I"/D2W)3NkόhftUTxEro?2qbػpojp(Œ2  F_GggT$WjWrUUTLM$I/ŁPKڂ,䔼L)D?ɉ5-wԕɲ.?~w)}'us[q3IJG(ɵx˃CV3_a7ʐwb9גg"y5(=q)YKY,κ:ILvI9 \wк]C\v"2WWwL0hfte>E@,9 }t3*(c7^gx0vg}Q>h=|śFBtw& Y8W|]%>2tJk=KtfYI:xIضz!ҹ*tUbR`f_PK!Hw j5-0.7.7.dist-info/RECORD}ɖY=*)=rkfg9a9vq'.Yv}Ӭ> "'X0}-EXan-``yE܉ *:cGů‧wd *̻9z+5wx1S!kvX.\M, . .u]( &%Xq~)J 4X% vAo}.X}MCU{s>Kb*|x,j]5;Nj$)%[wL$tf26.'wB>ϗdC>U;H# x"ld"Ey=o:׼j6|r<ebپk]:֡lW~)sfp[~KL,8BRΏ>6 WF!-qɯr֐}XDŰ ML9UTZ"5c=qطa['ˈEdY%3?Q{mSw`֟j#'<F0$0!a<ĥi3'tR3A)MGaušÚ%E$ ƐG`$#ϩbt^1 {{vsOfRGtk4Mz:viё*;B_ vVfCKNFKҪԲ|^${7c$.)}F4v ӟpMNRмOLtCͤn%͙f|HcݓZ#2BYѡk E?w 7i3/kom%zoSZ&OTc~װ4I"fy-tDʻnĽpnW34 $LE_Fm\9KXd1%~va0E I;GIg-B'&u@`y|(I`e8$U8FΘ'^jAj5im֞v]?5 5E\y5ƪvRitZu[GG࿨u V^0\ڜ)pA=Ye Lkح{# P[gsdbڪҥwĝ.Ok(J;wpexC+@(gT$͊,U!C$@[mc4N4/LS4?PK! j5/__init__.pyPK!wdQj5/backends/__init__.pyPK!Ƥcj5/backends/backend.pyPK!OPD)j5/backends/console/__init__.pyPK!gc??j5/backends/console/env.pyPK!iJ;AA"d"j5/backends/console/sb/__init__.pyPK!J!"j5/backends/console/sb/arduino.pyPK!ۭVGG"<j5/backends/console/sr/__init__.pyPK!F%/=j5/backends/console/sr/v4/__init__.pyPK!(L>j5/backends/console/sr/v4/motor_board.pyPK! %%(vGj5/backends/console/sr/v4/power_board.pyPK!9u(]j5/backends/console/sr/v4/servo_board.pyPK!Zd66&fj5/backends/console/zoloto/__init__.pyPK!c *[gj5/backends/console/zoloto/camera_board.pyPK!+> vuj5/backends/hardware/__init__.pyPK!Fqvj5/backends/hardware/env.pyPK!q>>#wj5/backends/hardware/j5/__init__.pyPK!SsJJ"xj5/backends/hardware/j5/raw_usb.pyPK!-!j5/backends/hardware/j5/serial.pyPK!h|BB#٘j5/backends/hardware/sb/__init__.pyPK!33"\j5/backends/hardware/sb/arduino.pyPK!AHH#}j5/backends/hardware/sr/__init__.pyPK!9a jvv&j5/backends/hardware/sr/v4/__init__.pyPK!)K)j5/backends/hardware/sr/v4/motor_board.pyPK!yy)j5/backends/hardware/sr/v4/power_board.pyPK!dX X )j5/backends/hardware/sr/v4/servo_board.pyPK!¶'z j5/backends/hardware/zoloto/__init__.pyPK!`~ +\j5/backends/hardware/zoloto/camera_board.pyPK!U+kj5/base_robot.pyPK! [!j5/boards/__init__.pyPK!A]"j5/boards/board.pyPK!Q"K@:j5/boards/sb/__init__.pyPK!5w:j5/boards/sb/arduino.pyPK!p''Ij5/boards/sr/__init__.pyPK!c-;FFXJj5/boards/sr/v4/__init__.pyPK!EmwKj5/boards/sr/v4/motor_board.pyPK!c Qj5/boards/sr/v4/power_board.pyPK!r_j5/boards/sr/v4/servo_board.pyPK!=ej5/boards/zoloto/__init__.pyPK!L7 fj5/boards/zoloto/camera_board.pyPK!0 kj5/components/__init__.pyPK!<?TTpj5/components/battery_sensor.pyPK!&wj5/components/button.pyPK!-0Oz}j5/components/component.pyPK!1O!mj5/components/derived/__init__.pyPK!yUdo o #Tj5/components/derived/ultrasound.pyPK!I@66j5/components/gpio_pin.pyPK!!qj5/components/led.pyPK!}pRj5/components/marker_camera.pyPK!!uuj5/components/motor.pyPK!|{ { (j5/components/piezo.pyPK!A2g g j5/components/power_output.pyPK!3/yj5/components/servo.pyPK!XX j5/py.typedPK!J<'Hj5/vision/__init__.pyPK![aS S _j5/vision/coordinates.pyPK!?ppj5/vision/markers.pyPK!ffF$ $ j5/vision/orientation.pyPK!QW00 j5-0.7.7.dist-info/LICENSEPK!HnHTULj5-0.7.7.dist-info/WHEELPK!HAj5-0.7.7.dist-info/METADATAPK!Hw j5-0.7.7.dist-info/RECORDPK>>6%