PK!K!NNaudio_metadata/__about__.py__all__ = [ '__author__', '__author_email__', '__copyright__', '__license__', '__summary__', '__title__', '__url__', '__version__', '__version_info__' ] __title__ = 'audio-metadata' __summary__ = 'A library for reading and, in the future, writing metadata from audio files.' __url__ = 'https://github.com/thebigmunch/audio-metadata' __version__ = '0.2.0' __version_info__ = tuple(int(i) for i in __version__.split('.') if i.isdigit()) __author__ = 'thebigmunch' __author_email__ = 'mail@thebigmunch.me' __license__ = 'MIT' __copyright__ = f'2018 {__author__} <{__author_email__}>' PK!Li_audio_metadata/__init__.pyfrom .api import * from .exceptions import * from .formats import * __all__ = [ *api.__all__, *exceptions.__all__, *formats.__all__ ] PK!)  audio_metadata/api.py__all__ = [ 'determine_format', 'load', 'loads' ] import os from .exceptions import UnsupportedFormat from .formats import FLAC, MP3, WAV from .utils import DataReader def determine_format(data, extension=None): """Determine the format of an audio file. Parameters: data (bytes-like object, str, os.PathLike, or file-like object): A bytes-like object, filepath, path-like object or file-like object of an audio file. extension (str): The file extension of the file. Used as a tie-breaker for formats that can be used in multiple containers (e.g. ID3). """ if isinstance(data, (os.PathLike, str)): data = open(data, 'rb') data_reader = DataReader(data) data_reader.seek(0, os.SEEK_SET) d = data_reader.read(4) if d.startswith((b'ID3', b'\xFF\xFB')): # TODO: Catch all MP3 possibilities. if extension is None or extension.endswith('.mp3'): return MP3 if d.startswith((b'fLaC', b'ID3')): if extension is None or extension.endswith('.flac'): return FLAC if d.startswith(b'RIFF'): if extension is None or extension.endswith('.wav'): return WAV return None def load(f): """Load audio metadata from filepath or file-like object. Parameters: f (str, os.PathLike, or file-like object): A filepath, path-like object or file-like object of an audio file. Returns: Format: An audio format object. Raises: UnsupportedFormat: If file is not of a supported format. ValueError: If filepath/file-like object is not valid nor readable. """ if isinstance(f, (os.PathLike, str)): fileobj = open(f, 'rb') else: try: f.read(0) except AttributeError: raise ValueError("Not a valid file-like object.") except Exception: raise ValueError("Can't read from file-like object.") fileobj = f parser_cls = determine_format(fileobj, os.path.splitext(fileobj.name)[1]) if parser_cls is None: raise UnsupportedFormat("Supported format signature not found.") else: fileobj.seek(0, os.SEEK_SET) return parser_cls.load(fileobj) def loads(b): """Load audio metadata from filepath or file-like object. Parameters: b (bytes-like object): A bytes-like object of an audio file. Returns: Format: An audio format object. Raises: UnsupportedFormat: If file is not of a supported format. """ parser_cls = determine_format(b) if parser_cls is None: raise UnsupportedFormat("Supported format signature not found.") return parser_cls.load(b) PK!Uxaudio_metadata/exceptions.py__all__ = [ 'AudioMetadataException', 'InvalidFormat', 'InvalidFrame', 'InvalidHeader', 'UnsupportedFormat' ] class AudioMetadataException(Exception): """Base exception class.""" pass class InvalidFormat(AudioMetadataException): """Exception raised when a file format is invalid.""" pass class InvalidFrame(AudioMetadataException): """Exception raised when a metadata frame is invalid.""" pass class InvalidHeader(AudioMetadataException): """Exception raised when a metadata header is invalid.""" pass class UnsupportedFormat(AudioMetadataException): """Exception raised when loading a file that isn't supported.""" pass PK!֨II"audio_metadata/formats/__init__.pyfrom .flac import * from .id3v2 import * from .id3v2_frames import * from .models import * from .mp3 import * from .tables import * from .vorbis import * from .wav import * __all__ = [ *flac.__all__, *id3v2_frames.__all__, *id3v2.__all__, *models.__all__, *mp3.__all__, *tables.__all__, *vorbis.__all__, *wav.__all__ ] PK!ۮ''audio_metadata/formats/flac.py__all__ = [ 'FLAC', 'FLACApplication', 'FLACCueSheet', 'FLACCueSheetIndex', 'FLACCueSheetTrack', 'FLACMetadataBlock', 'FLACPadding', 'FLACSeekPoint', 'FLACSeekTable', 'FLACStreamInfo', ] import binascii import struct from attr import Factory, attrib, attrs from .models import Format, StreamInfo from .tables import FLACMetadataBlockType from .vorbis import VorbisComment, VorbisPicture from ..exceptions import InvalidHeader from ..structures import DictMixin, ListMixin from ..utils import DataReader, bytes_to_int_be, decode_synchsafe_int @attrs(repr=False) class FLACApplication(DictMixin): """Application metadata block. Attributes: id (str): The 32-bit application identifier. data (bytes): The data defined by the application. """ id = attrib() # noqa data = attrib() def __repr__(self): return f"" @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) id = data.read(4).decode('utf-8', 'replace') # noqa. data = data.read() return cls(id, data) @attrs(repr=False) class FLACCueSheetIndex(DictMixin): """A cue sheet track index point. Attributes: number (int): The index point number. The first index in a track must have a number of 0 or 1. Index numbers must increase by 1 and be unique within a track. For CD-DA, an index number of 0 corresponds to the track pre-gab. offset (int): Offset in samples relative to the track offset. """ number = attrib() offset = attrib() @attrs(repr=False) class FLACCueSheetTrack(DictMixin): """A FLAC cue sheet track. Attributes: track_number (int): The track number of the track. 0 is not allowed to avoid conflicting with the CD-DA spec lead-in track. For CD-DA, the track number must be 1-99 or 170 for the lead-out track. For non-CD-DA, the track number must be 255 for the lead-out track. Track numbers must be unique withint a cue sheet. offset (int): Offset in samples relative to the beginning of the FLAC audio stream. isrc (str): The ISRC (International Standard Recording Code) of the track. type (int): ``0`` for audio, ``1`` for non-audio. pre_emphasis (bool): ``True`` if contains pre-emphasis, ``False`` if not. indexes (list): The index points for the track as :class:`FLACCueSheetIndex` objects. """ track_number = attrib() offset = attrib() isrc = attrib() type = attrib() # noqa pre_emphasis = attrib() indexes = attrib(default=Factory(list)) class FLACCueSheet(ListMixin): """The cue sheet metadata block. A list-like structure of :class:`FLACCueSheetTrack` objects along with some information used in the cue sheet. Attributes: catalog_number (str): The media catalog number. lead_in_samples (int): The number of lead-in samples. This is only meaningful for CD-DA cuesheets. For others, it should be 0. compact_disc (bool): ``True`` if the cue sheet corresponds to a compact disc, else ``False``. """ item_label = 'tracks' def __init__(self, tracks, catalog_number, lead_in_samples, compact_disc): super().__init__(tracks) self.catalog_number = catalog_number self.lead_in_samples = lead_in_samples self.compact_disc = compact_disc @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) catalog_number = data.read(128).rstrip(b'\0').decode('ascii', 'replace') lead_in_samples = bytes_to_int_be(data.read(8)) compact_disc = bool(bytes_to_int_be(data.read(1)) & 128) data.read(258) num_tracks = bytes_to_int_be(data.read(1)) tracks = [] for i in range(num_tracks): offset = bytes_to_int_be(data.read(8)) track_number = bytes_to_int_be(data.read(1)) isrc = data.read(12).rstrip(b'\x00').decode('ascii', 'replace') flags = bytes_to_int_be(data.read(1)) type_ = (flags & 128) >> 7 pre_emphasis = bool(flags & 64) data.read(13) num_indexes = bytes_to_int_be(data.read(1)) track = FLACCueSheetTrack(track_number, offset, isrc, type_, pre_emphasis) for i in range(num_indexes): offset = bytes_to_int_be(data.read(8)) number = bytes_to_int_be(data.read(1)) data.read(3) track.indexes.append(FLACCueSheetIndex(number, offset)) tracks.append(track) return cls(tracks, catalog_number, lead_in_samples, compact_disc) @attrs(repr=False) class FLACMetadataBlock(DictMixin): type = attrib() # noqa size = attrib() def __repr__(self): return f"" @attrs(repr=False) class FLACPadding(DictMixin): size = attrib() def __repr__(self): return f"" @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) return cls(len(data.peek())) @attrs(repr=False) class FLACSeekPoint(DictMixin): first_sample = attrib() offset = attrib() num_samples = attrib() @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) return cls(*struct.unpack('>QQH', data.read())) class FLACSeekTable(ListMixin): item_label = 'seekpoints' def __init__(self, items): super().__init__(items) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) seekpoints = [] seekpoint = data.read(18) while len(seekpoint) == 18: seekpoints.append(FLACSeekPoint.load(seekpoint)) seekpoint = data.read(18) return cls(seekpoints) @attrs(repr=False) class FLACStreamInfo(StreamInfo): _start = attrib() _size = attrib() _min_block_size = attrib() _max_block_size = attrib() _min_frame_size = attrib() _max_frame_size = attrib() bit_depth = attrib() bitrate = attrib() channels = attrib() duration = attrib() md5 = attrib() sample_rate = attrib() @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) stream_info_block_data = struct.unpack('2s2s3s3s8B16s', data.read(34)) min_block_size = bytes_to_int_be(stream_info_block_data[0]) max_block_size = bytes_to_int_be(stream_info_block_data[1]) min_frame_size = bytes_to_int_be(stream_info_block_data[2]) max_frame_size = bytes_to_int_be(stream_info_block_data[3]) sample_rate = bytes_to_int_be(stream_info_block_data[4:7]) >> 4 channels = ((stream_info_block_data[6] >> 1) & 7) + 1 bps_start = (stream_info_block_data[6] & 1) << 4 bps_end = (stream_info_block_data[7] & 240) >> 4 bit_depth = int(bps_start + bps_end + 1) total_samples = bytes_to_int_be( [stream_info_block_data[7] & 15] + list(stream_info_block_data[8:12]) ) duration = total_samples / sample_rate md5sum = binascii.hexlify(stream_info_block_data[12:][0]).decode('ascii', 'replace') return cls( None, None, min_block_size, max_block_size, min_frame_size, max_frame_size, bit_depth, None, channels, duration, md5sum, sample_rate ) class FLAC(Format): """FLAC file format object. Extends :class:`Format`. Attributes: cuesheet (FLACCueSheet): The cuesheet metadata block. pictures (list): A list of :class:`VorbisPicture` objects. seektable (FLACSeekTable): The seektable metadata block. streaminfo (FLACStreamInfo): The audio stream information. tags (VorbisComment): The Vorbis comment metadata block. """ tags_type = VorbisComment def __init__(self): super().__init__() self._blocks = [] @classmethod def load(cls, data): self = super()._load(data) # Ignore ID3v2 in FLAC. if self._obj.peek(3)[0:3] == b'ID3': self._obj.seek(5) extended = bool((self._obj.read(1)[0] & 64)) self._obj.read(decode_synchsafe_int(self._obj.read(4), 7)) if extended: ext_size = decode_synchsafe_int(struct.unpack('4B', self._obj.read(4))[0], 7) self._obj.read(ext_size) if self._obj.read(4) != b'fLaC': raise InvalidHeader("Valid FLAC header not found.") header_data = self._obj.read(4) while len(header_data): metadata_block_header = struct.unpack('B3B', header_data) block_type = metadata_block_header[0] & 127 is_last_block = bool(metadata_block_header[0] & 128) # There are examples of tools writing incorrect block sizes. # The FLAC reference implementation unintentionally (I hope?) parses them. # I've chosen not to add special handling for these invalid files. # If needed, mutagen (https://github.com/quodlibet/mutagen) may support them. size = bytes_to_int_be(metadata_block_header[1:4]) metadata_block_data = self._obj.read(size) if block_type == FLACMetadataBlockType.STREAMINFO: streaminfo_block = FLACStreamInfo.load(metadata_block_data) self.streaminfo = streaminfo_block self._blocks.append(streaminfo_block) elif block_type == FLACMetadataBlockType.PADDING: self._blocks.append(FLACPadding.load(metadata_block_data)) elif block_type == FLACMetadataBlockType.APPLICATION: application_block = FLACApplication.load(metadata_block_data) self._blocks.append(application_block) elif block_type == FLACMetadataBlockType.SEEKTABLE: seektable = FLACSeekTable.load(metadata_block_data) self.seektable = seektable self._blocks.append(seektable) elif block_type == FLACMetadataBlockType.VORBIS_COMMENT: comment_block = VorbisComment.load(metadata_block_data) self.tags = comment_block self._blocks.append(comment_block) elif block_type == FLACMetadataBlockType.CUESHEET: cuesheet_block = FLACCueSheet.load(metadata_block_data) self.cuesheet = cuesheet_block self._blocks.append(cuesheet_block) elif block_type == FLACMetadataBlockType.PICTURE: picture = VorbisPicture.load(metadata_block_data) self.pictures.append(picture) self._blocks.append(picture) elif block_type >= 127: raise InvalidHeader("FLAC header contains invalid block type.") else: self._blocks.append(FLACMetadataBlock(block_type, size)) if is_last_block: pos = self._obj.tell() self.streaminfo._start = pos self.streaminfo._size = self.filesize - self.streaminfo._start if self.streaminfo.duration > 0: self.streaminfo.bitrate = self.streaminfo._size * 8 / self.streaminfo.duration break else: header_data = self._obj.read(4) return self PK!ǵ)  audio_metadata/formats/id3v2.py__all__ = [ 'ID3v2', 'ID3v2Frames', 'ID3v2Header' ] import struct from collections import defaultdict from attr import Factory, attrib, attrs from bidict import frozenbidict from .id3v2_frames import * from .models import Tags from ..exceptions import InvalidFrame, InvalidHeader from ..structures import DictMixin from ..utils import DataReader, decode_synchsafe_int class ID3v2Frames(Tags): FIELD_MAP = frozenbidict({ 'album': 'TALB', 'albumsort': 'TSOA', 'albumartist': 'TPE2', 'albumartistsort': 'TSO2', 'arranger': 'TPE4', 'artist': 'TPE1', 'artistsort': 'TSOP', 'audiodelay': 'TDLY', 'audiolength': 'TLEN', 'audiosize': 'TSIZ', 'bpm': 'TBPM', 'comment': 'COMM', 'compilation': 'TCMP', 'composer': 'TCOM', 'composersort': 'TSOC', 'conductor': 'TPE3', 'copyright': 'TCOP', 'date': 'TYER', 'discnumber': 'TPOS', 'encodedby': 'TENC', 'encodingsettings': 'TSSE', 'genre': 'TCON', 'grouping': 'TIT1', 'isrc': 'TSRC', 'language': 'TLAN', 'lyricist': 'TEXT', 'lyrics': 'USLT', 'media': 'TMED', 'mood': 'TMOO', 'originalalbum': 'TOAL', 'originalartist': 'TOPE', 'originalauthor': 'TOLY', 'originalyear': 'TORY', 'pictures': 'APIC', 'playcount': 'PCNT', 'publisher': 'TPUB', 'recordingdates': 'TRDA', 'subtitle': 'TSST', 'time': 'TIME', 'title': 'TIT2', 'titlesort': 'TSOT', 'tracknumber': 'TRCK' }) def __init__(self, *args, **kwargs): self.update(*args, **kwargs) @classmethod def load(cls, data, id3_version, header_size): if not isinstance(data, DataReader): data = DataReader(data) if id3_version[1] == 2: struct_pattern = '3s3B' size_len = 3 per_byte = 8 elif id3_version[1] == 3: struct_pattern = '4s4B2B' size_len = 4 per_byte = 8 elif id3_version[1] == 4: struct_pattern = '4s4B2B' size_len = 4 per_byte = 7 else: raise ValueError(f"Unsupported ID3 version: {id3_version}") frames = defaultdict(list) while True: try: frame = ID3v2Frame.load(data, struct_pattern, size_len, per_byte) except InvalidFrame: break # Ignore oddities/bad frames. if not isinstance(frame, ID3v2BaseFrame): continue # TODO: Finish any missing frame types. # TODO: Move representation into frame classes? if isinstance( frame, (ID3v2CommentFrame, ID3v2SynchronizedLyricsFrame, ID3v2UnsynchronizedLyricsFrame) ): frames[f'{frame.id}:{frame.description}:{frame.language}'].append(frame.value) elif isinstance(frame, ID3v2GenreFrame): frames['TCON'] = frame.value elif isinstance(frame, ID3v2GEOBFrame): frames[f'GEOB:{frame.description}'].append({ 'filename': frame.filename, 'mime_type': frame.mime_type, 'value': frame.value }) elif isinstance(frame, ID3v2PrivateFrame): frames[f'PRIV:{frame.owner}'].append(frame.value) elif isinstance(frame, (ID3v2UserTextFrame, ID3v2UserURLLinkFrame)): frames[f'{frame.id}:{frame.description}'].append(frame.value) else: frames[frame.id].append(frame.value) return cls(frames) @attrs(repr=False) class ID3v2Header(DictMixin): _size = attrib() version = attrib() flags = attrib(default=Factory(DictMixin)) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) major, revision, _flags, sync_size = struct.unpack('BBB4s', data.read(7)) version = (2, major, revision) if version[1] not in [2, 3, 4]: raise ValueError("Unsupported ID3 version.") flags = DictMixin() flags.unsync = bool((_flags & 128)) flags.extended = bool((_flags & 64)) flags.experimental = bool((_flags & 32)) flags.footer = bool((_flags & 16)) size = decode_synchsafe_int(sync_size, 7) return cls(size, version, flags) class ID3v2(DictMixin): @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) self = cls() if data.read(3) != b"ID3": raise InvalidHeader("Valid ID3v2 header not found.") self._header = ID3v2Header.load(data.read(7)) if self._header.flags.extended: ext_size = decode_synchsafe_int(struct.unpack('4B', data.read(4))[0:4], 7) if self._header.version[1] == 4: data.read(ext_size - 4) else: data.read(ext_size) self.tags = ID3v2Frames.load(data, self._header.version, self._header._size) self.pictures = self.tags.pop('pictures', []) return self PK!,,&audio_metadata/formats/id3v2_frames.py__all__ = [ 'ID3v2BaseFrame', 'ID3v2CommentFrame', 'ID3v2Frame', 'ID3v2GEOBFrame', 'ID3v2GenreFrame', 'ID3v2NumberFrame', 'ID3v2NumericTextFrame', 'ID3v2Picture', 'ID3v2PictureFrame', 'ID3v2PrivateFrame', 'ID3v2SynchronizedLyricsFrame', 'ID3v2TextFrame', 'ID3v2UnsynchronizedLyricsFrame', 'ID3v2URLLinkFrame', 'ID3v2UserTextFrame', 'ID3v2UserURLLinkFrame', 'ID3v2YearFrame' ] import re import struct from urllib.parse import unquote from attr import attrib, attrs from .models import Picture from .tables import ID3PictureType, ID3v1Genres from ..exceptions import InvalidFrame from ..structures import DictMixin from ..utils import ( DataReader, decode_bytestring, decode_synchsafe_int, determine_encoding, get_image_size, split_encoded ) _genre_re = re.compile(r"((?:\((?P\d+|RX|CR)\))*)(?P.+)?") class ID3v2Picture(Picture): def __init__(self, **kwargs): self.update(**kwargs) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) data = data.read() encoding = determine_encoding(data[0:1]) mime_start = 1 mime_end = data.index(b'\x00', 1) mime_type = decode_bytestring(data[mime_start:mime_end]) type = ID3PictureType(data[mime_end + 1]) # noqa desc_start = mime_end + 2 description, image_data = split_encoded(data[desc_start:], encoding) description = decode_bytestring(description, encoding) width, height = get_image_size(image_data) return cls( type=type, mime_type=mime_type, description=description, width=width, height=height, data=image_data ) @attrs(repr=False) class ID3v2BaseFrame(DictMixin): id = attrib() # noqa @attrs(repr=False) class ID3v2CommentFrame(ID3v2BaseFrame): language = attrib() description = attrib() value = attrib() @attrs(repr=False) class ID3v2GenreFrame(ID3v2BaseFrame): value = attrib() @attrs(repr=False) class ID3v2GEOBFrame(ID3v2BaseFrame): mime_type = attrib() filename = attrib() description = attrib() value = attrib() @attrs(repr=False) class ID3v2NumberFrame(ID3v2BaseFrame): value = attrib() @property def number(self): return self.value.split('/')[0] @property def total(self): try: tot = self.value.split('/')[1] except IndexError: tot = None return tot @attrs(repr=False) class ID3v2NumericTextFrame(ID3v2BaseFrame): value = attrib() @attrs(repr=False) class ID3v2PictureFrame(ID3v2BaseFrame): value = attrib(converter=ID3v2Picture.load) @attrs(repr=False) class ID3v2PrivateFrame(ID3v2BaseFrame): owner = attrib() value = attrib() @attrs(repr=False) class ID3v2SynchronizedLyricsFrame(ID3v2BaseFrame): language = attrib() timestamp_format = attrib() description = attrib() value = attrib() @attrs(repr=False) class ID3v2TextFrame(ID3v2BaseFrame): value = attrib() @attrs(repr=False) class ID3v2UnsynchronizedLyricsFrame(ID3v2BaseFrame): language = attrib() description = attrib() value = attrib() @attrs(repr=False) class ID3v2URLLinkFrame(ID3v2BaseFrame): value = attrib() @attrs(repr=False) class ID3v2UserURLLinkFrame(ID3v2BaseFrame): description = attrib() value = attrib() @attrs(repr=False) class ID3v2UserTextFrame(ID3v2BaseFrame): description = attrib() value = attrib() @attrs(repr=False) class ID3v2YearFrame(ID3v2NumericTextFrame): value = attrib() @value.validator def validate_value(self, attribute, value): if ( not value.isdigit() or len(value) != 4 ): raise ValueError("Year frame values must be 4-character number strings.") @attrs(repr=False) class ID3v2TDATFrame(ID3v2NumericTextFrame): value = attrib() @value.validator def validate_value(self, attribute, value): if ( not value.isdigit() or len(value) != 4 or int(value[0:2]) not in range(1, 32) or int(value[2:4]) not in range(1, 13) ): raise ValueError( "TDAT frame value must be a 4-character number string in the DDMM format." ) @attrs(repr=False) class ID3v2TIMEFrame(ID3v2NumericTextFrame): value = attrib() @value.validator def validate_value(self, attribute, value): if ( not value.isdigit() or len(value) != 4 or int(value[0:2]) not in range(0, 24) or int(value[2:4]) not in range(0, 60) ): raise ValueError( "TIME frame value must be a 4-character number string in the HHMM format." ) @attrs(repr=False) class ID3v2Frame(ID3v2BaseFrame): value = attrib() # TODO:ID3v2.2 # TODO: BUF, CNT, CRA, CRM, ETC, EQU, IPL, LNK, MCI, MLL, POP, REV, # TODO: RVA, STC, UFI # TODO: ID3v2.3 # TODO: AENC, COMR, ENCR, EQUA, ETCO, GRID, IPLS, LINK, MCDI, MLLT, OWNE # TODO: PCNT, POPM, POSS, RBUF, RVAD, RVRB, SYTC, UFID, USER # TODO: ID3v2.4 # TODO: ASPI, EQU2, RVA2, SEEK, SIGN, TDEN, TDOR, TDRC, TDRL, TDTG, TIPL # TODO: TMCL, TPRO, _FRAME_TYPES = { # Complex Text Frames 'COM': ID3v2CommentFrame, 'GEO': ID3v2GEOBFrame, 'TXX': ID3v2UserTextFrame, 'COMM': ID3v2CommentFrame, 'GEOB': ID3v2GEOBFrame, 'PRIV': ID3v2PrivateFrame, 'TXXX': ID3v2UserTextFrame, # Genre Frame 'TCO': ID3v2GenreFrame, 'TCON': ID3v2GenreFrame, # Lyrics Frames 'SLT': ID3v2SynchronizedLyricsFrame, 'ULT': ID3v2UnsynchronizedLyricsFrame, 'SYLT': ID3v2SynchronizedLyricsFrame, 'USLT': ID3v2UnsynchronizedLyricsFrame, # Number Frames 'TPA': ID3v2NumberFrame, 'TRK': ID3v2NumberFrame, 'TPOS': ID3v2NumberFrame, 'TRCK': ID3v2NumberFrame, # Numeric Text Frames 'TBP': ID3v2NumericTextFrame, 'TDA': ID3v2TDATFrame, 'TDY': ID3v2NumericTextFrame, 'TIM': ID3v2TIMEFrame, 'TLE': ID3v2NumericTextFrame, 'TOR': ID3v2YearFrame, 'TSI': ID3v2NumericTextFrame, 'TYE': ID3v2YearFrame, 'TBPM': ID3v2NumericTextFrame, 'TDAT': ID3v2TDATFrame, 'TDLY': ID3v2NumericTextFrame, 'TIME': ID3v2TIMEFrame, 'TLEN': ID3v2NumericTextFrame, 'TORY': ID3v2YearFrame, 'TSIZ': ID3v2NumericTextFrame, 'TYER': ID3v2YearFrame, # Picture Frames 'PIC': ID3v2PictureFrame, 'APIC': ID3v2PictureFrame, # Text Frames 'TAL': ID3v2TextFrame, 'TCM': ID3v2TextFrame, 'TCR': ID3v2TextFrame, 'TEN': ID3v2TextFrame, 'TFT': ID3v2TextFrame, 'TKE': ID3v2TextFrame, 'TLA': ID3v2TextFrame, 'TMT': ID3v2TextFrame, 'TOA': ID3v2TextFrame, 'TOF': ID3v2TextFrame, 'TOL': ID3v2TextFrame, 'TOT': ID3v2TextFrame, 'TP1': ID3v2TextFrame, 'TP2': ID3v2TextFrame, 'TP3': ID3v2TextFrame, 'TP4': ID3v2TextFrame, 'TPB': ID3v2TextFrame, 'TRC': ID3v2TextFrame, 'TRD': ID3v2TextFrame, 'TSS': ID3v2TextFrame, 'TT1': ID3v2TextFrame, 'TT2': ID3v2TextFrame, 'TT3': ID3v2TextFrame, 'TXT': ID3v2TextFrame, 'TALB': ID3v2TextFrame, 'TCMP': ID3v2TextFrame, 'TCOM': ID3v2TextFrame, 'TCOP': ID3v2TextFrame, 'TENC': ID3v2TextFrame, 'TEXT': ID3v2TextFrame, 'TFLT': ID3v2TextFrame, 'TIT1': ID3v2TextFrame, 'TIT2': ID3v2TextFrame, 'TIT3': ID3v2TextFrame, 'TKEY': ID3v2TextFrame, 'TLAN': ID3v2TextFrame, 'TMED': ID3v2TextFrame, 'TMOO': ID3v2TextFrame, 'TOAL': ID3v2TextFrame, 'TOFN': ID3v2TextFrame, 'TOLY': ID3v2TextFrame, 'TOPE': ID3v2TextFrame, 'TOWN': ID3v2TextFrame, 'TPE1': ID3v2TextFrame, 'TPE2': ID3v2TextFrame, 'TPE3': ID3v2TextFrame, 'TPE4': ID3v2TextFrame, 'TPUB': ID3v2TextFrame, 'TRDA': ID3v2TextFrame, 'TRSN': ID3v2TextFrame, 'TRSO': ID3v2TextFrame, 'TSO2': ID3v2TextFrame, 'TSOA': ID3v2TextFrame, 'TSOC': ID3v2TextFrame, 'TSOP': ID3v2TextFrame, 'TSOT': ID3v2TextFrame, 'TSRC': ID3v2TextFrame, 'TSSE': ID3v2TextFrame, 'TSST': ID3v2TextFrame, # URL Link Frames 'WAF': ID3v2URLLinkFrame, 'WAR': ID3v2URLLinkFrame, 'WAS': ID3v2URLLinkFrame, 'WCM': ID3v2URLLinkFrame, 'WCP': ID3v2URLLinkFrame, 'WPB': ID3v2URLLinkFrame, 'WXX': ID3v2UserURLLinkFrame, 'WCOM': ID3v2URLLinkFrame, 'WCOP': ID3v2URLLinkFrame, 'WOAF': ID3v2URLLinkFrame, 'WOAR': ID3v2URLLinkFrame, 'WOAS': ID3v2URLLinkFrame, 'WORS': ID3v2URLLinkFrame, 'WPAY': ID3v2URLLinkFrame, 'WPUB': ID3v2URLLinkFrame, 'WXXX': ID3v2UserURLLinkFrame } @classmethod def load(cls, data, struct_pattern, size_len, per_byte): if not isinstance(data, DataReader): data = DataReader(data) try: frame = struct.unpack(struct_pattern, data.read(struct.calcsize(struct_pattern))) except struct.error: raise InvalidFrame("Not enough data.") frame_size = decode_synchsafe_int(frame[1:1 + size_len], per_byte) if frame_size == 0: raise InvalidFrame("Not a valid ID3v2 frame") frame_id = frame[0].decode('iso-8859-1') frame_type = ID3v2Frame._FRAME_TYPES.get(frame_id, cls) frame_data = data.read(frame_size) # TODO: Move logic into frame classes? args = [frame_id] if frame_type is ID3v2CommentFrame: encoding = determine_encoding(frame_data[0:1]) language = decode_bytestring(frame_data[1:4]) args.append(language) values = [decode_bytestring(v, encoding) for v in split_encoded(frame_data[4:], encoding)] # Ignore empty comments. if len(values) < 2: return None args.extend(values) elif frame_type is ID3v2GenreFrame: encoding = determine_encoding(frame_data[0:1]) remainder = frame_data[1:] values = [] while True: split = split_encoded(remainder, encoding) values.extend([decode_bytestring(v, encoding) for v in split]) if len(split) < 2: break remainder = split[1] genres = [] for value in values: match = _genre_re.match(value) if match['name']: genres.append(match['name']) elif match['id']: if match['id'].isdigit() and int(match['id']): try: genres.append(ID3v1Genres[int(match['id'])]) except IndexError: genres.append(value) elif match['id'] == 'CR': genres.append('Cover') elif match['id'] == 'RX': genres.append('Remix') args.append(genres) elif frame_type is ID3v2GEOBFrame: encoding = determine_encoding(frame_data[0:1]) mime_type, remainder = split_encoded(frame_data[1:], encoding) filename, remainder = split_encoded(remainder, encoding) description, value = split_encoded(remainder, encoding) values = [decode_bytestring(mime_type)] values.extend([decode_bytestring(v, encoding) for v in [filename, description]]) values.append(value) args.extend(values) elif frame_type is ID3v2PictureFrame: args.append(frame_data) elif frame_type is ID3v2PrivateFrame: owner_end = frame_data.index(b'\x00') args.append(frame_data[0:owner_end].decode('iso-8859-1')) args.append(frame_data[owner_end + 1:]) elif frame_type is ID3v2UnsynchronizedLyricsFrame: encoding = determine_encoding(frame_data[0:1]) language = decode_bytestring(frame_data[1:4]) args.append(language) for v in split_encoded(frame_data[4:], encoding): args.append(decode_bytestring(v, encoding)) elif frame_type is ID3v2URLLinkFrame: args.append(unquote(decode_bytestring(frame_data))) elif frame_type is ID3v2UserURLLinkFrame: encoding = determine_encoding(frame_data) description, url = split_encoded(frame_data[1:], encoding) args.append(decode_bytestring(description, encoding)) args.append(unquote(decode_bytestring(url))) elif issubclass( frame_type, (ID3v2NumberFrame, ID3v2NumericTextFrame, ID3v2TextFrame, ID3v2UserTextFrame,) ): encoding = determine_encoding(frame_data[0:1]) args.append(decode_bytestring(frame_data[1:], encoding)) elif frame_type is ID3v2Frame: args.append(frame_data) else: args.append(decode_bytestring(frame_data)) try: return frame_type(*args) except (TypeError, ValueError): # Bad frame value. return None PK!s8 audio_metadata/formats/models.py__all__ = ['Format', 'Picture', 'Tags'] import os from enum import Enum from io import BytesIO import pprintpp from bidict import frozenbidict from ..structures import DictMixin from ..utils import DataReader, humanize_bitrate, humanize_duration, humanize_filesize, humanize_sample_rate class Tags(DictMixin): FIELD_MAP = frozenbidict() def __getitem__(self, key): k = self.FIELD_MAP.get(key, key) return super().__getitem__(k) def __setitem__(self, key, value): k = self.FIELD_MAP.get(key, key) return super().__setitem__(k, value) def __delitem__(self, key): k = self.FIELD_MAP.get(key, key) return super().__delitem__(k) def __iter__(self): return iter(self.FIELD_MAP.inv.get(k, k) for k in self.__dict__) def __repr__(self, repr_dict=None): repr_dict = {self.FIELD_MAP.inv.get(k, k): v for k, v in self.__dict__.items() if not k.startswith('_')} return super().__repr__(repr_dict=repr_dict) def __str__(self): str_dict = {self.FIELD_MAP.inv.get(k, k): v for k, v in self.__dict__.items() if not k.startswith('_')} return pprintpp.pformat(str_dict) class Format(DictMixin): """Base class for audio format objects. Attributes: filepath (str): Path to audio file, if applicable. filesize (int): Size of audio file. pictures (list): A list of :class:`Picture` objects. tags (Tags): A :class:`Tags` object. """ tags_type = Tags def __init__(self): self.filepath = None self.filesize = None self.pictures = [] self.tags = self.tags_type() def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if k == 'filesize': repr_dict[k] = humanize_filesize(v, precision=2) elif isinstance(v, Enum): repr_dict[k] = v.name elif isinstance(v, BytesIO): repr_dict[k] = f"<{v.__class__.__name__}>" elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) @classmethod def _load(cls, data): self = cls() if hasattr(data, 'name'): self.filepath = os.path.abspath(data.name) self.filesize = os.path.getsize(data.name) else: self.filepath = '' self.filesize = len(data) if not isinstance(data, DataReader): self._obj = DataReader(data) else: self._obj = data return self class Picture(DictMixin): def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if k == 'data': repr_dict[k] = humanize_filesize(len(v), precision=2) elif isinstance(v, Enum): repr_dict[k] = v.name elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) class StreamInfo(DictMixin): def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if k == 'bitrate': repr_dict[k] = humanize_bitrate(v) elif k == 'duration': repr_dict[k] = humanize_duration(v) elif k == 'sample_rate': repr_dict[k] = humanize_sample_rate(v) elif isinstance(v, Enum): repr_dict[k] = v.name elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) PK!w44audio_metadata/formats/mp3.py__all__ = [ 'LAMEHeader', 'MP3', 'MP3StreamInfo', 'MPEGFrameHeader', 'XingHeader', 'XingTOC' ] import os import re import struct from enum import Enum import more_itertools from attr import attrib, attrs from .id3v2 import ID3v2, ID3v2Frames from .models import Format, StreamInfo from .tables import ( LAMEBitrateMode, LAMEChannelMode, LAMEPreset, LAMEReplayGainOrigin, LAMEReplayGainType, LAMESurroundInfo, MP3BitrateMode, MP3Bitrates, MP3ChannelMode, MP3SampleRates, MP3SamplesPerFrame ) from ..exceptions import InvalidFormat, InvalidFrame, InvalidHeader from ..structures import DictMixin, ListMixin from ..utils import DataReader, humanize_bitrate, humanize_filesize, humanize_sample_rate @attrs(repr=False) class LAMEReplayGain(DictMixin): type = attrib() # noqa origin = attrib() adjustment = attrib() peak = attrib() def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if isinstance(v, Enum): repr_dict[k] = v.name elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) replay_gain_data = struct.unpack('>I2B', data.read(6)) peak_data = replay_gain_data[0] if peak_data == b'\x00\x00\x00\x00': gain_peak = None else: gain_peak = (peak_data - 0.5) / 2 ** 23 gain_type = LAMEReplayGainType(replay_gain_data[1] >> 5) gain_origin = LAMEReplayGainOrigin((replay_gain_data[1] >> 2) & 7) gain_sign = (replay_gain_data[1] >> 1) & 1 adjustment_start = (replay_gain_data[1] & 1) << 4 adjustment_end = replay_gain_data[2] gain_adjustment = (adjustment_start + adjustment_end) / 10 if gain_sign: gain_adjustment *= -1 if not gain_type: return None return cls(gain_type, gain_origin, gain_adjustment, gain_peak) @attrs(repr=False) class LAMEHeader(DictMixin): _crc = attrib() version = attrib() revision = attrib() album_gain = attrib() ath_type = attrib() audio_crc = attrib() audio_size = attrib() bitrate = attrib() bitrate_mode = attrib() channel_mode = attrib() delay = attrib() encoding_flags = attrib() lowpass_filter = attrib() mp3_gain = attrib() noise_shaping = attrib() padding = attrib() preset = attrib() source_sample_rate = attrib() surround_info = attrib() track_gain = attrib() unwise_settings_used = attrib() def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if k == 'bitrate': repr_dict[k] = humanize_bitrate(v) elif k == 'audio_size': repr_dict[k] = humanize_filesize(v, precision=2) elif isinstance(v, Enum): repr_dict[k] = v.name elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) @classmethod def load(cls, data, xing_quality): if not isinstance(data, DataReader): data = DataReader(data) lame_header_data = struct.unpack('>9s2B4s2s2s9BIHH', data.read(36)) encoder = lame_header_data[0] if not encoder.startswith(b'LAME'): raise InvalidHeader('Valid LAME header not found.') version_match = re.search(rb'LAME(\d+)\.(\d+)', encoder) if version_match: version = tuple(int(part) for part in version_match.groups()) else: version = None rev_mode = lame_header_data[1] revision = rev_mode >> 4 bitrate_mode = LAMEBitrateMode(rev_mode & 15) # TODO: Decide what, if anything, to do with the different meanings in LAME. # quality = (100 - xing_quality) % 10 # vbr_quality = (100 - xing_quality) // 10 lowpass_filter = lame_header_data[2] * 100 track_gain = LAMEReplayGain.load(lame_header_data[3] + lame_header_data[4]) album_gain = LAMEReplayGain.load(lame_header_data[3] + lame_header_data[5]) enc_flags = lame_header_data[6] >> 4 nspsytune = bool(enc_flags & 1) nssafejoint = bool(enc_flags & 2) nogap_continued = bool(enc_flags & 4) nogap_continuation = bool(enc_flags & 8) encoding_flags = { 'nogap_continuation': nogap_continuation, 'nogap_continued': nogap_continued, 'nspsytune': nspsytune, 'nssafejoint': nssafejoint } ath_type = lame_header_data[6] & 15 # TODO: Different representation for VBR minimum bitrate vs CBR/ABR specified bitrate? # Can only go up to 255. bitrate = lame_header_data[7] * 1000 delay = (lame_header_data[8] + (lame_header_data[9] >> 4)) << 4 padding = ((lame_header_data[9] & 15) << 8) + lame_header_data[10] source_sample_rate = (lame_header_data[11] >> 6) & 3 unwise_settings_used = bool((lame_header_data[11] >> 5) & 1) channel_mode = LAMEChannelMode((lame_header_data[11] >> 2) & 7) noise_shaping = lame_header_data[11] & 3 mp3_gain = lame_header_data[12] & 127 if lame_header_data[12] & 1: mp3_gain *= -1 surround_info = LAMESurroundInfo((lame_header_data[13] >> 3) & 7) preset_used = ((lame_header_data[13] & 7) << 8) + lame_header_data[14] try: preset = LAMEPreset(preset_used) except ValueError: # 8-320 are used for bitrates and aren't defined in LAMEPreset. preset = f"{preset_used} Kbps" audio_size = lame_header_data[15] audio_crc = lame_header_data[16] lame_crc = lame_header_data[17] return cls( lame_crc, version, revision, album_gain, ath_type, audio_crc, audio_size, bitrate, bitrate_mode, channel_mode, delay, encoding_flags, lowpass_filter, mp3_gain, noise_shaping, padding, preset, source_sample_rate, surround_info, track_gain, unwise_settings_used ) class XingTOC(ListMixin): item_label = 'entries' def __init__(self, items): super().__init__(items) @attrs(repr=False) class XingHeader(DictMixin): _lame = attrib() num_frames = attrib() num_bytes = attrib() toc = attrib() quality = attrib() @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) if data.read(4) not in [b'Xing', b'Info']: raise InvalidHeader('Valid Xing header not found.') flags = struct.unpack('>i', data.read(4))[0] num_frames = num_bytes = toc = quality = lame_header = None if flags & 1: num_frames = struct.unpack('>i', data.read(4))[0] if flags & 2: num_bytes = struct.unpack('>i', data.read(4))[0] if flags & 4: toc = XingTOC(list(bytearray(data.read(100)))) if flags & 8: quality = struct.unpack('>i', data.read(4))[0] if data.read(4) == b'LAME': data.seek(-4, os.SEEK_CUR) lame_header = LAMEHeader.load(data, quality) return cls(lame_header, num_frames, num_bytes, toc, quality) @attrs(repr=False) class MPEGFrameHeader(DictMixin): _start = attrib() _size = attrib() _xing = attrib() version = attrib() layer = attrib() protected = attrib() padded = attrib() bitrate = attrib() channel_mode = attrib() channels = attrib() sample_rate = attrib() def __repr__(self): repr_dict = {} for k, v in sorted(self.items()): if k == 'bitrate': repr_dict[k] = humanize_bitrate(v) elif k == 'sample_rate': repr_dict[k] = humanize_sample_rate(v) elif isinstance(v, Enum): repr_dict[k] = v.name elif not k.startswith('_'): repr_dict[k] = v return super().__repr__(repr_dict=repr_dict) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) frame_start = data.tell() try: sync, flags, indexes, remainder = struct.unpack('BBBB', data.read(4)) except struct.error: raise InvalidFrame('Not a valid MPEG audio frame.') if sync != 255 or flags >> 5 != 7: raise InvalidFrame('Not a valid MPEG audio frame.') version_id = (flags >> 3) & 0x03 version = [2.5, None, 2, 1][version_id] layer_index = (flags >> 1) & 0x03 layer = 4 - layer_index protected = bool(not (flags & 1)) bitrate_index = (indexes >> 4) & 0x0F sample_rate_index = (indexes >> 2) & 0x03 padded = bool(indexes & 0x02) channel_mode = MP3ChannelMode((remainder >> 6) & 0x03) channels = 1 if channel_mode == 3 else 2 if version_id == 1 or layer_index == 0 or bitrate_index == 0 or bitrate_index == 15 or sample_rate_index == 3: raise InvalidFrame('Not a valid MPEG audio frame.') bitrate = MP3Bitrates[(version, layer)][bitrate_index] * 1000 sample_rate = MP3SampleRates[version][sample_rate_index] samples_per_frame, slot_size = MP3SamplesPerFrame[(version, layer)] frame_size = (((samples_per_frame // 8 * bitrate) // sample_rate) + padded) * slot_size xing_header = None if layer == 3: if version == 1: if channel_mode != 3: xing_header_start = 36 else: xing_header_start = 21 elif channel_mode != 3: xing_header_start = 21 else: xing_header_start = 13 data.seek(frame_start + xing_header_start, os.SEEK_SET) t = data.read(4) if t in [b'Xing', b'Info']: data.seek(-4, os.SEEK_CUR) xing_header = XingHeader.load(data.read(frame_size)) return cls( frame_start, frame_size, xing_header, version, layer, protected, padded, bitrate, channel_mode, channels, sample_rate ) @attrs(repr=False) class MP3StreamInfo(StreamInfo): _start = attrib() _size = attrib() _xing = attrib() version = attrib() layer = attrib() protected = attrib() bitrate = attrib() bitrate_mode = attrib() channel_mode = attrib() channels = attrib() duration = attrib() sample_rate = attrib() @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) frames = [] xing_frame = None while (len(frames) < 4) and (not xing_frame): buffer = data.peek(4) if len(buffer) < 4: break start = data.tell() if buffer[0] == 255 and buffer[1] >> 5 == 7: for _ in range(4): try: frame = MPEGFrameHeader.load(data) frames.append(frame) if frame._xing: xing_frame = frame data.seek(frame._start + frame._size, os.SEEK_SET) except InvalidFrame: del frames[:] data.seek(start + 1, os.SEEK_SET) break else: index = buffer.find(b'\xFF', 1) if index == -1: index = len(buffer) data.seek(max(index, 1), os.SEEK_CUR) if not frames and not xing_frame: raise InvalidFormat("Missing XING header and insufficient MPEG frames.") if not frames and xing_frame: frames.append(xing_frame) samples_per_frame, _ = MP3SamplesPerFrame[(frames[0].version, frames[0].layer)] data.seek(0, os.SEEK_END) end_pos = data.tell() # This is an arbitrary amount that should hopefully encompass all end tags. # Starting low so as not to add unnecessary processing time. chunk_size = 64 * 1024 if end_pos > chunk_size: data.seek(-(chunk_size), os.SEEK_END) else: data.seek(0, os.SEEK_SET) end_buffer = data.read() end_tag_offset = 0 for tag_type in [b'APETAGEX', b'LYRICSBEGIN', b'TAG']: tag_offset = end_buffer.rfind(tag_type) if tag_offset > 0: tag_offset = len(end_buffer) - tag_offset if tag_offset > end_tag_offset: end_tag_offset = tag_offset audio_start = frames[0]._start audio_size = end_pos - audio_start - end_tag_offset bitrate_mode = MP3BitrateMode.UNKNOWN xing_header = frames[0]._xing if xing_header: num_samples = samples_per_frame * xing_header.num_frames # I prefer to include the Xing/LAME header as part of the audio. # Google Music seems to do so for calculating client ID. # Haven't tested in too many other scenarios. # But, there should be enough low-level info for people to calculate this if desired. if xing_header._lame: # Old versions of LAME wrote invalid delay/padding for # short MP3s with low bitrate. # Subtract them only them if there would be samples left. lame_padding = xing_header._lame.delay + xing_header._lame.padding if lame_padding < num_samples: num_samples -= lame_padding if xing_header._lame.bitrate_mode in [1, 8]: bitrate_mode = MP3BitrateMode.CBR elif xing_header._lame.bitrate_mode in [2, 9]: bitrate_mode = MP3BitrateMode.ABR elif xing_header._lame.bitrate_mode in [3, 4, 5, 6]: bitrate_mode = MP3BitrateMode.VBR else: if more_itertools.all_equal([frame['bitrate'] for frame in frames]): bitrate_mode = MP3BitrateMode.CBR num_samples = samples_per_frame * (audio_size / frames[0]._size) if bitrate_mode == MP3BitrateMode.CBR: bitrate = frames[0].bitrate else: # Subtract Xing/LAME frame size from audio_size for bitrate calculation accuracy. if xing_header: bitrate = ((audio_size - frames[0]._size) * 8 * frames[0].sample_rate) / num_samples else: bitrate = (audio_size * 8 * frames[0].sample_rate) / num_samples duration = (audio_size * 8) / bitrate version = frames[0].version layer = frames[0].layer protected = frames[0].protected sample_rate = frames[0].sample_rate channel_mode = frames[0].channel_mode channels = frames[0].channels return cls( audio_start, audio_size, xing_header, version, layer, protected, bitrate, bitrate_mode, channel_mode, channels, duration, sample_rate ) class MP3(Format): """MP3 file format object. Extends :class:`Format`. Attributes: pictures (list): A list of :class:`ID3v2Picture` objects. streaminfo (MP3StreamInfo): The audio stream information. tags (ID3v2Frames): The ID3v2 tag frames, if present. """ tags_type = ID3v2Frames @classmethod def load(cls, data): self = super()._load(data) try: id3 = ID3v2.load(self._obj) self._id3 = id3._header self.pictures = id3.pictures self.tags = id3.tags self._obj.seek(self._id3._size, os.SEEK_SET) except (InvalidFrame, InvalidHeader): self._obj.seek(0, os.SEEK_SET) self.streaminfo = MP3StreamInfo.load(self._obj) return self PK!gZZ audio_metadata/formats/tables.py__all__ = [ 'FLACMetadataBlockType', 'ID3PictureType', 'ID3v1Genres', 'LAMEBitrateMode', 'LAMEChannelMode', 'LAMEPreset', 'LAMEReplayGainOrigin', 'LAMEReplayGainType', 'LAMESurroundInfo', 'MP3BitrateMode', 'MP3Bitrates', 'MP3ChannelMode', 'MP3SampleRates', 'MP3SamplesPerFrame' ] from enum import IntEnum class FLACMetadataBlockType(IntEnum): STREAMINFO = 0 PADDING = 1 APPLICATION = 2 SEEKTABLE = 3 VORBIS_COMMENT = 4 CUESHEET = 5 PICTURE = 6 class ID3PictureType(IntEnum): OTHER = 0 FILE_ICON = 1 OTHER_FILE_ICON = 2 COVER_FRONT = 3 COVER_BACK = 4 LEAFLET_PAGE = 5 MEDIA = 6 LEAD_ARTIST = 7 ARTIST = 8 CONDUCTOR = 9 BAND = 10 COMPOSER = 11 LYRICIST = 12 RECORDING_LOCATION = 13 DURING_RECORDING = 14 DURING_PERFORMANCE = 15 SCREEN_CAPTURE = 16 FISH = 17 ILLUSTRATION = 18 ARTIST_LOGOTYPE = 19 BAND_LOGOTYPE = 19 PUBLISHER_LOGOTYPE = 20 STUDIO_LOGOTYPE = 20 ID3v1Genres = [ 'Blues', 'Classic Rock', 'Country', 'Dance', 'Disco', 'Funk', 'Grunge', 'Hip-Hop', 'Jazz', 'Metal', 'New Age', 'Oldies', 'Other', 'Pop', 'R&B', 'Rap', 'Reggae', 'Rock', 'Techno', 'Industrial', 'Alternative', 'Ska', 'Death Metal', 'Pranks', 'Soundtrack', 'Euro-Techno', 'Ambient', 'Trip-Hop', 'Vocal', 'Jazz+Funk', 'Fusion', 'Trance', 'Classical', 'Instrumental', 'Acid', 'House', 'Game', 'Sound Clip', 'Gospel', 'Noise', 'Alt Rock', 'Bass', 'Soul', 'Punk', 'Space', 'Meditative', 'Instrumental Pop', 'Instrumental Rock', 'Ethnic', 'Gothic', 'Darkwave', 'Techno-Industrial', 'Electronic', 'Pop-Folk', 'Eurodance', 'Dream', 'Southern Rock', 'Comedy', 'Cult', 'Gangsta Rap', 'Top 40', 'Christian Rap', 'Pop/Funk', 'Jungle', 'Native American', 'Cabaret', 'New Wave', 'Psychedelic', 'Rave', 'Showtunes', 'Trailer', 'Lo-Fi', 'Tribal', 'Acid Punk', 'Acid Jazz', 'Polka', 'Retro', 'Musical', 'Rock & Roll', 'Hard Rock', 'Folk', 'Folk-Rock', 'National Folk', 'Swing', 'Fast-Fusion', 'Bebop', 'Latin', 'Revival', 'Celtic', 'Bluegrass', 'Avantgarde', 'Gothic Rock', 'Progressive Rock', 'Symphonic Rock', 'Slow Rock', 'Big Band', 'Chorus', 'Easy Listening', 'Acoustic', 'Humour', 'Speech', 'Chanson', 'Opera', 'Chamber Music', 'Sonata', 'Symphony', 'Booty Bass', 'Primus', 'Porn Groove', 'Satire', 'Slow Jam', 'Club', 'Tango', 'Samba', 'Folklore', 'Ballad', 'Power Ballad', 'Rhythmic Soul', 'Freestyle', 'Duet', 'Punk Rock', 'Drum Solo', 'A Cappella', 'Euro-House', 'Dance Hall', 'Goa', 'Drum & Bass', 'Club-House', 'Hardcore', 'Terror', 'Indie', 'BritPop', 'Afro-Punk', 'Polsk Punk', 'Beat', 'Christian Gangsta Rap', 'Heavy Metal', 'Black Metal', 'Crossover', 'Contemporary Christian', 'Chrstian Rock', 'Merengue', 'Salsa', 'Thrash Metal', 'Anime', 'JPop', 'Synthpop', 'Abstract', 'Art Rock', 'Baroque', 'Bhangra', 'Big Beat', 'Breakbeat', 'Chillout', 'Downtempo', 'Dub', 'EBM', 'Eclectic', 'Electro', 'Electroclash', 'Emo', 'Experimental', 'Garage', 'Global', 'IDM', 'Illibient', 'Industro-Goth', 'Jam Band', 'Krautrock', 'Leftfield', 'Lounge', 'Math Rock', 'New Romantic', 'Nu-Breakz', 'Post-Punk', 'Post-Rock', 'Psytrance', 'Shoegaze', 'Space Rock', 'Trop Rock', 'World Music', 'Neoclassical', 'Audiobook', 'Audio Theatre', 'Neue Deutsche Welle', 'Podcast', 'Indie Rock', 'G-Funk', 'Dubstep', 'Garage Rock', 'Psybient' ] class LAMEBitrateMode(IntEnum): UNKNOWN = 0 CBR = 1 ABR = 2 VBR_METHOD_1 = 3 VBR_METHOD_2 = 4 VBR_METHOD_3 = 5 VBR_METHOD_4 = 6 CBR_2_PASS = 8 ABR_2_PASS = 9 RESERVED = 15 class LAMEChannelMode(IntEnum): MONO = 0 STEREO = 1 DUAL_CHANNEL = 2 JOINT_STEREO = 3 FORCED = 4 AUTO = 5 INTENSITY = 6 UNDEFINED = 7 # 8 through 320 are reserved for ABR bitrates. class LAMEPreset(IntEnum): Unknown = 0 V9 = 410 V8 = 420 V7 = 430 V6 = 440 V5 = 450 V4 = 460 V3 = 470 V2 = 480 V1 = 490 V0 = 500 r3mix = 1000 standard = 1001 extreme = 1002 insane = 1003 standard_fast = 1004 extreme_fast = 1005 medium = 1006 medium_fast = 1007 class LAMEReplayGainOrigin(IntEnum): not_set = 0 artist = 1 user = 2 model = 3 average = 4 class LAMEReplayGainType(IntEnum): not_set = 0 radio = 1 audiophile = 2 class LAMESurroundInfo(IntEnum): NO_SURROUND = 0 DPL = 1 DPL2 = 2 AMBISONIC = 3 # (version, layer): bitrate in kilobits per second MP3Bitrates = { (1, 1): [0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448], (1, 2): [0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384], (1, 3): [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320], (2, 1): [0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256], (2, 2): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160], (2, 3): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160], (2.5, 1): [0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256], (2.5, 2): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160], (2.5, 3): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160] } class MP3BitrateMode(IntEnum): UNKNOWN = 0 CBR = 1 VBR = 2 ABR = 3 class MP3ChannelMode(IntEnum): STEREO = 0 JOINT_STEREO = 1 DUAL_CHANNEL = 2 MONO = 3 # version MP3SampleRates = { 1: [44100, 48000, 32000], 2: [22050, 24000, 16000], 2.5: [11025, 12000, 8000] } # (version, layer): (samples_per_frame, slot_size) MP3SamplesPerFrame = { (1, 1): (384, 4), (1, 2): (1152, 1), (1, 3): (1152, 1), (2, 1): (384, 4), (2, 2): (1152, 1), (2, 3): (576, 1), (2.5, 1): (384, 4), (2.5, 2): (1152, 1), (2.5, 3): (576, 1), } PK!!9 audio_metadata/formats/vorbis.py__all__ = ['VorbisComment', 'VorbisPicture'] import struct from collections import defaultdict from .models import Picture, Tags from .tables import ID3PictureType from ..utils import DataReader # TODO: Number frames. class VorbisComment(Tags): def __init__(self, *args, **kwargs): self.update(*args, **kwargs) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) vendor_length = struct.unpack('I', data.read(4))[0] vendor = data.read(vendor_length).decode('utf-8', 'replace') num_comments = struct.unpack('I', data.read(4))[0] fields = defaultdict(list) for i in range(num_comments): length = struct.unpack('I', data.read(4))[0] comment = data.read(length).decode('utf-8', 'replace') if '=' in comment: field, value = comment.split('=', 1) fields[field.lower()].append(value) return cls(**fields, _vendor=vendor) class VorbisPicture(Picture): def __init__(self, **kwargs): self.update(**kwargs) @classmethod def load(cls, data): if not isinstance(data, DataReader): data = DataReader(data) type_, mime_length = struct.unpack('>2I', data.read(8)) mime_type = data.read(mime_length).decode('utf-8', 'replace') desc_length = struct.unpack('>I', data.read(4))[0] description = data.read(desc_length).decode('utf-8', 'replace') width, height, depth, colors = struct.unpack('>4I', data.read(16)) data_length = struct.unpack('>I', data.read(4))[0] data = data.read(data_length) return cls( type=ID3PictureType(type_), mime_type=mime_type, description=description, width=width, height=height, depth=depth, colors=colors, data=data ) PK!Qaudio_metadata/formats/wav.py__all__ = ['WAV', 'WAVStreamInfo'] import os import struct from attr import attrib, attrs from .id3v2 import ID3v2, ID3v2Frames from .models import Format, StreamInfo from ..exceptions import InvalidFrame, InvalidHeader @attrs(repr=False) class WAVStreamInfo(StreamInfo): _start = attrib() _size = attrib() bitrate = attrib() channels = attrib() duration = attrib() sample_rate = attrib() class WAV(Format): """WAV file format object. Extends :class:`Format`. Attributes: pictures (list): A list of :class:`ID3v2Picture` objects. streaminfo (WAVStreamInfo): The audio stream information. tags (ID3v2Frames): The ID3v2 tag frames, if present. """ tags_type = ID3v2Frames @classmethod def load(cls, data): self = super()._load(data) chunk_id, chunk_size, format_ = struct.unpack('4sI4s', self._obj.read(12)) if chunk_id != b'RIFF' or format_ != b'WAVE': raise InvalidHeader("Valid WAVE header not found.") # TODO: Support other subchunks? subchunk_header = self._obj.read(8) while len(subchunk_header) == 8: subchunk_id, subchunk_size = struct.unpack('4sI', subchunk_header) if subchunk_id == b'fmt ': audio_format, channels, sample_rate = struct.unpack('HHI', self._obj.read(8)) byte_rate, block_align, bit_depth = struct.unpack('" # def __str__(self): # return pprintpp.pformat({(k, v) for k, v in self.items() if not k.startswith('_')}) def items(self): return self.__dict__.items() def keys(self): return self.__dict__.keys() def values(self): return self.__dict__.values() class ListMixin: def __init__(self, items=None, item_label=None): self.items = items or [] if item_label: self.item_label = item_label def __contains__(self, item): return item in self.items def __getitem__(self, key): return self.items[key] def __iter__(self): return iter(self.items) def __len__(self): return len(self.items) def __repr__(self): return f"<{self.__class__.__name__} ({len(self)} {self.item_label})>" PK!X2&audio_metadata/utils.py__all__ = [ 'DataReader', 'bytes_to_int_be', 'bytes_to_int_le', 'decode_synchsafe_int', 'get_image_size', 'humanize_bitrate', 'humanize_duration', 'humanize_filesize', 'humanize_sample_rate', 'int_to_bytes_be', 'int_to_bytes_le' ] import os import struct from codecs import BOM_UTF16_BE, BOM_UTF16_LE from functools import reduce from io import DEFAULT_BUFFER_SIZE from attr import attrib, attrs @attrs(slots=True) class DataReader: data = attrib() _position = attrib(default=0, repr=False) def __attrs_post_init__(self): if hasattr(self.data, 'read'): self._position = self.data.tell() def peek(self, size=DEFAULT_BUFFER_SIZE): if size > DEFAULT_BUFFER_SIZE: size = DEFAULT_BUFFER_SIZE try: peeked = self.data.peek(size)[:size] if len(peeked) != size: peeked = self.data.read(size) self.data.seek(-size, os.SEEK_CUR) return peeked except AttributeError: return self.data[self._position:self._position + size] def read(self, size=None): try: read_ = self.data.read(size) except AttributeError: if size is None: size = len(self.data) read_ = self.data[self._position:self._position + size] self._position += len(read_) return read_ def seek(self, offset, whence=os.SEEK_SET): try: self.data.seek(offset, whence) self._position = self.data.tell() except AttributeError: if whence == os.SEEK_CUR: self._position += offset elif whence == os.SEEK_SET: self._position = 0 + offset elif whence == os.SEEK_END: self._position = len(self.data) + offset else: raise ValueError("Invalid 'whence'.") def tell(self): return self._position def bytes_to_int_be(b): return int.from_bytes(b, 'big') def bytes_to_int_le(b): return int.from_bytes(b, 'little') def int_to_bytes_be(i): return i.to_bytes((i.bit_length() + 7) // 8, 'big') def int_to_bytes_le(i): return i.to_bytes((i.bit_length() + 7) // 8, 'little') def decode_bytestring(b, encoding='iso-8859-1'): if not b: return '' if encoding.startswith('utf-16'): if len(b) % 2 != 0 and b[-1:] == b'\x00': b = b[:-1] if b.startswith(BOM_UTF16_BE): b = b[len(BOM_UTF16_BE):] elif b.startswith(BOM_UTF16_LE): b = b[len(BOM_UTF16_LE):] return b.decode(encoding).rstrip('\x00') def decode_synchsafe_int(data, per_byte): return reduce(lambda value, element: (value << per_byte) + element, data, 0) def determine_encoding(b): first = b[0:1] if first == b'\x00': encoding = 'iso-8859-1' elif first == b'\x01': encoding = 'utf-16-be' if b[1:3] == b'\xfe\xff' else 'utf-16-le' elif first == b'\x02': encoding = 'utf-16-be' elif first == b'\x03': encoding = 'utf-8' else: encoding = 'iso-8859-1' return encoding def get_image_size(data): if hasattr(data, 'read'): data = data.read(56) size = len(data) width = height = 0 if size >= 10 and data[:6] in [b'GIF87a', b'GIF89a']: try: width, height = struct.unpack("= 24 and data.startswith(b'\x89PNG') and data[12:16] == b'IHDR': try: width, height = struct.unpack(">LL", data[16:24]) except struct.error: raise ValueError("Invalid PNG file.") elif size >= 16 and data.startswith(b'\x89PNG'): try: width, height = struct.unpack(">LL", data[8:16]) except struct.error: raise ValueError("Invalid PNG file.") elif size >= 2 and data.startswith(b'\xff\xd8'): data = DataReader(data) try: size = 2 ftype = 0 while not 0xc0 <= ftype <= 0xcf or ftype in [0xc4, 0xc8, 0xcc]: data.seek(size, os.SEEK_CUR) while True: b = ord(data.read(1)) if b != 0xff: break ftype = b size = struct.unpack('>H', data.read(2))[0] - 2 data.seek(1, os.SEEK_CUR) height, width = struct.unpack('>HH', data.read(4)) except struct.error: raise ValueError("Invalid JPEG file.") elif size >= 12 and data.startswith(b'\x00\x00\x00\x0cjP'): try: height, width = struct.unpack('>LL', data[48:]) except struct.error: raise ValueError("Invalid JPEG2000 file.") return width, height def humanize_bitrate(bitrate): for divisor, symbol in [(1000 ** 1, 'Kbps'), (1, 'bps')]: if bitrate >= divisor: break return f'{round(bitrate / divisor)} {symbol}' def humanize_duration(duration): if duration // 3600: hours = int(duration // 3600) minutes = int(duration % 3600 // 60) seconds = round(duration % 3600 % 60) return f'{hours:02d}:{minutes:02d}:{seconds:02d}' elif duration // 60: minutes = int(duration // 60) seconds = round(duration % 60) return f'{minutes:02d}:{seconds:02d}' else: return f'00:{round(duration):02d}' def humanize_filesize(filesize, *, precision=0): for divisor, symbol in [(1024 ** 3, 'GiB'), (1024 ** 2, 'MiB'), (1024 ** 1, 'KiB'), (1, 'B')]: if filesize >= divisor: break return f'{filesize / divisor:.{precision}f} {symbol}' def humanize_sample_rate(sample_rate): for divisor, symbol in [(1000 ** 1, 'KHz'), (1, 'Hz')]: if sample_rate >= divisor: break value = sample_rate / divisor return f'{value if value.is_integer() else value:.1f} {symbol}' def split_encoded(data, encoding): try: if encoding in ['iso-8859-1', 'utf-8']: head, tail = data.split(b'\x00', 1) else: if len(data) % 2 != 0: data += b'\x00' head, tail = data.split(b'\x00\x00', 1) if len(head) % 2 != 0: head, tail = data.split(b'\x00\x00\x00', 1) head += b'\x00' except ValueError: return (data,) return head, tail PK!H+dUT$audio_metadata-0.2.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/R(O-)T0343 /, (-JLR()*M IL*4KM̫PK!Hwy y'audio_metadata-0.2.0.dist-info/METADATAXkS<\ )$;΍\)Bvm%V-ג yw#cK~sChDu?BrHk9iFnZQ87|LSZ,Fd$<(J ffQiʂ5ȼJ-Ei!RLy¤쉔9A}T.G挫 PM ,-0n>,`;ܟ8E1"w5'#9aJ^0/@ G_otzn'TJ> R+Ud4"Gd3 q" {,Dž4M4P[MTzo}TW<;ת#k"TR$7ZXo =&&V4l:^Ļ´bA^к]ּ7(yY<4rErc`MB]TJxzI ynjwzdZ6(jܼB垝 ˅JɰBqEX"VD幌  G i*TDQKKʑM81fRf@TxAH("Xؑ! ?*$ih, YЂY/DH O1}<9ʘFԌpX}2e,1^ X&l AFba(g1QiDNc[!UaLHV긣FHkP8cSFuJ `Ts٘KT( "% CдLY4.S#326\&3\!K:SI W F"oiFS'gIIVei:NngSˆyu~t[4` VBKSp%c"%1dPCVN(#-Ma-728ƩUq:PyFM@(ѩ$AL+Xe,MwgT !U2CL5ھAr,ʥ$oYwH D[&1*¤D'&LidQ!v|+=`\3%L Z%* fϬiv({`=0?OS<Żuū,tyƶZ{8Z7n? wHm9 !I\] rzq&1--zqsi\`dYr5k!7:0yDՏx&SX.q |֣ۢmvT5[_+_0b]iG7sA~Lu۶{299α\A6ΥD-GTgzT#T9Lnøk˹1x]qq@V5:3r>][WJ@ݛaTOդ5gtVUV&\۟(W vΓX/ya' ٮYd[/j4%cDqIN~aߢGpMYB4([[ͧwIS yu~Nc& c6A̎@ѥ҅%ь!X r,hʕJuF҅4st跘 Mq"5 טmңTíkIΌLHzk,#0]^v$'GzhSnR,OA%MYQD6"#6 .ܢ㊂f_gv>#ل+>voOQԍ:l~ש4)dG!y0O6."a3hwao cF_pC"-Mhtc; tqL_Dnܠ@Pt(g9g&Lh WA)yxo$8;}"^hW6UXnڋ lz}k=Buux{1cmա[|?~[wt -^ߙYzәd\kmM;pna^WK]3'KkvzvkqPK!H9 _J%audio_metadata-0.2.0.dist-info/RECORDrJ} %`bÆ(@L#dNܼ`..m!wq+1:,l pp *l4ET#_;Y_J!jUs|Ky45XeӍcEMn0-Q|):K%],Xк=I/^cuw84%8n# 3"f۵nLf)*R {S).^*\NC23l@J8#I.r9pKT2L$C-MN?ݺ`