PK!p+PPvserialport/__init__.py#from SerialPort.PortManager import PortHandler, PortsManager, SerialPortId, PortConfiguration #from SerialPort.ConcreteSerial import BaudRate from .objectTransaction import createPortId, transactObjectThroughSerial from .PortManager import SerialPortId from .ListConcreteSerialPorts import serial_ports __version__ = '0.1.0' PK!t:.wyyvserialport/ConcreteSerial.py# Concrete Serial port representation # author: Flavio Vilante from typing import NamedTuple, Any, Dict from enum import Enum import serial from Common.Byte import Byte, Bytes, convertToBytes, fromBytesTobytes from SerialPort.ListConcreteSerialPorts import serial_ports class BaudRate(Enum): # for complete list see: https://pythonhosted.org/pyserial/pyserial_api.html Bps_2400 = 2400 Bps_9600 = 9600 Bps_19200 = 19200 class Parity(Enum): EVEN = serial.PARITY_EVEN NONE = serial.PARITY_NONE ODD = serial.PARITY_ODD DEFAULT = serial.PARITY_NONE class XonXoff(Enum): DEFAULT = 0 # todo: don't use 'default' keyword, it's error prone class RtsCts(Enum): HARDWARE = 1 DEFAULT = 0 PortName = str TimeOut = int #in seconds ByteSize = int #ie: 8 StopBits = int #ie: 1 class PortParameters(NamedTuple): port: PortName baudRate: BaudRate timeout: TimeOut parity: Parity byteSize: ByteSize stopBits: StopBits xonxoff: XonXoff rtscts: RtsCts Size = int # for more see: https://pyserial.readthedocs.io/en/latest/shortintro.html#opening-serial-ports # todo: insert a exception handler where is necessary # todo: there is a bug, can't make (read)timeout=0 [no blocking], if I do, test hang on serial read. # I'll delay solution, in hope reveal new information on use. class ConcreteSerialPort_Impl1: def __init__(self, parameters: PortParameters ) -> None: self._parameters: PortParameters = parameters def initialize(self) -> serial.Serial: ser = self._init() self._configure(ser) self._open(ser) return ser def _getConfiguration(self) -> Dict[str, Any]: params = { 'url' : self._parameters.port, 'baudrate' : self._parameters.baudRate.value, 'timeout' : self._parameters.timeout, 'parity' : self._parameters.parity.value, 'bytesize' : self._parameters.byteSize, 'stopbits' : self._parameters.stopBits, 'xonxoff' : self._parameters.xonxoff.value, 'rtscts' : self._parameters.rtscts.value, 'write_timeout': 1 # todo: eventually make this parameter available for clients of this class } return params def _read(self, s: Size) -> bytes: config = self._getConfiguration() res: bytes = bytes([]) with serial.serial_for_url(**config) as ser: res = ser.read(s) return res def _write(self, b: bytes) -> Size: config = self._getConfiguration() with serial.serial_for_url(**config) as ser: res = ser.write(b) return res def read(self, s: Size) -> Bytes: r = self._read(s) return convertToBytes(r) def write(self, b: Bytes) -> Size: d = fromBytesTobytes(b) return self._write(d) def createConcretePort(portName: PortName, baudRate: BaudRate) -> ConcreteSerialPort_Impl1: params = PortParameters( port= portName, baudRate= baudRate, timeout= 0.5, # timeout = 0: non-blocking mode, return immediately in any case, returning zero or more, up to the requested number of bytes parity= Parity.NONE, byteSize= 8, stopBits= 1, xonxoff= XonXoff.DEFAULT, rtscts= RtsCts.DEFAULT ) return ConcreteSerialPort_Impl1(params) if __name__ == "__main__": print(serial_ports()) #com5 = createConcretePort('COM5', BaudRate.B_2400_Bps) #com4.write(Byte(65)) #print(com5.read(1)) a = "junior".encode() for i in range(10): #print("ooo") #ser = serial.serial_for_url('COM4', baudrate=9600, timeout=0.5) #print("->",ser.is_open()) #with serial.Serial('COM4', baudrate=9600, timeout=0.5) as ser: # print("gravei") # ser.write(bytes(a)) com4 = createConcretePort('COM5', BaudRate.Bps_19200) size = com4.write(convertToBytes(bytes(a))) com5 = createConcretePort('COM4', BaudRate.Bps_19200) data = com5.read(len(a)) print("li") print(data) #print(data) #with serial.Serial('COM5', baudrate=9600, timeout=0.5) as ser: # s = ser.read(len(a)) # print("li") # print(s) passPK!N]X==&vserialport/ListConcreteSerialPorts.py# This code I take originally from: https://stackoverflow.com/questions/12090503/listing-available-com-ports-with-python # It includes this msg: # Successfully tested on Windows 8.1 x64, Windows 10 x64, Mac OS X 10.9.x / 10.10.x / 10.11.x and # Ubuntu 14.04 / 14.10 / 15.04 / 15.10 with both Python 2 and Python 3. import sys import glob import serial def serial_ports(): """ Lists serial port names :raises EnvironmentError: On unsupported or unknown platforms :returns: A list of the serial ports available on the system """ if sys.platform.startswith('win'): ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): # this excludes your current terminal "/dev/tty" ports = glob.glob('/dev/tty[A-Za-z]*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.*') else: raise EnvironmentError('Unsupported platform') result = [] for port in ports: try: s = serial.Serial(port) s.close() result.append(port) except (OSError, serial.SerialException): pass return result if __name__ == '__main__': print(serial_ports())PK!}Lf vserialport/objectTransaction.py# Transact a unique object through serial. # Algorithm: Send the object, wait, and take the returned object from SerialPort.PortManager import PortsManager, SerialPortId, PortConfiguration from SerialPort.ConcreteSerial import BaudRate from Common.Byte import Byte, Bytes, convertToBytes, fromBytesTobytes from time import sleep # portId helper factory def createPortId(portName:str, baud_rate:str) -> SerialPortId: #todo: check if portName is valid # set baud rate if baud_rate == 9600 : rate = PortConfiguration(BaudRate.Bps_9600) elif baud_rate == 2400: rate = PortConfiguration(BaudRate.Bps_2600) else: raise("PortId esta send criado com baud_rate nao implementado") return SerialPortId(portName, rate) def transactObjectThroughSerial(portId: SerialPortId, dataObj: bytes) -> bytes: # openPhisicalPort manager = PortsManager() port, error1 = manager.requestPortHandler(portId) print(port, error1) if (error1 == None): # Send d = convertToBytes(dataObj) print("Enviando: ", d) s = port.write(d) # Wait sleep(0.150) # read # Create Port Spec portId2 = createPortId('COM4', 9600) port2, error2 = manager.requestPortHandler(portId2) d = port2.read(s) print(d) data = fromBytesTobytes(d) return data else: raise ("Erro na transferencia do pacote pela porta fisica") PK! vserialport/PortManager.pyfrom typing import NamedTuple, List, Union, Callable, Optional, Tuple, cast, Any from SerialPort.ListConcreteSerialPorts import serial_ports from SerialPort.ConcreteSerial import BaudRate from SerialPort.SerialPort_ import SerialPort from Common.Byte import Byte, Bytes from enum import Enum from time import sleep Milliseconds = int UUID = int # Universal Unique Identifier / A ticket number (hash_code) that client will use to request response to server Hash = int # ============ # data model # ============ class PortConfiguration(NamedTuple): baudRate: BaudRate readtTimerIn: Milliseconds = 1200 # don't wait response after this time inferiorWaitTime: Milliseconds = 100 # dont' read before this time PortName = str class SerialPortId(NamedTuple): name: PortName config: PortConfiguration class PortManagerInfo(NamedTuple): listOfPorts: List[PortName] # ============ # functions # ============ Size = int # Port Manager Owns this object # Clients request him this obj to make his sync communication, when he finish they call close: # the spefic protocol to this class is not defined yet. class PortHandler: def __init__(self, s: SerialPortId, m: 'PortsManager'): self._portId: SerialPortId = s self._serialPort: SerialPort = m def read(self, s: Size) -> Bytes: return self._serialPort.read(s) def write(self, b: Bytes) -> Size: return self._serialPort.write(b) Error = Optional[str] # Basicly receive serial messages, and direct these messages to de respective ports, while wait for async_j response. class PortsManager: def __init__(self): self._listOfPortsInUse: List[SerialPortId] = [] def requestPortHandler(self, s: SerialPortId) -> (PortHandler, Error): error: Optional[str] = None handler: PortHandler = None if s.name not in serial_ports(): error = "This port is not available by the operational system." elif s in self._listOfPortsInUse: error = "Port already in use by other routine. Wait until it become available again" else: self._listOfPortsInUse.append(s) ser = SerialPort(s.name, s.config.baudRate) handler = PortHandler(s, ser) return handler, error def update(self): pass def info(self) -> PortManagerInfo: info = PortManagerInfo( listOfPorts=serial_ports() ) return info if __name__=="__main__": portId1 = SerialPortId('COM5', PortConfiguration(BaudRate.Bps_9600)) portId2 = SerialPortId('COM4', PortConfiguration(BaudRate.Bps_9600)) manager = PortsManager() p1, error1 = manager.requestPortHandler(portId1) p2, error2 = manager.requestPortHandler(portId2) print(error1) print(error2) data = [Byte(65),Byte(67),Byte(68)] p1.write(data) print(p2.read(3)) p2.write(data) print(p1.read(2)) print(p1.read(1)) print(p1.read(1))PK!l0ggvserialport/SerialPort_.py#A serial port object from typing import NamedTuple, List, Callable from time import sleep from Common.Byte import Byte, Bytes from SerialPort.ConcreteSerial import createConcretePort, BaudRate # ===================================== class SerialPortInfo(NamedTuple): #initialization time #first transmission type #number of bytes sent #number of bytes read #reconfigurations history... #etc... pass Size = int SerialWriteFunction = Callable[[Bytes], Size] SerialReadFunction = Callable[[Size], Bytes] SerialPortOSName = str #Operational System port Name # Mid-level port representation # Responsible to send and receive Bytes to the concrete serial driver class SerialPort: def __init__(self, p: SerialPortOSName, b: BaudRate): self.osName = p self.baudRate = b self._info = SerialPortInfo() def reconfigure(self, p: SerialPortOSName, b: BaudRate): self.osName = p self.baudRate = b def read(self, s: Size) -> Bytes: port = createConcretePort(self.osName, self.baudRate) return port.read(s) def write(self, b: Bytes) -> Size: port = createConcretePort(self.osName, self.baudRate) return port.write(b) def info(self) -> SerialPortInfo: return self._info if __name__ == "__main__": for i in range(5): port1 = SerialPort("COM5", BaudRate.Bps_9600) port1.write([Byte(65),Byte(67)]) sleep(0.1) port2 = SerialPort("COM4", BaudRate.Bps_2400) data = port2.read(2) print(data) PK!HnHTU!vserialport-0.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H>y$vserialport-0.1.0.dist-info/METADATAj@)bai"B5zHu`: Rz _S3V;1׵jГ!0S䉮D}ޣ\ l=A8#hcA d9N-zNhKyo>ZqK!/}>F1p;v2<3p~p#m= js)2t<ճv0%>0D+= + /{~VݜxTPK!Hy"vserialport-0.1.0.dist-info/RECORD}ɒP}} X"Z0(2Æx"2?fn6'ṉK`gfUևYzuv^7x! A*Zy. Bۏq uO?*x;VDJ|qvߔ;'(xnX p BQ ׸[zcd %2Et)0aȵB$m$DAՁjOXN䡱vKA{ UT!l ix@2, A.QuH)0KcAm۽_Fq}M1>^|u>xAEJ%5b|mWhVOw/Iϰ<9Wk-radN3Qvgs'Bv&YY_:25sV,8Γe 0| ڐA?Ez0?چ$ A>~PK!p+PPvserialport/__init__.pyPK!t:.wyyvserialport/ConcreteSerial.pyPK!N]X==&9vserialport/ListConcreteSerialPorts.pyPK!}Lf vserialport/objectTransaction.pyPK! vserialport/PortManager.pyPK!l0gg"+vserialport/SerialPort_.pyPK!HnHTU!1vserialport-0.1.0.dist-info/WHEELPK!H>y$T2vserialport-0.1.0.dist-info/METADATAPK!Hy"k3vserialport-0.1.0.dist-info/RECORDPK r5