PK!!gampdup/__init__.py# pylint: skip-file from .idle_client import IdleMPDClient from .mpd_client import MPDClient from .types import * from .errors import * __all__ = [ 'IdleMPDClient', 'MPDClient', *errors.__all__, *types.__all__, ] PK!.zzzampdup/base_client.pyfrom typing import AsyncGenerator, List, Optional, Union, Tuple from dataclasses import dataclass, field from asyncio import ( # pylint:disable=unused-import create_task, get_running_loop, CancelledError, Future, Queue, Task ) from .connection import Connection from .errors import CommandError, ConnectionFailedError from .parsing import parse_error from .util import asynccontextmanager @dataclass class MPDConnection: pending_commands: 'Queue[Future[List[str]]]' = field(default_factory=Queue) connection: Optional[Connection] = None loop: Optional[Task] = None async def connect(self, address: str, port: int): self.connection = Connection() await self.connection.connect(address, port) self._start_loop() async def disconnect(self): if self.connection is not None: await self.connection.close() if self.loop is not None: self.loop.cancel() self.loop = None self.connection = None async def run_command(self, command: str) -> List[str]: if self.connection is None: raise ConnectionFailedError() p: 'Future[List[str]]' = get_running_loop().create_future() await self.pending_commands.put(p) await self.connection.write_line(command) result = await p return result async def _read_response(self) -> Union[List[str], CommandError]: if self.connection is None: raise ConnectionFailedError() lines: List[str] = [] while True: line = await self.connection.read_line() if line.startswith('OK'): break if line.startswith('ACK'): return parse_error(line, lines) lines.append(line) return lines def _start_loop(self): self.loop = create_task(self._reply_loop()) async def _reply_loop(self): try: while True: pending = await self.pending_commands.get() try: reply = await self._read_response() except CancelledError: pending.set_exception(ConnectionFailedError()) raise except Exception as e: # pylint: disable=broad-except pending.set_exception(e) else: pending.set_result(reply) self.pending_commands.task_done() except CancelledError: return @dataclass class BaseMPDClient: connection: Optional[MPDConnection] = None details: Optional[Tuple[str, int]] = None @classmethod @asynccontextmanager async def make(cls, address: str, port: int) -> AsyncGenerator: c = cls() await c.connect(address, port) yield c await c.disconnect() async def connect(self, address: str, port: int): self.connection = MPDConnection() await self.connection.connect(address, port) self.details = (address, port) async def disconnect(self): if self.connection is not None: await self.connection.disconnect() async def reconnect(self): if self.connection is None or self.details is None: raise ConnectionFailedError( 'Cannot reconnect if not previously connected.' ) await self.disconnect() await self.connect(*self.details) async def run_command(self, command: str) -> List[str]: if self.connection is None: raise ConnectionFailedError( 'Connection is not established.' ) return await self.connection.run_command(command) PK!Iampdup/connection.pyfrom asyncio import open_connection, StreamReader, StreamWriter from typing import Optional, NamedTuple from dataclasses import dataclass from .errors import ConnectionFailedError class Socket(NamedTuple): reader: StreamReader writer: StreamWriter async def write(self, data: bytes): self.writer.write(data) try: await self.writer.drain() except Exception as e: raise ConnectionFailedError() from e async def readline(self) -> bytes: x = await self.reader.readline() if not x.endswith(b'\n'): raise ConnectionFailedError('Connection aborted while reading.') return x async def close(self): self.writer.close() await self.writer.wait_closed() @dataclass class Connection: connection: Optional[Socket] = None async def connect(self, address: str, port: int): try: self.connection = Socket(*await open_connection(address, port)) except OSError as e: raise ConnectionFailedError('Could not connect to MPD') from e result = await self.read_line() if not result.startswith('OK MPD'): raise ConnectionFailedError('Got wrong response from MPD.') async def write_line(self, command: str): if self.connection is not None: await self.connection.write(command.encode() + b'\n') return raise ConnectionFailedError('Not connected.') async def read_line(self) -> str: if self.connection is not None: line = await self.connection.readline() return line.decode().strip('\n') raise ConnectionFailedError('Not connected.') async def close(self): if self.connection is not None: await self.connection.close() PK!)Rv v ampdup/errors.py'''Classes for raising when errors happen.''' from enum import Enum from typing import Callable, Dict, List from dataclasses import dataclass __all__ = [ 'MPDError', 'ConnectionFailedError', 'ClientTypeError', 'NoCurrentSongError', 'CommandError', 'URINotFoundError', ] class ErrorCode(Enum): '''MPD Error codes included in ACK responses.''' NOT_LIST = 1 ARG = 2 PASSWORD = 3 PERMISSION = 4 UNKNOWN = 5 NO_EXIST = 50 PLAYLIST_MAX = 51 SYSTEM = 52 PLAYLIST_LOAD = 53 UPDATE_ALREADY = 54 PLAYER_SYNC = 55 EXIST = 56 class MPDError(Exception): '''Base class for errors raised by this library.''' class ConnectionFailedError(MPDError): '''Caused when a client fails to connect.''' class ClientTypeError(MPDError): '''Caused when trying to use the wrong type of client for an operation. Example: using the idle client for playback control and vice-versa. ''' class NoCurrentSongError(MPDError): '''Caused where there is no current song playing and one is expected.''' @dataclass class CommandError(MPDError): '''Wraps an error from MPD (an ACK response with error code and reason). Members: code: The MPD error code. line: Which line in a command list caused the error. command: The command that caused the error. message: The error message provided by MPD, unchanged. partial: The (maybe empty) list of lines representing a partial response. ''' code: ErrorCode line: int command: str message: str partial: List[str] def __post_init__(self): codetext = f'{self.code.name}/{self.code.value}' super().__init__( f'[{codetext}@{self.line}] {{{self.command}}} {self.message}' ) class URINotFoundError(CommandError): '''Wraps an error 50 from MPD.''' def __init__(self, *args): super().__init__(ErrorCode.NO_EXIST, *args) ErrorFactory = Callable[[int, str, str, List[str]], CommandError] ERRORS: Dict[ErrorCode, ErrorFactory] = { ErrorCode.NO_EXIST: URINotFoundError, } def get_error_constructor(error_code: ErrorCode) -> ErrorFactory: '''Get the error constructor for an error code, or a generic one. If the error code is not mapped to an exception type, a factory function for CommandError with the correct code is returned. Args: error_code: The error code from MPD. Returns: A function that constructs the correct exception with the remaining arguments. ''' return (ERRORS.get(error_code) or (lambda *args: CommandError(error_code, *args))) PK!xiampdup/idle_client.py'''Idle client module.''' from typing import List from .base_client import BaseMPDClient from .errors import ClientTypeError from .parsing import split_item from .types import Subsystem from .util import has_any_prefix class IdleMPDClient(BaseMPDClient): '''Client that is only capable of running the idle command. More information in the idle() method docstring. ''' async def run_command(self, command: str): if not has_any_prefix(command, ('idle', 'noidle')): raise ClientTypeError( 'Use an MPDClient to run commands other than idle and noidle.' ) return await super().run_command(command) async def idle(self, *subsystems: Subsystem) -> List[Subsystem]: '''Run the idle command, fetching events from the player. Args: *subsystems (Subsystem): Subsystems to listen to (variadic). If empty or omitted listens to all systems. Returns: List[Subsystem]: subsystems that changed since the command was called. ''' subsystem_names = [s.value for s in subsystems] command = ' '.join(['idle', *subsystem_names]) changed = (split_item(i) for i in await self.run_command(command)) return [Subsystem(s) for _, s in changed] async def noidle(self): '''Cancel the current idle command.''' return await self.connection.connection.write_line('noidle') PK!BBampdup/mpd_client.py'''MPD Client module.''' from typing import List, Tuple, Union, Optional from .base_client import BaseMPDClient from .errors import ClientTypeError, NoCurrentSongError from .parsing import from_lines, parse_playlist, parse_single from .util import has_any_prefix from .types import ( Song, SongId, Stats, Status, Single, Tag, SearchType, TimeRange ) Range = Tuple[int, int] PositionOrRange = Union[int, Range] AnySearchType = Union[Tag, SearchType] def position_or_range_arg(arg: Optional[PositionOrRange]) -> str: '''Make argument string for commands that may take a position or a range. Args: arg: Either a position as an int or a range as a tuple. Returns: The correspondent string to the argument. ''' if arg is None: return '' if isinstance(arg, int): return f' {arg}' start, end = arg return f' {start}:{end}' def find_args( queries: List[Tuple[AnySearchType, str]] ) -> str: '''Make argument string for find and search. Args: queries: A list of queries (type and what). Returns: An argument string. ''' return ' '.join(f'{type.value} "{what}"' for type, what in queries) class MPDClient(BaseMPDClient): '''An async MPD Client object for any operations except idle/noidle.''' async def run_command(self, command: str) -> List[str]: if has_any_prefix(command, ('idle', 'noidle')): raise ClientTypeError('Use an IdleClient to use the idle command.') return await super().run_command(command) # MPD status async def current_song(self) -> Song: '''Displays the song info of the current song. Returns: The metadata of the song that is identified in status. ''' result = await self.run_command('currentsong') if not result: raise NoCurrentSongError('There is no current song playing.') return from_lines(Song, result) async def status(self) -> Status: '''Get the current player status. Returns: The current player status. ''' result = await self.run_command('status') return from_lines(Status, result) async def stats(self) -> Stats: '''Get player stats. Refer to the Stats type to see what is available. Returns: Statistics about the player. ''' result = await self.run_command('stats') return from_lines(Stats, result) # Playback options async def consume(self, state: bool): '''Enable or disable consume. When consume is enabled, played songs are removed from the playlist. Args: state: True for consume, otherwise False. ''' await self.run_command(f'consume {int(state)}') async def single(self, mode: Single): '''Enable, disable or set to oneshot single playback. Args: mode: True for random, False for sequential. ''' await self.run_command(f'single {mode.value}') async def random(self, state: bool): '''Enable or disable random playback. Args: state: True for random, False for sequential. ''' await self.run_command(f'random {int(state)}') async def repeat(self, state: bool): '''Enable or disable repeat. Args: state: True to repeat, False to play once. ''' await self.run_command(f'repeat {int(state)}') async def setvol(self, amount: int): '''Set volume. Args: amount: The new volume, from 0 to 100. ''' await self.run_command(f'setvol {amount}') # Playback control async def next(self): '''Play next song in the playlist.''' await self.run_command(f'next') async def pause(self, pause: bool): '''Pause or resume playback. Args: pause: Whether to pause (`True`) or resume (`False`). ''' await self.run_command(f'pause {int(pause)}') async def play(self, pos: Optional[int] = None): '''Begin playback. If supplied, start at `pos` in the playlist. Args: pos: The position in the playlist where to begin playback. If omitted and playback is paused, resume it. If playback was stopped, start from the beginning. ''' arg = '' if pos is None else f' {pos}' await self.run_command(f'play{arg}') async def play_id(self, song_id: Optional[SongId] = None): '''Begin playback. If supplied, start at the song with id `song_id`. Args: song_id: The id of the song in the playlist where to begin playback. If omitted and playback is paused, resume it. If playback was stopped, start from the beginning. ''' arg = '' if song_id is None else f' {song_id}' await self.run_command(f'playid{arg}') async def previous(self): '''Play previous song in the playlist.''' await self.run_command(f'previous') async def seek(self, pos: int, time: float): '''Seek to a certain time of the `pos` entry in the playlist. Args: pos: The position in the playlist of the song to seek. time: The timestamp to seek to in seconds (fractions allowed). ''' await self.run_command(f'seek {pos} {time}') async def seek_id(self, song_id: SongId, time: float): '''Seek to a certain time of the song with id `song_id` in the playlist. Args: song_id: The id of the song in the playlist to seek. time: The timestamp to seek to in seconds (fractions allowed). ''' await self.run_command(f'seekid {song_id} {time}') async def seek_cur_abs(self, time: float): '''Seek to a certain time of the current song. Args: time: The timestamp to seek to in seconds (fractions allowed). ''' time = time if time >= 0 else 0 await self.run_command(f'seekcur {time}') async def seek_cur_rel(self, time: float): '''Seek to a time relative to the current time in the current song. Args: time: The time delta to apply to the current time (fractions allowed). ''' await self.run_command(f'seekcur {time:+}') async def stop(self): '''Stop playback.''' await self.run_command(f'stop') # Current playlist async def add(self, uri: str): '''Add a directory or a file to the current playlist. Args: uri: The URI of what to add. Directories are added recursively. ''' await self.run_command(f'add "{uri}"') async def add_id(self, song_uri: str, position: int = None) -> SongId: '''Add a directory or a file to the current playlist. Args: song_uri: The URI of the song to add. Must be a single file. position: Position on the playlist where to add. End of playlist if omitted. Returns: The id of the added song. ''' pos = '' if position is None else f' {position}' result = await self.run_command(f'addid "{song_uri}"{pos}') return parse_single(result, SongId) async def clear(self): '''Clears the current playlist.''' await self.run_command('clear') async def delete( self, position_or_range: PositionOrRange, ): '''Delete songs from the playlist. Args: position_or_range: either an integer pointing to a specific position in the playlist or an interval. ''' arg = position_or_range_arg(position_or_range) await self.run_command(f'delete{arg}') async def delete_id(self, song_id: SongId): '''Delete a song by its id. Args id: a song id. ''' await self.run_command(f'deleteid {song_id}') async def move(self, what: PositionOrRange, to: int): '''Move a song or a range to another position in the playlist. Args: what: A position or range of songs to move. to: The position on the current state of the playlist where the songs should be moved to. ''' what_arg = position_or_range_arg(what) await self.run_command(f'move {what_arg} {to}') async def move_id(self, song_id: SongId, to: int): '''Move a song by its id to a position in the playlist. If to is negative, it is relative to the current song in the playlist (if there is one). Args: song_id: An id of a song in the playlist. to: The position on the current state of the playlist where the songs should be moved to. ''' await self.run_command(f'moveid {song_id} {to}') async def playlist_find(self, tag: Tag, needle: str) -> List[Song]: '''Search strictly for `needle` among the `tag` values. Args: tag: Which tag to search for. needle: What to search for. Returns: ''' result = await self.run_command( f'playlistfind {tag.value} "{needle}"' ) return parse_playlist(result) async def playlist_id( self, song_id: Optional[SongId] = None ) -> List[Song]: '''Get information about a particular song in the playlist. Args: song_id: The id of the song to search for. If omitted, returns the whole playlist, like `playlist_info`. Returns: A list of Song objects representing the current playlist. ''' arg = f' {song_id}' if song_id else '' result = await self.run_command(f'playlistid{arg}') return parse_playlist(result) async def playlist_info( self, position_or_range: Optional[PositionOrRange] = None, ) -> List[Song]: '''Get information about every song in the current playlist. Args: position_or_range: either an integer pointing to a specific position in the playlist or an interval. If ommited, returns the whole playlist. Returns: A list of Song objects representing the current playlist. ''' arg = position_or_range_arg(position_or_range) result = await self.run_command(f'playlistinfo{arg}') return parse_playlist(result) async def playlist_search(self, tag: Tag, needle: str) -> List[Song]: '''Search case-insensitively for `needle` among the `tag` values. Args: tag: Which tag to search for. needle: What to search for. Returns: ''' result = await self.run_command( f'playlistsearch {tag.value} "{needle}"' ) return parse_playlist(result) async def prio(self, priority: int, song_range: Range): '''Set a song priority for random playback. Args: priority: A value between 0 and 255. song_range: The range of songs in the playlist to change. ''' start, end = song_range start_arg = '' if start is None else f'{start}' end_arg = '' if end is None else f'{end}' result = await self.run_command( f'prio {priority} {start_arg}:{end_arg}' ) return parse_playlist(result) async def prio_id(self, priority: int, song_id: SongId): '''Set a song priority for random playback. Args: priority: A value between 0 and 255. song_range: The range of songs in the playlist to change. ''' result = await self.run_command(f'prioid {priority} {song_id}') return parse_playlist(result) async def range_id( self, song_id: SongId, time_range: TimeRange = TimeRange((None, None)), ): '''Specify the portion of the song that shall be played. Args: song_id: The id of the target song. time_range: A pair of offsets in seconds (fractions allowed). If omitted, removes any range, ''' start, end = time_range start_arg = '' if start is None else f'{start}' end_arg = '' if end is None else f'{end}' await self.run_command(f'rangeid {song_id} {start_arg}:{end_arg}') async def shuffle(self, shuffle_range: Optional[Range] = None): '''Shuffle the current playlist. Args: shuffle_range: An optional range to shuffle. If ommitted, shuffles the whole playlist. ''' arg = position_or_range_arg(shuffle_range) await self.run_command(f'shuffle{arg}') async def swap(self, s1: int, s2: int): '''Swap two songs. Args: s1: the first song's position. s2: the second song's position. ''' await self.run_command(f'swap {s1} {s2}') async def swap_id(self, s1: SongId, s2: SongId): '''Swap two songs by id. Args: s1: the first song's id. s2: the second song's id. ''' await self.run_command(f'swapid {s1} {s2}') # Music database # async def find( # self, # queries: List[Tuple[AnySearchType, str]] # ) -> List[Song]: # '''Search strictly in the music database. # Args: # Returns: # ''' # result = await self.run_command(f'find {find_args(queries)}') # return parse_playlist(result) async def _find_command( self, command: str, filter_expression: str, sort: Optional[Tag] = None, descending: bool = False ) -> List[str]: descending_text = '-' if descending else '' sort_text = ( f' sort {descending_text}{sort.value}' if sort is not None else '' ) result = await self.run_command( f'{command} "{filter_expression}"{sort_text}' ) return result async def find( self, filter_expression: str, sort: Optional[Tag] = None, descending: bool = False ) -> List[Song]: '''Search strictly in the music database. Args: filter_expression: an expression defining a filter for mpd. See MPD protocol version 0.21 documentation. sort: a Tag by which to sort the result. descending: whether to have results descending or ascending. Returns: The songs found by the find command. ''' return parse_playlist( await self._find_command( 'find', filter_expression, sort, descending ) ) async def find_add( self, filter_expression: str, sort: Optional[Tag] = None, descending: bool = False ) -> List[Song]: '''Directly add the results of a strict search in the music database. Args: filter_expression: an expression defining a filter for mpd. See MPD protocol version 0.21 documentation. sort: a Tag by which to sort the result. descending: whether to have results descending or ascending. ''' return parse_playlist( await self._find_command( 'findadd', filter_expression, sort, descending ) ) async def search( self, queries: List[Tuple[AnySearchType, str]] ) -> List[Song]: '''Search case-insensitively in the music database. Args: Returns: ''' result = await self.run_command(f'search {find_args(queries)}') return parse_playlist(result) async def update(self, uri: str = None) -> int: '''Update the database. Args: uri: An optional URI to specify which file or directory to update. If omitted, everything is updated. Returns: The id of the update job. ''' arg = f' "{uri}"' if uri else '' result = await self.run_command(f'update{arg}') return parse_single(result, int) async def rescan(self, uri: str = None) -> int: '''Update the database rescanning unmodified files as well. Args: uri: An optional URI to specify which file or directory to update. If omitted, everything is updated. Returns: The id of the update job. ''' arg = f' "{uri}"' if uri else '' result = await self.run_command(f'rescan{arg}') return parse_single(result, int) PK!nH  ampdup/parsing.py'''MPD output parsing utilities.''' import re from typing import ( overload, Callable, Iterable, List, Tuple, TypeVar, Type, Union ) from .errors import CommandError, ErrorCode, get_error_constructor from .types import Song from .util import from_json_like, split_on __all__ = [ 'normalize', 'split_item', 'from_lines', 'parse_single', 'parse_playlist', 'parse_error', ] T = TypeVar('T') class IncompatibleErrorMessage(Exception): '''Exception in case MPD sends an error in a different format somehow.''' def normalize(name: str) -> str: '''Normalize a value name to a valid Python (PEP8 compliant) identifier. Args: name: The name of a value returned by MPD. Returns: The normalized name, in all lowercase with - replaced by _. ''' return name.lower().replace('-', '_') def split_item(item: str) -> Tuple[str, str]: '''Split a key/value pair in a string into a tuple (key, value). This also strips space from both sides of either. Args: item: A key/value string in 'key: value' format. Returns: The (key, value) tuple, with both sides stripped. ''' lhs, rhs = item.split(':', maxsplit=1) return lhs.strip(), rhs.strip() def from_lines(cls: Type[T], lines: Iterable[str]) -> T: '''Make a `cls` object from a list of lines in MPD output format.''' values = (split_item(l) for l in lines) normalized = {normalize(k): v for k, v in values} return from_json_like(cls, normalized) def parse_error(error_line: str, partial: List[str]) -> CommandError: '''Parse an error from MPD. Errors are of format `ACK [CODE@LINE] {COMMAND} MESSAGE` Args: error_line: an ACK line from MPD. Returns: A CommandError (or subclass) object with the error data. ''' match = ERROR_RE.match(error_line) if match is None: raise IncompatibleErrorMessage(error_line) code, line, command, message = match.groups() error_code = ErrorCode(int(code)) return get_error_constructor(error_code)(int(line), command, message, partial) @overload def parse_single( lines: Iterable[str] # pylint: disable=unused-argument ) -> str: '''Overload.''' @overload # noqa: F811 def parse_single( # pylint: disable=function-redefined lines: Iterable[str], # pylint: disable=unused-argument cast: Callable[[str], T] # pylint: disable=unused-argument ) -> T: '''Overload.''' def parse_single( # noqa: F811, pylint: disable=function-redefined lines: Iterable[str], cast: Callable[[str], T] = None ) -> Union[str, T]: '''Parse a single return value and discard its name. Args: lines: The return from MPD as a list of a single line. cast: An optional function to read the string into another type. Returns: The value as a string or converted into the chosen type. ''' result, = lines _, value = split_item(result) if cast is None: return value return cast(value) def is_file(line: str) -> bool: '''Check if a return line is a song file.''' return line.startswith('file:') def parse_playlist(lines: Iterable[str]) -> List[Song]: '''Parse playlist information into a list of songs.''' split = split_on(is_file, lines) return [from_lines(Song, song_info) for song_info in split] ERROR_RE = re.compile(r'ACK\s+\[(\d+)@(\d+)\]\s+\{(.*)\}\s+(.*)') PK! L& ampdup/types.py'''Types for MPD information.''' from enum import Enum from typing import NamedTuple, Optional, Tuple, NewType TimeRange = NewType('TimeRange', Tuple[Optional[float], Optional[float]]) class SongId(int): '''Strong alias for song ids.''' class Song(NamedTuple): '''Type representing the static data about a playable song in MPD.''' file: str last_modified: str time: int duration: float artist: str = '' artistsort: str = '' albumartist: str = '' albumartistsort: str = '' title: str = '' album: str = '' albumsort: str = '' genre: str = '' label: str = '' format: Optional[str] = None disc: Optional[int] = None track: Optional[int] = None date: Optional[str] = None range: Optional[TimeRange] = None pos: Optional[int] = None id: Optional[SongId] = None prio: Optional[int] = None class SearchType(Enum): '''Special types for searching the database.''' ANY = 'any' FILE = 'file' BASE = 'base' MODIFIED_SINCE = 'modified-since' class Single(Enum): '''"single" setting state.''' DISABLED = '0' ENABLED = '1' ONESHOT = 'oneshot' class State(Enum): '''Player state.''' PLAY = 'play' STOP = 'stop' PAUSE = 'pause' class Status(NamedTuple): '''Type representing the static data about a playable song in MPD.''' repeat: bool random: bool single: Single consume: bool playlist: int playlistlength: int mixrampdb: float state: State volume: Optional[int] = None song: Optional[int] = None songid: Optional[SongId] = None time: Optional[str] = None elapsed: Optional[float] = None bitrate: Optional[int] = None duration: Optional[float] = None audio: Optional[str] = None nextsong: Optional[int] = None nextsongid: Optional[SongId] = None error: Optional[str] = None mixrampdelay: Optional[int] = None updating_db: Optional[int] = None xfade: Optional[int] = None class Stats(NamedTuple): '''Statistics about the player.''' uptime: int playtime: int artists: int albums: int songs: int db_playtime: int db_update: int class Subsystem(Enum): '''Enumeration of available subsystems for idle to listen to.''' DATABASE = 'database' UPDATE = 'update' STORED_PLAYLIST = 'stored_playlist' PLAYLIST = 'playlist' PLAYER = 'player' MIXER = 'mixer' OUTPUT = 'output' OPTIONS = 'options' PARTITION = 'partition' STICKER = 'sticker' SUBSCRIPTION = 'subscription' MESSAGE = 'message' class Tag(Enum): '''Tags supported by MPD.''' ARTIST = 'artist' ARTISTSORT = 'artistsort' ALBUM = 'album' ALBUMSORT = 'albumsort' ALBUMARTIST = 'albumartist' ALBUMARTISTSORT = 'albumartistsort' TITLE = 'title' TRACK = 'track' NAME = 'name' GENRE = 'genre' DATE = 'date' COMPOSER = 'composer' PERFORMER = 'performer' COMMENT = 'comment' DISC = 'disc' MUSICBRAINZ_ARTISTID = 'musicbrainz_artistid' MUSICBRAINZ_ALBUMID = 'musicbrainz_albumid' MUSICBRAINZ_ALBUMARTISTID = 'musicbrainz_albumartistid' MUSICBRAINZ_TRACKID = 'musicbrainz_trackid' MUSICBRAINZ_RELEASETRACKID = 'musicbrainz_releasetrackid' MUSICBRAINZ_WORKID = 'musicbrainz_workid' __all__ = [ 'SearchType', 'Single', 'Song', 'SongId', 'State', 'Stats', 'Status', 'Subsystem', 'Tag', 'TimeRange', ] PK!Eampdup/typing_inspect.py'''This module mimics some functionality from typing-inspect. Original code can be found at github.com/ilevkivskyi/typing_inspect. Until typing-inspect stops being experimental or gets standardized (one can dream), this simpler version will be used. Notice this only needs to support Python 3.7+. ''' # The original typing-inspect module is published on PyPI under the MIT # license. # There was no proper license notice on github, except for setup.py. # License follows: # # Copyright 2018 Ivan Levkivskyi # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from typing import Callable from typing import ( # type: ignore ClassVar, Generic, Tuple, Type, Union, _GenericAlias ) def is_union_type(tp: Type) -> bool: '''Test if the type is a union type. Examples:: is_union_type(int) == False is_union_type(Union) == True is_union_type(Union[int, int]) == False is_union_type(Union[T, int]) == True ''' return (tp is Union or isinstance(tp, _GenericAlias) and tp.__origin__ is Union) def get_origin(tp): '''Get the unsubscripted version of a type. Supports generic types, Union, Callable, and Tuple. Returns None for unsupported types. Examples:: get_origin(int) == None get_origin(ClassVar[int]) == None get_origin(Generic) == Generic get_origin(Generic[T]) == Generic get_origin(Union[T, int]) == Union get_origin(List[Tuple[T, T]][int]) == list # List prior to Python 3.7 ''' if isinstance(tp, _GenericAlias): return tp.__origin__ if tp.__origin__ is not ClassVar else None if tp is Generic: return Generic return None def get_args(tp: Type) -> Tuple: '''Get type arguments with all substitutions performed. For unions, basic simplifications used by Union constructor are performed. On versions prior to 3.7 if `evaluate` is False (default), report result as nested tuple, this matches the internal representation of types. If `evaluate` is True (or if Python version is 3.7 or greater), then all type parameters are applied (this could be time and memory expensive). Examples:: get_args(int) == () get_args(Union[int, Union[T, int], str][int]) == (int, str) get_args(Union[int, Tuple[T, int]][str]) == (int, (Tuple, str, int)) get_args(Union[int, Tuple[T, int]][str], evaluate=True) == \ (int, Tuple[str, int]) get_args(Dict[int, Tuple[T, T]][Optional[int]], evaluate=True) == \ (int, Tuple[Optional[int], Optional[int]]) get_args(Callable[[], T][int], evaluate=True) == ([], int,) ''' if isinstance(tp, _GenericAlias): res = tp.__args__ if get_origin(tp) is Callable and res[0] is not Ellipsis: res = (list(res[:-1]), res[-1]) return res return () def is_optional_type(tp: Type) -> bool: '''Returns `True` if the type is `type(None)`, or is a direct `Union` to `type(None)`, such as `Optional[T]`. NOTE: this method inspects nested `Union` arguments but not `TypeVar` definitions (`bound`/`constraint`). So it will return `False` if - `tp` is a `TypeVar` bound, or constrained to, an optional type - `tp` is a `Union` to a `TypeVar` bound or constrained to an optional type, - `tp` refers to a *nested* `Union` containing an optional type or one of the above. Users wishing to check for optionality in types relying on type variables might wish to use this method in combination with `get_constraints` and `get_bound` ''' if tp is type(None): # noqa return True if is_union_type(tp): return any(is_optional_type(t) for t in get_args(tp)) return False PK!eZampdup/util.py'''Utility module.''' from enum import Enum, EnumMeta from functools import lru_cache from itertools import groupby from operator import itemgetter from typing import ( Any, Callable, Iterable, List, Sequence, Tuple, Type, TypeVar ) from contextlib import asynccontextmanager from .typing_inspect import get_args, is_optional_type from .types import TimeRange __all__ = [ 'asynccontextmanager', ] class NoCommonTypeError(Exception): '''Happens when values in an Enum are not of the same type.''' class EmptyEnumError(Exception): '''Happens when an enum has no value.''' @lru_cache() def underlying_type(enum_type: EnumMeta) -> Type: '''Get the underlying type of an enum. Returns: The type of every value in the enum if it is the same. Raises: NoCommonTypeError: If the values in the enum are not of the same type. EmptyEnumError: If the enum has no value. ''' try: first: Any = next(iter(enum_type)) except StopIteration: raise EmptyEnumError('No value in enum.') t = type(first.value) if any(not isinstance(v.value, t) for v in enum_type): # type: ignore raise NoCommonTypeError('No common type in enum.') return t def is_namedtuple(cls: Type) -> bool: '''Checks if a type inherits typing.NamedTuple. Checking inspects the type and looks for the `_field_types` attribute. Args: cls: Type to inspect. Returns: Whether or not the type was generated by NamedTuple. ''' return hasattr(cls, '_field_types') def from_list(list_type, v) -> List[Any]: '''Make a list of typed objects from a JSON-like list, based on type hints. Args: list_type: A `List[T]`-like type to instantiate . v: A JSON-like list. Returns: list_type: An object of type `list_type`. ''' inner_type, = list_type.__args__ return [from_json_like(inner_type, value) for value in v] T = TypeVar('T') def from_dict(cls: Type[T], d) -> T: '''Make an object from a dict, recursively, based on type hints. Args: cls: The type to instantiate. d: a dict of property names to values. Returns: cls: An object of type `cls`. ''' # pylint:disable=protected-access if hasattr(cls, '_renames'): for k in d: if k in cls._renames: # type: ignore d[cls._renames[k]] = d.pop(k) # type: ignore return cls(**{i: from_json_like(cls._field_types[i], v) # type: ignore for i, v in d.items()}) def time_range(s: str) -> TimeRange: '''Parse a time range from a song metadata. Args: s: The time range as a string. Returns: A (float, float) tuple with offsets in seconds. ''' start, end = s.split('-') return TimeRange((float(start), float(end))) def from_json_like(cls: Type[T], j) -> T: '''Make an object from a JSON-like value, recursively, based on type hints. Args: cls: The type to instantiate. j: the JSON-like object. Returns: cls: An object of type `cls`. ''' if cls is bool: return cls(int(j)) # type: ignore if is_optional_type(cls): for t in get_args(cls): if t is type(None): # noqa continue try: return from_json_like(t, j) except TypeError: continue raise TypeError(f'{j} cannot be converted into {cls}.') if cls is str: return j if cls is TimeRange: return time_range(j) # type: ignore if any(issubclass(cls, t) for t in (int, float)): return cls(j) # type: ignore if issubclass(cls, Enum): return cls(underlying_type(cls)(j)) # type: ignore if issubclass(cls, List): return from_list(cls, j) # type: ignore if is_namedtuple(cls): return from_dict(cls, j) raise TypeError(f'{j} cannot be converted into {cls}.') def has_any_prefix(s: str, prefixes: Sequence[str]) -> bool: '''Checks if a string has any of the provided prefixes. Args: s: The string to check. prefixes: A sequence of prefixes to check. Returns: Whether the string matches any of the prefixes. ''' return any(s.startswith(prefix) for prefix in prefixes) Predicate = Callable[[T], bool] def enumerate_on(pred: Predicate, iterable: Iterable[T], begin: int = 0) -> Iterable[Tuple[int, T]]: '''Generate enumerated tuples based on sentinel elements in an iterable. A sentinel element is one for which `pred` is `True`. From it onwards the enumeration is incremented. Args: pred: A predicate identifying a sentinel element. iterable: An iterable to enumerate. begin: where to begin the enumeration. Returns: The enumerated iterable of tuples. ''' i = begin it = iter(iterable) try: first = next(it) except StopIteration: return yield i, first for e in it: if pred(e): i += 1 yield i, e def split_on(pred: Predicate, iterable: Iterable[T]) -> Iterable[Iterable[T]]: '''Split an iterable based on sentnel elements. A sentinel element is one for which `pred` is `True`. From it onwards a new split is made. Args: pred: A predicate identifying a sentinel element. iterable: An iterable to enumerate. Returns: An iterable with an iterable for each split. ''' enumerated = enumerate_on(pred, iterable) return ((line for _, line in group) for _, group in groupby(enumerated, key=itemgetter(0))) PK!! !ampdup-0.8.0.dist-info/LICENSE.mdThe MIT License (MIT) ===================== Copyright © `2018` `Tarcísio Eduardo Moreira Crocomo ` Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTUampdup-0.8.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H,^9ampdup-0.8.0.dist-info/METADATAT]o6|طI$ERqIQvjؗEGK$ˏ$8"A;3;+ aOr^x#5_*'/nj dIKS9(u^H=!X߬{ҌN*z$iW;M:$U]e>q5]u#Νax37V4XjY?ɪ7TLj #_]}x}~z3>jig4Yg6ɲkgQ=\Gv$/w}"7 ܐ5^wYt:7Ն=f>9si~bTW!50LuĬaQH@ðo@y\48B8٩hC: ;Pr|}F1CU Oԩս4x_0-$DV3q˲Ct4Zs<ߦ ĈgLTtD5m[%vWy{6Ì% ơW|U]z$g_.MF蝉>hu_mwJf [Ik]rn[tA-~ PK!H"ޡ؛ampdup-0.8.0.dist-info/RECORDuˎP}? \,PPQAQ@`CC;O?&3vLfzS/U^YCz]3֥ł_= g3`ۇIjE`M}x$"7(oȯN ġ]J1%P;{MFkc4GTEAU 'SlR]B|Z;y?GKha$$Wڶj71WzbUZ) @B{u#b ?7*P/0gseŖ6j(#Y}i/Vl@aQ"+dF0ഴF8 ַ #9^Z]7drCB̰kq4q@ds Y;~TdžQ8vxT22F39υ+@[֖OKN289Czm >+ޘdZK4jnF ?,`&u7iu@{c9 Uu=<II2|cۄT@7&scn ˫$?$^ù/LˊJV:%(<,#pn3?5ԔcT$]]xS g>y'C'k"we,vQܫ9_E PK!!gampdup/__init__.pyPK!.zzzampdup/base_client.pyPK!Iampdup/connection.pyPK!)Rv v  ampdup/errors.pyPK!xi!ampdup/idle_client.pyPK!BB'ampdup/mpd_client.pyPK!nH  jampdup/parsing.pyPK! L& +yampdup/types.pyPK!Eampdup/typing_inspect.pyPK!eZۙampdup/util.pyPK!! !ampdup-0.8.0.dist-info/LICENSE.mdPK!HڽTUtampdup-0.8.0.dist-info/WHEELPK!H,^9ampdup-0.8.0.dist-info/METADATAPK!H"ޡ؛]ampdup-0.8.0.dist-info/RECORDPK3