PK!MM!google_music_scripts/__about__.py__all__ = [ '__author__', '__author_email__', '__copyright__', '__license__', '__summary__', '__title__', '__url__', '__version__', '__version_info__' ] __title__ = 'google-music-scripts' __summary__ = 'A collection of scripts to interact with Google Music.' __url__ = 'https://github.com/thebigmunch/google-music-scripts' __version__ = '4.0.0' __version_info__ = tuple(int(i) for i in __version__.split('.') if i.isdigit()) __author__ = 'thebigmunch' __author_email__ = 'mail@thebigmunch.me' __license__ = 'MIT' __copyright__ = f'2018-2019 {__author__} <{__author_email__}>' PK!rAA google_music_scripts/__init__.pyfrom .__about__ import ( __author__, __author_email__, __copyright__, __license__, __summary__, __title__, __url__, __version__, __version_info__, ) __all__ = [ '__author__', '__author_email__', '__copyright__', '__license__', '__summary__', '__title__', '__url__', '__version__', '__version_info__' ] PK!PQQ google_music_scripts/__main__.py#!/usr/bin/env python3 from . import cli if __name__ == '__main__': cli.run() PK!1HHgoogle_music_scripts/cli.pyimport argparse import math import os import re from pathlib import Path import pendulum from attr import attrib, attrs from pendulum import DateTime from pendulum.tz import fixed_timezone from .__about__ import __title__, __version__ from .commands import ( do_delete, do_download, do_quota, do_search, do_upload ) from .config import configure_logging, get_defaults from .constants import UNIX_PATH_RE from .utils import DictMixin, convert_cygwin_path DATETIME_RE = re.compile( r"(?P\d{4})" r"[-\s]?" r"(?P\d{1,2})?" r"[-\s]?" r"(?P\d{1,2})?" r"[T\s]?" r"(?P\d{1,2})?" r"[:\s]?" r"(?P\d{1,2})?" r"[:\s]?" r"(?P\d{1,2})?" r"(?P[+\-\s])?" r"(?P\d{1,2})?" r"[:\s]?" r"(?P\d{1,2})?" ) FILTER_RE = re.compile(r'(([+-]+)?(.*?)\[(.*?)\])', re.I) DISPATCH = { 'del': do_delete, 'delete': do_delete, 'down': do_download, 'download': do_download, 'quota': do_quota, 'search': do_search, 'up': do_upload, 'upload': do_upload } def _convert_to_int(value): if value is not None: value = int(value) return value @attrs(slots=True, frozen=True, kw_only=True) class ParsedDateTime: year = attrib(converter=_convert_to_int) month = attrib(converter=_convert_to_int) day = attrib(converter=_convert_to_int) hour = attrib(converter=_convert_to_int) minute = attrib(converter=_convert_to_int) second = attrib(converter=_convert_to_int) tz_oper = attrib() tz_hour = attrib(converter=_convert_to_int) tz_minute = attrib(converter=_convert_to_int) class Namespace(DictMixin): pass class UsageHelpFormatter(argparse.RawTextHelpFormatter): def add_usage(self, usage, actions, groups, prefix="Usage: "): super().add_usage(usage, actions, groups, prefix) # Removes the command list while leaving the usage metavar intact. class SubcommandHelpFormatter(UsageHelpFormatter): def _format_action(self, action): parts = super()._format_action(action) if action.nargs == argparse.PARSER: parts = "\n".join(parts.split("\n")[1:]) return parts ######### # Utils # ######### # I use Windows Python install from Cygwin. # This custom click type converts Unix-style paths to Windows-style paths in this case. def custom_path(value): if os.name == 'nt' and UNIX_PATH_RE.match(str(value)): value = Path(convert_cygwin_path(str(value))) value = Path(value) return value def default_to_cwd(): return Path.cwd() def parse_filter(value): conditions = FILTER_RE.findall(value) if not conditions: raise ValueError(f"'{value}' is not a valid filter.") filter_ = [ tuple(condition[1:]) for condition in conditions ] return filter_ def split_album_art_paths(value): paths = value if value: paths = [] if not isinstance(value, list): value = value.split(',') for val in value: if os.name == 'nt' and UNIX_PATH_RE.match(val.strip()): paths.append(convert_cygwin_path(val.strip())) else: paths.append(Path(val)) return paths def time_period( dt_string, *, in_=False, on=False, before=False, after=False ): match = DATETIME_RE.match(dt_string) if not match or match['year'] is None: raise argparse.ArgumentTypeError(f"'{dt_string}' is not a supported datetime string.") parsed = ParsedDateTime(**match.groupdict()) if parsed.tz_hour: tz_offset = 0 if parsed.tz_hour is not None: tz_offset += parsed.tz_hour * 3600 if parsed.tz_minute is not None: tz_offset += parsed.tz_minute * 60 if parsed.tz_oper == '-': tz_offset *= -1 parsed_tz = fixed_timezone(tz_offset) else: parsed_tz = pendulum.local_timezone() if in_: if parsed.day: raise argparse.ArgumentTypeError( f"Datetime string must contain only year or year/month for 'in' option." ) start = pendulum.datetime( parsed.year, parsed.month or 1, parsed.day or 1, tz=parsed_tz ) if parsed.month: end = start.end_of('month') else: end = start.end_of('year') return pendulum.period( start, end ) elif on: if ( not all( getattr(parsed, attr) for attr in ['year', 'month', 'day'] ) or parsed.hour ): raise argparse.ArgumentTypeError( f"Datetime string must contain only year, month, and day for 'on' option." ) dt = pendulum.datetime( parsed.year, parsed.month, parsed.day, tz=parsed_tz ) return pendulum.period( dt.start_of('day'), dt.end_of('day') ) elif before: start = DateTime.min dt = pendulum.datetime( parsed.year, parsed.month or 1, parsed.day or 1, parsed.hour or 23, parsed.minute or 59, parsed.second or 59, 0, tz=parsed_tz ) if not parsed.month: dt = dt.start_of('year') elif not parsed.day: dt = dt.start_of('month') elif not parsed.hour: dt = dt.start_of('day') elif not parsed.minute: dt = dt.start_of('hour') elif not parsed.second: dt = dt.start_of('minute') return pendulum.period( start, dt ) elif after: end = DateTime.max dt = pendulum.datetime( parsed.year, parsed.month or 1, parsed.day or 1, parsed.hour or 23, parsed.minute or 59, parsed.second or 59, 99999, tz=parsed_tz ) if not parsed.month: dt = dt.end_of('year') elif not parsed.day: dt = dt.end_of('month') elif not parsed.hour: dt = dt.start_of('day') elif not parsed.minute: dt = dt.start_of('hour') elif not parsed.second: dt = dt.start_of('minute') return pendulum.period( dt, end ) ######## # Meta # ######## meta = argparse.ArgumentParser( add_help=False ) meta_options = meta.add_argument_group("Options") meta_options.add_argument( '-h', '--help', action='help', help="Display help." ) meta_options.add_argument( '-V', '--version', action='version', version=f"{__title__} {__version__}", help="Output version." ) ########## # Action # ########## dry_run = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) dry_run_options = dry_run.add_argument_group("Action") dry_run_options.add_argument( '-n', '--dry-run', action='store_true', help="Output results without taking action." ) yes = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) yes_options = yes.add_argument_group("Action") yes_options.add_argument( '-y', '--yes', action='store_true', help="Don't ask for confirmation." ) ########### # Logging # ########### logging_ = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) logging_options = logging_.add_argument_group("Logging") logging_options.add_argument( '-v', '--verbose', action='count', help="Increase verbosity of output." ) logging_options.add_argument( '-q', '--quiet', action='count', help="Decrease verbosity of output." ) logging_options.add_argument( '--debug', action='store_true', help="Output log messages from dependencies." ) logging_options.add_argument( '--log-to-file', action='store_true', help="Log to file as well as stdout." ) ################## # Identification # ################## ident = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) ident_options = ident.add_argument_group("Identification") ident_options.add_argument( '-u', '--username', metavar='USER', help="Your Google username or e-mail address.\nUsed to separate saved credentials." ) mc_ident = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) mc_ident_options = mc_ident.add_argument_group("Identification") mc_ident_options.add_argument( '--device-id', metavar='ID', help="A mobile device id." ) mm_ident = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) mm_ident_options = mm_ident.add_argument_group("Identification") mm_ident_options.add_argument( '--uploader-id', metavar='ID', help="A unique id given as a MAC address (e.g. '00:11:22:33:AA:BB').\nThis should only be provided when the default does not work." ) ######### # Local # ######### local = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) local_options = local.add_argument_group("Local") local_options.add_argument( '--no-recursion', action='store_true', help="Disable recursion when scanning for local files.\nRecursion is enabled by default." ) local_options.add_argument( '--max-depth', metavar='DEPTH', type=int, help="Set maximum depth of recursion when scanning for local files.\nDefault is infinite recursion." ) local_options.add_argument( '-xp', '--exclude-path', metavar='PATH', action='append', dest='exclude_paths', help="Exclude filepaths.\nCan be specified multiple times." ) local_options.add_argument( '-xr', '--exclude-regex', metavar='RX', action='append', dest='exclude_regexes', help="Exclude filepaths using regular expressions.\nCan be specified multiple times." ) local_options.add_argument( '-xg', '--exclude-glob', metavar='GP', action='append', dest='exclude_globs', help="Exclude filepaths using glob patterns.\nCan be specified multiple times.\nAbsolute glob patterns not supported." ) ########## # Filter # ########## # Metadata filter_metadata = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) metadata_options = filter_metadata.add_argument_group("Filter") metadata_options.add_argument( '-f', '--filter', metavar='FILTER', action='append', dest='filters', type=parse_filter, help="Metadata filters.\nCan be specified multiple times." ) # Dates filter_dates = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) dates_options = filter_dates.add_argument_group("Filter") dates_options.add_argument( '--created-in', metavar='DATE', type=lambda d: time_period(d, in_=True), help="Include songs created in year or year/month." ) dates_options.add_argument( '--created-on', metavar='DATE', type=lambda d: time_period(d, on=True), help="Include songs created on date." ) dates_options.add_argument( '--created-before', metavar='DATE', type=lambda d: time_period(d, before=True), help="Include songs created before datetime." ) dates_options.add_argument( '--created-after', metavar='DATE', type=lambda d: time_period(d, after=True), help="Include songs created after datetime." ) dates_options.add_argument( '--modified-in', metavar='DATE', type=lambda d: time_period(d, in_=True), help="Include songs created in year or year/month." ) dates_options.add_argument( '--modified-on', metavar='DATE', type=lambda d: time_period(d, on=True), help="Include songs created on date." ) dates_options.add_argument( '--modified-before', metavar='DATE', type=lambda d: time_period(d, before=True), help="Include songs modified before datetime." ) dates_options.add_argument( '--modified-after', metavar='DATE', type=lambda d: time_period(d, after=True), help="Include songs modified after datetime." ) ############### # Upload Misc # ############### upload_misc = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) upload_misc_options = upload_misc.add_argument_group("Misc") upload_misc_options.add_argument( '--delete-on-success', action='store_true', help="Delete successfully uploaded local files." ) upload_misc_options.add_argument( '--no-sample', action='store_true', help="Don't create audio sample with ffmpeg/avconv.\nSend empty audio sample." ) upload_misc_options.add_argument( '--album-art', metavar='ART_PATHS', type=split_album_art_paths, help="Comma-separated list of album art filepaths.\nCan be relative filenames and/or absolute filepaths." ) ######## # Sync # ######## sync = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) sync_options = sync.add_argument_group("Sync") sync_options.add_argument( '--use-hash', action='store_true', help="Use audio hash to sync songs." ) sync_options.add_argument( '--no-use-hash', action='store_true', help="Don't use audio hash to sync songs." ) sync_options.add_argument( '--use-metadata', action='store_true', help="Use metadata to sync songs." ) sync_options.add_argument( '--no-use-metadata', action='store_true', help="Don't use metadata to sync songs." ) ########## # Output # ########## output = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) output_options = output.add_argument_group("Output") output_options.add_argument( '-o', '--output', metavar='TEMPLATE_PATH', type=lambda t: str(custom_path(t)), help="Output file or directory name which can include template patterns." ) ########### # Include # ########### include = argparse.ArgumentParser( argument_default=argparse.SUPPRESS, add_help=False ) include_options = include.add_argument_group("Include") include_options.add_argument( 'include', metavar='PATH', type=lambda p: custom_path(p).resolve(), nargs='*', help="Local paths to include songs from." ) ####### # gms # ####### gms = argparse.ArgumentParser( prog='gms', description="A collection of scripts to interact with Google Music.", usage=argparse.SUPPRESS, parents=[meta], formatter_class=SubcommandHelpFormatter, add_help=False ) subcommands = gms.add_subparsers( title="Commands", dest='_command', metavar="" ) ########## # Delete # ########## delete_command = subcommands.add_parser( 'delete', aliases=['del'], description="Delete song(s) from Google Music.", help="Delete song(s) from Google Music.", formatter_class=UsageHelpFormatter, usage="gms delete [OPTIONS]", parents=[ meta, dry_run, yes, logging_, ident, mc_ident, filter_metadata, filter_dates ], add_help=False ) ############ # Download # ############ download_command = subcommands.add_parser( 'download', aliases=['down'], description="Download song(s) from Google Music.", help="Download song(s) from Google Music.", formatter_class=UsageHelpFormatter, usage="gms download [OPTIONS]", parents=[ meta, dry_run, logging_, ident, mm_ident, mc_ident, local, filter_metadata, filter_dates, sync, output, include ], add_help=False ) ######### # Quota # ######### quota_command = subcommands.add_parser( 'quota', description="Get the uploaded song count and allowance.", help="Get the uploaded song count and allowance.", formatter_class=UsageHelpFormatter, usage="gms quota [OPTIONS]", parents=[ meta, logging_, ident, mm_ident ], add_help=False ) ########## # Search # ########## search_command = subcommands.add_parser( 'search', description="Search a Google Music library for songs.", help="Search for Google Music library songs.", formatter_class=UsageHelpFormatter, usage="gms search [OPTIONS]", parents=[ meta, yes, logging_, mc_ident, filter_metadata ], add_help=False ) ########## # Upload # ########## upload_command = subcommands.add_parser( 'upload', aliases=['up'], description="Upload song(s) to Google Music.", help="Upload song(s) to Google Music.", formatter_class=UsageHelpFormatter, usage="gms upload [OPTIONS] [INCLUDE_PATH]...", parents=[ meta, dry_run, logging_, ident, mm_ident, mc_ident, local, filter_metadata, filter_dates, upload_misc, sync, include ], add_help=False ) def parse_args(): return gms.parse_args(namespace=Namespace()) def check_args(args): if all( option in args for option in ['use_hash', 'no_use_hash'] ): raise ValueError( "Use one of --use-hash/--no-use-hash', not both." ) if all( option in args for option in ['use_metadata', 'no_use_metadata'] ): raise ValueError( "Use one of --use-metadata/--no-use-metadata', not both." ) def default_args(args): defaults = Namespace() # Set defaults. defaults.verbose = 0 defaults.quiet = 0 defaults.debug = False defaults.log_to_file = False defaults.dry_run = False defaults.username = '' defaults.filters = [] if args._command in ['down', 'download', 'up', 'upload']: defaults.uploader_id = None defaults.device_id = None defaults.no_recursion = False defaults.max_depth = math.inf defaults.exclude_paths = [] defaults.exclude_regexes = [] defaults.exclude_globs = [] defaults.include = [custom_path('.').resolve()] if 'no_use_hash' in args: defaults.use_hash = False defaults.no_use_hash = True else: defaults.use_hash = True defaults.no_use_hash = False if 'no_use_metadata' in args: defaults.use_metadata = False defaults.no_use_metadata = True else: defaults.use_metadata = True defaults.no_use_metadata = False elif args._command in ['quota']: defaults.uploader_id = None else: defaults.device_id = None if args._command in ['del', 'delete', 'search']: defaults.yes = False if args._command in ['down', 'download']: defaults.output = str(Path('.').resolve()) defaults.include = [custom_path('.').resolve()] if args._command in ['up', 'upload']: defaults.delete_on_success = False defaults.no_sample = False defaults.album_art = None config_defaults = get_defaults(args._command, username=args.get('username')) for k, v in config_defaults.items(): if k == 'album_art': defaults.album_art = split_album_art_paths(v) elif k == 'filters': defaults.filters = [ parse_filter(filter_) for filter_ in v ] elif k == 'max_depth': defaults.max_depth = int(v) elif k == 'output': defaults.output = str(custom_path(v)) elif k == 'include': defaults.include = [ custom_path(val) for val in v ] elif k in ['use_hash', 'use_metadata']: defaults[k] = v defaults[f"no_{k}"] = not v elif k in ['no_use_hash', 'no_use_metadata']: defaults[k] = v defaults[f"{k.replace('no_', '')}"] = not v elif k.startswith(('created', 'modified')): if k.endswith('in'): defaults[k] = time_period(v, in_=True) elif k.endswith('on'): defaults[k] = time_period(v, on=True) elif k.endswith('before'): defaults[k] = time_period(v, before=True) elif k.endswith('after'): defaults[k] = time_period(v, after=True) else: defaults[k] = v return defaults def merge_defaults(defaults, parsed): args = Namespace() args.update(defaults) args.update(parsed) if args.get('no_recursion'): args.max_depth = 0 return args def run(): try: parsed = parse_args() if parsed.get('_command'): command = parsed._command else: gms.parse_args(['-h']) defaults = default_args(parsed) args = merge_defaults(defaults, parsed) configure_logging( args.verbose - args.quiet, username=args.username, debug=args.debug, log_to_file=args.log_to_file ) DISPATCH[command](args) except KeyboardInterrupt: gms.exit(130, "Interrupted by user") PK!,, google_music_scripts/commands.pyimport sys import google_music import google_music_utils as gm_utils from google_music_proto.musicmanager.utils import generate_client_id from loguru import logger from more_itertools import first_true from natsort import natsorted from .core import ( download_songs, filter_google_dates, filter_local_dates, get_google_songs, get_local_songs, upload_songs ) from .utils import template_to_base_path def do_delete(args): logger.success("Logging in to Mobile Client") mc = google_music.mobileclient(args.username, device_id=args.device_id) if not mc.is_authenticated: sys.exit("Failed to authenticate Mobile Client") to_delete = filter_google_dates( get_google_songs(mc, filters=args.filters), created_in=args.get('created_in'), created_on=args.get('created_on'), created_before=args.get('created_before'), created_after=args.get('created_after'), modified_in=args.get('modified_in'), modified_on=args.get('modified_on'), modified_before=args.get('modified_before'), modified_after=args.get('modified_after') ) if not to_delete: logger.success("No songs to delete") elif args.dry_run: logger.success(f"Found {len(to_delete)} songs to delete") for song in to_delete: title = song.get('title', "") artist = song.get('artist', "") album = song.get('album', "") song_id = song['id'] logger.success(f"{title} -- {artist} -- {album} ({song_id})") else: confirm = args.yes or input( f"\nAre you sure you want to delete {len(to_delete)} song(s) from Google Music? (y/n) " ) in ("y", "Y") if confirm: song_num = 0 total = len(to_delete) pad = len(str(total)) for song in to_delete: song_num += 1 title = song.get('title', "") artist = song.get('artist', "") album = song.get('album', "") song_id = song['id'] logger.info(f"Deleting {title} -- {artist} -- {album} ({song_id})") mc.song_delete(song) logger.success(f"Deleted {song_num:>{pad}}/{total}") else: logger.success("No songs deleted") mc.logout() logger.success("All done!") def do_download(args): logger.success("Logging in to Music Manager") mm = google_music.musicmanager(args.username, uploader_id=args.uploader_id) if not mm.is_authenticated: sys.exit("Failed to authenticate Music Manager") logger.success("Logging in to Mobile Client") mc = google_music.mobileclient(args.username, device_id=args.device_id) if not mc.is_authenticated: sys.exit("Failed to authenticate Mobile Client") logger.success("Loading Google songs") google_songs = get_google_songs(mm, filters=args.filters) base_path = template_to_base_path(args.output, google_songs) filepaths = [base_path] if args.include: filepaths.extend(args.include) mc_songs = get_google_songs(mc, filters=args.filters) if any( args.get(option) for option in [ 'created_in', 'created_on', 'created_before', 'created_after', 'modified_in', 'modified_on', 'modified_before', 'modified_after' ] ): mc_songs = filter_google_dates( mc_songs, created_in=args.get('created_in'), created_on=args.get('created_on'), created_before=args.get('created_before'), created_after=args.get('created_after'), modified_in=args.get('modified_in'), modified_on=args.get('modified_on'), modified_before=args.get('modified_before'), modified_after=args.get('modified_after') ) logger.success("Loading local songs") local_songs = get_local_songs( filepaths, filters=args.filters, max_depth=args.max_depth, exclude_paths=args.exclude_paths, exclude_regexes=args.exclude_regexes, exclude_globs=args.exclude_globs ) missing_songs = [] if args.use_hash: logger.success("Comparing hashes") existing_songs = [] google_client_id_map = { song.get('clientId'): song for song in mc_songs } local_client_ids = {generate_client_id(song) for song in local_songs} for client_id, mc_song in google_client_id_map.items(): song = first_true( (song for song in google_songs), pred=lambda song: song.get('id') == mc_song.get('id') ) if client_id not in local_client_ids: missing_songs.append(song) else: existing_songs.append(song) logger.success(f"Found {len(existing_songs)} songs already exist by audio hash") if logger._min_level <= 20: for song in existing_songs: title = song.get('title', "") artist = song.get('artist', "<artist>") album = song.get('album', "<album>") song_id = song['id'] logger.info(f"{title} -- {artist} -- {album} ({song_id})") if args.use_metadata: if args.use_hash: google_songs = missing_songs if google_songs: logger.success("Comparing metadata") missing_songs = natsorted( gm_utils.find_missing_items( google_songs, local_songs, fields=['artist', 'album', 'title', 'tracknumber'], normalize_values=True ) ) existing_songs = natsorted( gm_utils.find_existing_items( google_songs, local_songs, fields=['artist', 'album', 'title', 'tracknumber'], normalize_values=True ) ) logger.success(f"Found {len(existing_songs)} songs already exist by metadata") if logger._min_level <= 20: for song in existing_songs: title = song.get('title', "<title>") artist = song.get('artist', "<artist>") album = song.get('album', "<album>") song_id = song['id'] logger.info(f"{title} -- {artist} -- {album} ({song_id})") if not args.use_hash and not args.use_metadata: missing_songs = google_songs to_download = natsorted(missing_songs) if not to_download: logger.success("No songs to download") elif args.dry_run: logger.success(f"Found {len(to_download)} songs to download") if logger._min_level <= 20: for song in to_download: title = song.get('title', "<title>") artist = song.get('artist', "<artist>") album = song.get('album', "<album>") song_id = song['id'] logger.info(f"{title} -- {artist} -- {album} ({song_id})") else: download_songs(mm, to_download, template=args.output) mc.logout() mm.logout() logger.success("All done!") def do_quota(args): logger.success("Logging in to Music Manager") mm = google_music.musicmanager(args.username, uploader_id=args.uploader_id) if not mm.is_authenticated: sys.exit("Failed to authenticate Music Manager") uploaded, allowed = mm.quota() logger.success(f"Quota -- {uploaded}/{allowed} ({uploaded / allowed:.2%})") def do_search(args): logger.success("Logging in to Mobile Client") mc = google_music.mobileclient(args.username, device_id=args.device_id) if not mc.is_authenticated: sys.exit("Failed to authenticate Mobile Client") search_results = get_google_songs(mc, filters=args.filters) if any( args.get(option) for option in [ 'created_in', 'created_on', 'created_before', 'created_after', 'modified_in', 'modified_on', 'modified_before', 'modified_after' ] ): search_results = filter_google_dates( search_results, created_in=args.get('created_in'), created_on=args.get('created_on'), created_before=args.get('created_before'), created_after=args.get('created_after'), modified_in=args.get('modified_in'), modified_on=args.get('modified_on'), modified_before=args.get('modified_before'), modified_after=args.get('modified_after') ), search_results = natsorted( search_results, key=lambda song: ( song.get('artist', ''), song.get('album', ''), song.get('trackNumber', 0) ) ) if search_results: result_num = 0 total = len(search_results) pad = len(str(total)) confirm = ( args.yes or input(f"\nDisplay {len(search_results)} results? (y/n) ") in ("y", "Y") ) if confirm: for result in search_results: result_num += 1 title = result.get('title', "<empty>") artist = result.get('artist', "<empty>") album = result.get('album', "<empty>") song_id = result['id'] logger.success( f"{result_num:>{pad}}/{total} {title} -- {artist} -- {album} ({song_id})" ) else: logger.success("No songs found matching query") mc.logout() logger.success("All done!") def do_upload(args): logger.success("Logging in to Music Manager") mm = google_music.musicmanager(args.username, uploader_id=args.uploader_id) if not mm.is_authenticated: sys.exit("Failed to authenticate Music Manager") logger.success("Logging in to Mobile Client") mc = google_music.mobileclient(args.username, device_id=args.device_id) if not mc.is_authenticated: sys.exit("Failed to authenticate Mobile Client") logger.success("Loading local songs") local_songs = get_local_songs( args.include, filters=args.filters, max_depth=args.max_depth, exclude_paths=args.exclude_paths, exclude_regexes=args.exclude_regexes, exclude_globs=args.exclude_globs ) if any( args.get(option) for option in [ 'created_in', 'created_on', 'created_before', 'created_after', 'modified_in', 'modified_on', 'modified_before', 'modified_after' ] ): local_songs = filter_local_dates( local_songs, created_in=args.get('created_in'), created_on=args.get('created_on'), created_before=args.get('created_before'), created_after=args.get('created_after'), modified_in=args.get('modified_in'), modified_on=args.get('modified_on'), modified_before=args.get('modified_before'), modified_after=args.get('modified_after') ) missing_songs = [] if args.use_hash: logger.success("Comparing hashes") existing_songs = [] google_client_ids = {song.get('clientId', '') for song in get_google_songs(mc)} for song in local_songs: if generate_client_id(song) not in google_client_ids: missing_songs.append(song) else: existing_songs.append(song) logger.success(f"Found {len(existing_songs)} songs already exist by audio hash") if logger._min_level <= 20: for song in natsorted(existing_songs): logger.info(song) if args.use_metadata: if args.use_hash: local_songs = missing_songs if local_songs: logger.success("Comparing metadata") google_songs = get_google_songs(mm, filters=args.filters) missing_songs = natsorted( gm_utils.find_missing_items( local_songs, google_songs, fields=['artist', 'album', 'title', 'tracknumber'], normalize_values=True ) ) existing_songs = natsorted( gm_utils.find_existing_items( local_songs, google_songs, fields=['artist', 'album', 'title', 'tracknumber'], normalize_values=True ) ) logger.success(f"Found {len(existing_songs)} songs already exist by metadata") if logger._min_level <= 20: for song in existing_songs: logger.info(song) if not args.use_hash and not args.use_metadata: missing_songs = local_songs to_upload = natsorted(missing_songs) if not to_upload: logger.success("No songs to upload") elif args.dry_run: logger.success(f"Found {len(to_upload)} songs to upload") if logger._min_level <= 20: for song in to_upload: logger.info(song) else: upload_songs( mm, to_upload, album_art=args.album_art, no_sample=args.no_sample, delete_on_success=args.delete_on_success ) mc.logout() mm.logout() logger.success("All done!") PK�������!�A` ��` �����google_music_scripts/config.pyimport sys import time from collections.abc import Mapping from pathlib import Path import appdirs from loguru import logger from tomlkit.toml_document import TOMLDocument from tomlkit.toml_file import TOMLFile from .__about__ import __author__, __title__ from .utils import DictMixin COMMAND_ALIASES = { 'del': 'delete', 'delete': 'del', 'down': 'delete', 'download': 'down', 'up': 'upload', 'upload': 'up' } COMMAND_KEYS = { 'del', 'delete', 'down', 'download', 'quota', 'search', 'up', 'upload' } CONFIG_BASE_PATH = Path(appdirs.user_config_dir(__title__, __author__)) LOG_BASE_PATH = Path(appdirs.user_data_dir(__title__, __author__)) LOG_FORMAT = '<lvl>[{time:YYYY-MM-DD HH:mm:ss}]</lvl> {message}' LOG_DEBUG_FORMAT = LOG_FORMAT logger.level('SUCCESS', no=25, color="<green>") logger.level('FAILURE', no=26, color="<red>") logger.level('INFO', no=20, color="<green><bold>") VERBOSITY_LOG_LEVELS = { 0: "CRITICAL", 1: "ERROR", 2: "WARNING", 3: "FAIL", 4: "SUCCESS", 5: "INFO", 6: "DEBUG", 7: "TRACE" } def convert_default_keys(item): if isinstance(item, Mapping): converted = item.__class__() for k, v in item.items(): converted[k.lstrip('-').replace('-', '_')] = convert_default_keys(v) return converted else: return item def get_defaults(command, *, username=None): config_defaults = read_config_file(username).get('defaults') defaults = DictMixin() if config_defaults: defaults.update( (k, v) for k, v in config_defaults.items() if k not in COMMAND_KEYS ) if command in config_defaults: defaults.update( (k, v) for k, v in config_defaults[command[0]].items() if k not in COMMAND_KEYS ) cmd_alias = COMMAND_ALIASES.get(command) if cmd_alias in config_defaults: defaults.update( (k, v) for k, v in config_defaults[cmd_alias].items() if k not in COMMAND_KEYS ) return convert_default_keys(defaults) def read_config_file(username=None): config_path = CONFIG_BASE_PATH / (username or '') / 'google-music-scripts.toml' config_file = TOMLFile(config_path) try: config = config_file.read() except FileNotFoundError: config = TOMLDocument() write_config_file(config, username=username) return config def write_config_file(config, username=None): config_path = CONFIG_BASE_PATH / (username or '') / 'google-music-scripts.toml' config_path.parent.mkdir(parents=True, exist_ok=True) config_path.touch() config_file = TOMLFile(config_path) config_file.write(config) def ensure_log_dir(username=None): log_dir = LOG_BASE_PATH / (username or '') / 'logs' log_dir.mkdir(parents=True, exist_ok=True) return log_dir def configure_logging(modifier=0, *, username=None, debug=False, log_to_file=False): logger.remove() verbosity = 4 + modifier if verbosity < 0: verbosity = 0 elif verbosity > 7: verbosity = 7 log_level = VERBOSITY_LOG_LEVELS[verbosity] logger.add( sys.stdout, level=log_level, format=LOG_FORMAT, backtrace=False ) if debug: logger.enable('audio_metadata') logger.enable('google_music') logger.enable('google_music-proto') logger.enable('google_music_utils') if log_to_file: log_dir = ensure_log_dir(username=username) log_file = (log_dir / time.strftime('%Y-%m-%d_%H-%M-%S')).with_suffix('.log') logger.success("Logging to file: {}", log_file) logger.add( log_file, level=log_level, format=LOG_FORMAT, backtrace=False ) PK�������!�m����!���google_music_scripts/constants.py__all__ = [ 'CHARACTER_REPLACEMENTS', 'TEMPLATE_PATTERNS', 'UNIX_PATH_RE' ] import re from google_music_utils import CHARACTER_REPLACEMENTS, TEMPLATE_PATTERNS UNIX_PATH_RE = re.compile("^(?:/[^/]+)*/?$") """Regex pattern matching UNIX-style filepaths.""" PK�������!�J�������google_music_scripts/core.pyimport math import platform import re from collections import defaultdict from pathlib import Path import audio_metadata import google_music_utils as gm_utils import pendulum from loguru import logger from .utils import get_album_art_path def download_songs(mm, songs, template=None): logger.success(f"Downloading {len(songs)} songs from Google Music") if not template: template = Path.cwd() songnum = 0 total = len(songs) pad = len(str(total)) for song in songs: songnum += 1 try: audio, _ = mm.download(song) except Exception as e: # TODO: More specific exception. logger.log( 'FAILURE', f"({songnum:>{pad}}/{total}) Failed -- {song} | {e}" ) else: tags = audio_metadata.loads(audio).tags filepath = gm_utils.template_to_filepath(template, tags).with_suffix('.mp3') if filepath.is_file(): filepath.unlink() filepath.parent.mkdir(parents=True, exist_ok=True) filepath.touch() filepath.write_bytes(audio) logger.success( f"({songnum:>{pad}}/{total}) Downloaded -- {filepath} ({song['id']})" ) def filter_google_dates( songs, *, created_in=None, created_on=None, created_before=None, created_after=None, modified_in=None, modified_on=None, modified_before=None, modified_after=None ): matched_songs = songs def _dt_from_gm_timestamp(gm_timestamp): return pendulum.from_timestamp(int(gm_timestamp) // 1000000) def _match_created_date(songs, period): return ( song for song in songs if _dt_from_gm_timestamp(song['creationTimestamp']) in period ) def _match_modified_date(songs, period): return ( song for song in songs if _dt_from_gm_timestamp(song['lastModifiedTimestamp']) in period ) for period in [ created_in, created_on, created_before, created_after ]: if period is not None: matched_songs = _match_created_date(matched_songs, period, ) for period in [ modified_in, modified_on, modified_before, modified_after ]: if period is not None: matched_songs = _match_modified_date(matched_songs, period) return list(matched_songs) def filter_local_dates( filepaths, *, created_in=None, created_on=None, created_before=None, created_after=None, modified_in=None, modified_on=None, modified_before=None, modified_after=None ): matched_filepaths = filepaths def _match_created_date(filepaths, period): for filepath in filepaths: file_stat = filepath.stat() if platform.system() == 'Windows': created_timestamp = file_stat.st_ctime else: try: created_timestamp = file_stat.st_birthtime except AttributeError: # Settle for modified time on *nix systems # not supporting birth time. created_timestamp = file_stat.st_mtime if pendulum.from_timestamp(created_timestamp) in period: yield filepath def _match_modified_date(filepaths, period): for filepath in filepaths: modified_timestamp = filepath.stat().st_mtime if pendulum.from_timestamp(modified_timestamp) in period: yield filepath for period in [ created_in, created_on, created_before, created_after ]: if period is not None: matched_filepaths = _match_created_date(matched_filepaths, period) for period in [ modified_in, modified_on, modified_before, modified_after ]: if period is not None: matched_filepaths = _match_modified_date(matched_filepaths, period) return list(matched_filepaths) def filter_metadata(songs, filters): if filters: logger.success("Filtering songs") matched_songs = [] for filter_ in filters: include_filters = defaultdict(list) exclude_filters = defaultdict(list) for oper, field, value in filter_: if oper in ['+', '']: include_filters[field].append(value) elif oper == '-': exclude_filters[field].append(value) matched = songs # Use all if multiple conditions for inclusion. i_use_all = ( (len(include_filters) > 1) or any( len(v) > 1 for v in include_filters.values() ) ) i_any_all = all if i_use_all else any matched = gm_utils.include_items( matched, any_all=i_any_all, ignore_case=True, **include_filters ) # Use any if multiple conditions for exclusion. e_use_all = not ( (len(exclude_filters) > 1) or any( len(v) > 1 for v in exclude_filters.values() ) ) e_any_all = all if e_use_all else any matched = gm_utils.exclude_items( matched, any_all=e_any_all, ignore_case=True, **exclude_filters ) for song in matched: if song not in matched_songs: matched_songs.append(song) else: matched_songs = songs return matched_songs def get_google_songs(client, *, filters=None): return filter_metadata(client.songs(), filters) def get_local_songs( filepaths, *, filters=None, max_depth=math.inf, exclude_paths=None, exclude_regexes=None, exclude_globs=None ): def _exclude_paths(path, exclude_paths): return any( str(Path(exclude_path)) in str(path) for exclude_path in exclude_paths ) def _exclude_regexes(path, exclude_regexes): return any( re.search(regex, str(path)) for regex in exclude_regexes ) local_songs = [] for filepath in filepaths: if ( _exclude_paths(filepath, exclude_paths) or _exclude_regexes(filepath, exclude_regexes) ): continue if filepath.is_dir(): exclude_files = set() for exclude_glob in exclude_globs: exclude_files |= set(filepath.rglob(exclude_glob)) for path in filepath.glob('**/*'): if ( path.is_file() and path not in exclude_files and not _exclude_paths(path, exclude_paths) and not _exclude_regexes(path, exclude_regexes) ): with path.open('rb') as f: if audio_metadata.determine_format( f.read(4), extension=path.suffix ) is not None: local_songs.append(path) elif filepath.is_file(): with filepath.open('rb') as f: if audio_metadata.determine_format( f.read(4), extension=filepath.suffix ) is not None: local_songs.append(filepath) matched_songs = filter_metadata(local_songs, filters) return matched_songs def upload_songs( mm, filepaths, album_art=None, no_sample=False, delete_on_success=False ): logger.success(f"Uploading {len(filepaths)} songs to Google Music") filenum = 0 total = len(filepaths) pad = len(str(total)) for song in filepaths: filenum += 1 album_art_path = get_album_art_path(song, album_art) result = mm.upload( song, album_art_path=album_art_path, no_sample=no_sample ) if result['reason'] == 'Uploaded': logger.success( f"({filenum:>{pad}}/{total}) Uploaded -- {result['filepath']} ({result['song_id']})" ) elif result['reason'] == 'Matched': logger.success( f"({filenum:>{pad}}/{total}) Matched -- {result['filepath']} ({result['song_id']})" ) else: if 'song_id' in result: logger.success( f"({filenum:>{pad}}/{total}) Already exists -- {result['filepath']} ({result['song_id']})" ) else: logger.log( 'FAILURE' f"({filenum:>{pad}}/{total}) Failed -- {result['filepath']} | {result['reason']}" ) if delete_on_success and 'song_id' in result: try: result['filepath'].unlink() except Exception: logger.warning( f"Failed to remove {result['filepath']} after successful upload" ) PK�������!�69 ��9 �����google_music_scripts/utils.py__all__ = [ 'DictMixin', 'convert_cygwin_path', 'get_album_art_path', 'template_to_base_path' ] import os import subprocess from collections.abc import MutableMapping from pathlib import Path import google_music_utils as gm_utils import pprintpp from loguru import logger class DictMixin(MutableMapping): def __getattr__(self, attr): try: return self.__getitem__(attr) except KeyError: raise AttributeError(attr) from None def __setattr__(self, attr, value): self.__setitem__(attr, value) def __delattr__(self, attr): try: return self.__delitem__(attr) except KeyError: raise AttributeError(attr) from None def __getitem__(self, key): return self.__dict__[key] def __setitem__(self, key, value): self.__dict__[key] = value def __delitem__(self, key): del(self.__dict__[key]) def __missing__(self, key): return KeyError(key) def __iter__(self): return iter(self.__dict__) def __len__(self): return len(self.__dict__) def __repr__(self, repr_dict=None): return f"<{self.__class__.__name__} ({pprintpp.pformat(self.__dict__)})>" def items(self): return self.__dict__.items() def keys(self): return self.__dict__.keys() def values(self): return self.__dict__.values() def convert_cygwin_path(filepath): """Convert Unix filepath string from Cygwin to Windows format. Parameters: filepath (str): A filepath string. Returns: str: A filepath string in Windows format. Raises: FileNotFoundError subprocess.CalledProcessError """ try: win_path = subprocess.run( ["cygpath", "-aw", filepath], check=True, stdout=subprocess.PIPE, universal_newlines=True ).stdout.strip() except (FileNotFoundError, subprocess.CalledProcessError): logger.exception("Call to cygpath failed.") raise return Path(win_path) def get_album_art_path(song, album_art_paths): album_art_path = None if album_art_paths: for path in album_art_paths: if ( path.is_absolute() and path.isfile() ): album_art_path = path break else: path = song.parent / path if path.is_file(): album_art_path = path break return album_art_path def template_to_base_path(template, google_songs): """Get base output path for a list of songs for download.""" path = Path(template) if ( path == Path.cwd() or path == Path('%suggested%') ): base_path = Path.cwd() else: song_paths = [ gm_utils.template_to_filepath(template, song) for song in google_songs ] if song_paths: base_path = Path(os.path.commonpath(song_paths)) else: base_path = Path.cwd() return base_path.resolve() PK������!H.���4���5���google_music_scripts-4.0.0.dist-info/entry_points.txtN+I/N.,()J-MOg&ds2J�PK�������!�ulQ��Q��,���google_music_scripts-4.0.0.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2018-2019 thebigmunch <mail@thebigmunch.me> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK������!HڽT���U���*���google_music_scripts-4.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK������!H"N��&��-���google_music_scripts-4.0.0.dist-info/METADATAWkS8_0lS6^hf"[$N R2%+{S)hRMkMjN:1I$D3[ZYO9u1yt~dqs2DfIDLOQGN cFhuڮ!*@ĮϢ8KTG(4>=ZLOlٙ 1eM ?;1X3c=!}ԝ/ϩRl�YgHM/OH7Mx</LRDƱO(8 dߋwmm!ꁅMBxx}&WgSMhL*kx_jWyB&xT9/|sz-;"?!QG]|b4}\bh/ŭNTHm ,L՝~<:S(S镩/LqoP^]"dVffUIV4ȭ%4)$aƳ- KmN_$R*qM3]4 VŌ}uo HbZ|sm׃n^{_bM=S\C1M,m~䃓>Ͻ8rԄÄ)s:wC˩c(*2r2֥npK> #p˥e[>8btDM.59H 1X\*1k5o*j; -Ă+,O5(P^/ZR$dE2oˊ ~yk=�N4ew)8'r`a8ƌcYQRUahT^)J-#A7D+|$ ᶬ?}`=.{yer"$tᔘ)njc,v8}O.i^>7[P U: RgF%,4yaY0iXs#$$H PMyDfJg~D>>c:~68ٻ9qX!~NTL#g6y7Pd Tgt K' <%C*V3C6 0%جx_ +'}/0oTwp}NSii\$>Zӫ*x0lu{Wk+lX)@7tͰwb'( zVy-5w Q0! Rfu]:wQG!a@o]MQ%&:�oOiR^8"fX)AvyfRqyu:ksu4w&Bil76 2UN{z 5]{Rd5k2>͍qi=V؂.ݪN[zfzЛYzCoHz V}s- PK������!H7œ����+���google_music_scripts-4.0.0.dist-info/RECORDˎ0}ZLiE83!!A &!\}RF?/}*|*G4W~&ysxPSS;t@Jmb>JQ)oǫn']5wVVhZKJEs[pЗڰ ^R;sߣeQ._S#K)[I/$vw:/5܆ f:GNɓsku;{*3c\tIDX ^ܫn$B8(7]tJP�<JjJ >T9% E8#qi2 9eJHm%;#K7J02U:LxsDXG=Cy YfGlGɮ|jD fKw>(dKY.IMĂQy;M6K'.h"4WՑ=-d!_5pЎWZޙ۩r%Q@BRc)i_ rZ$x�#RP#Pf.y_X{4+qˤٿ&u9 P[ӨU[qŴY+|P7]jޞf%U`ȌTF14j# *JQ?�PK��������!�M��M��!���������������google_music_scripts/__about__.pyPK��������!�rA��A�� �������������google_music_scripts/__init__.pyPK��������!�PQ���Q��� ����������� ��google_music_scripts/__main__.pyPK��������!�1H��H���������������google_music_scripts/cli.pyPK��������!�,��,�� �����������M��google_music_scripts/commands.pyPK��������!�A` ��` �������������y��google_music_scripts/config.pyPK��������!�m����!�������������google_music_scripts/constants.pyPK��������!�J���������������ƈ��google_music_scripts/core.pyPK��������!�69 ��9 ���������������google_music_scripts/utils.pyPK�������!H.���4���5�������������google_music_scripts-4.0.0.dist-info/entry_points.txtPK��������!�ulQ��Q��,�������������google_music_scripts-4.0.0.dist-info/LICENSEPK�������!HڽT���U���*�������������google_music_scripts-4.0.0.dist-info/WHEELPK�������!H"N��&��-�������������google_music_scripts-4.0.0.dist-info/METADATAPK�������!H7œ����+�������������google_music_scripts-4.0.0.dist-info/RECORDPK������{��y����