# cloudsync/__init__.py

"""
cloudsync enables simple cloud file-level sync with a variety of cloud providers.

External modules:

    cloudsync.Event
    cloudsync.Provider
    cloudsync.Sync

Example:

    import cloudsync

    prov = cloudsync.Provider('GDrive', token="237846234236784283")

    info = prov.upload(file, "/dest")

    print("id of /dest is %s, hash of /dest is %s" % (info.id, info.hash))

Command-line example:

    cloudsync -p gdrive --token "236723782347823642786" -f ~/gdrive-folder --daemon
"""

__version__ = "0.1.3"

# import modules into top level for convenience
from .provider import *
from .event import *
from .sync import *
from .exceptions import *
from .types import *
from .cs import *
from .command import main

if __name__ == "__main__":
    main()


# cloudsync/command.py

import argparse
import os


def main():
    parser = argparse.ArgumentParser(description='Process some integers.')
    parser.add_argument('command', type=str, help='Command to run')
    parser.add_argument('args', nargs=argparse.REMAINDER)
    args = parser.parse_args()
    raise NotImplementedError()


if __name__ == "__main__":
    import sys
    sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
    main()


# cloudsync/cs.py

import threading
from typing import Optional, Tuple

from .sync import SyncManager, SyncState, Storage
from .runnable import Runnable
from .event import EventManager
from .provider import Provider


class CloudSync(Runnable):
    def __init__(self,
                 providers: Tuple[Provider, Provider],
                 translate,
                 storage: Optional[Storage] = None,
                 label: Optional[str] = None):
        state = SyncState(storage, label)
        smgr = SyncManager(state, providers, translate)

        # for tests, make these accessible
        self.state = state
        self.providers = providers
        self.smgr = smgr

        self.emgrs = (
            EventManager(smgr.providers[0], state, 0),
            EventManager(smgr.providers[1], state, 1)
        )

        self.sthread = threading.Thread(target=smgr.run)
        self.ethreads = (
            threading.Thread(target=self.emgrs[0].run),
            threading.Thread(target=self.emgrs[1].run)
        )

    def start(self):
        self.sthread.start()
        self.ethreads[0].start()
        self.ethreads[1].start()

    def stop(self):
        self.smgr.stop()
        self.emgrs[0].stop()
        self.emgrs[1].stop()

    # for tests, make this manually runnable
    def do(self):
        self.smgr.do()
        self.emgrs[0].do()
        self.emgrs[1].do()

    def done(self):
        self.smgr.done()
        self.emgrs[0].done()
        self.emgrs[1].done()


# cloudsync/event.py

import logging
from dataclasses import dataclass
from typing import Optional

from .runnable import Runnable
from .muxer import Muxer
from .types import OType

log = logging.getLogger(__name__)


@dataclass
class Event:
    otype: OType                        # fsobject type (DIRECTORY or FILE)
    oid: str                            # fsobject id
    path: Optional[str]                 # path
    hash: Optional[bytes]               # fsobject hash (better name: ohash)
    exists: Optional[bool]
    mtime: Optional[float] = None
    prior_oid: Optional[str] = None     # path-based systems use this on renames


class EventManager(Runnable):
    def __init__(self, provider, state, side):
        self.provider = provider
        self.events = Muxer(provider.events, restart=True)
        self.state = state
        self.side = side

    def do(self):
        for event in self.events:
            log.debug("got event %s", event)

            path = event.path
            exists = event.exists
            otype = event.otype

            if not event.path and not self.state.lookup_oid(self.side, event.oid):
                info = self.provider.info_oid(event.oid)
                if info:
                    if info.otype != event.otype:
                        log.warning("provider gave a bad event: %s != %s, using %s",
                                    info.path, event.otype, info.otype)
                    path = info.path
                    otype = info.otype
                else:
                    log.debug("ignoring delete of something that can't exist")
                    continue
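            # Whatever was learned about the object (type, path, hash, existence, and
            # any prior oid left over from a rename) is folded into the shared sync
            # state for this side; the SyncManager picks the change up from there.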
            self.state.update(self.side, otype, event.oid, path=path, hash=event.hash,
                              exists=exists, prior_oid=event.prior_oid)


# cloudsync/exceptions.py

class CloudException(Exception):
    pass


class CloudFileNotFoundError(CloudException):
    pass


class CloudTemporaryError(CloudException):
    pass


class CloudFileExistsError(CloudException):
    pass


class CloudTokenError(CloudException):
    pass


class CloudDisconnectedError(CloudException):
    pass


# cloudsync/muxer.py

import queue
from threading import Lock
from collections import namedtuple


class Muxer():
    Entry = namedtuple('Entry', 'genref listeners lock')

    already = {}
    top_lock = Lock()

    def __init__(self, func, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()

        with self.top_lock:
            if func not in self.already:
                self.already[func] = self.Entry([func()], [], Lock())
            ent = self.already[func]
            self.genref = ent.genref
            self.lock = ent.lock
            self.listeners = ent.listeners
            self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            e = self.queue.get_nowait()
        except queue.Empty:
            with self.lock:
                try:
                    e = self.queue.get_nowait()
                except queue.Empty:
                    try:
                        e = next(self.genref[0])
                        for other in self.listeners:
                            if other is not self:
                                other.queue.put(e)
                    except StopIteration:
                        if self.restart:
                            self.genref[0] = self.func()
                        raise
        return e

    def __del__(self):
        with self.top_lock:
            try:
                self.listeners.remove(self)
            except ValueError:
                pass
            if not self.listeners and self.func in self.already:
                del self.already[self.func]


# cloudsync/provider.py

from abc import ABC, abstractmethod
import re
from typing import Generator

from cloudsync.types import OInfo, DIRECTORY, DirInfo
from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError


class Provider(ABC):                    # pylint: disable=too-many-public-methods
    sep: str = '/'                      # path delimiter
    alt_sep: str = '\\'                 # alternate path delimiter
    oid_is_path = False
    case_sensitive = True

    @abstractmethod
    def _api(self, *args, **kwargs):
        ...

    def connect(self, creds):           # pylint: disable=unused-argument
        # some providers don't need connections, so just don't implement this
        pass

    @abstractmethod
    def events(self):
        ...

    @abstractmethod
    def walk(self, path, since=None):
        ...

    @abstractmethod
    def upload(self, oid, file_like, metadata=None) -> 'OInfo':
        ...

    @abstractmethod
    def create(self, path, file_like, metadata=None) -> 'OInfo':
        ...

    @abstractmethod
    def download(self, oid, file_like):
        ...

    @abstractmethod
    def rename(self, oid, path):
        ...

    @abstractmethod
    def mkdir(self, path) -> str:
        ...

    @abstractmethod
    def delete(self, oid):
        ...

    @abstractmethod
    def exists_oid(self, oid):
        ...

    @abstractmethod
    def exists_path(self, path) -> bool:
        ...

    @abstractmethod
    def listdir(self, oid) -> Generator[DirInfo, None, None]:
        ...

    # @abstractmethod
    # def hash_oid(self, oid) -> Any:
    #     ...

    @abstractmethod
    def info_path(self, path) -> OInfo:
        ...

    @abstractmethod
    def info_oid(self, oid) -> OInfo:
        ...
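    # A rough sketch of what the concrete path helpers in the CONVENIENCE/HELPER
    # section below do, assuming the default '/' separator. The values shown are
    # derived from reading join/split/is_subpath, not from any external docs:
    #
    #     Provider.join("/a", "b/", "/c")       # -> "/a/b/c"
    #     provider.split("/a/b/c")              # -> ("/a/b", "c")
    #     provider.is_subpath("/a", "/a/b")     # -> "/b"
    #     provider.is_subpath("/a", "/b/c")     # -> False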
    # CONVENIENCE
    def download_path(self, path, io):
        info = self.info_path(path)
        if not info or not info.oid:
            raise CloudFileNotFoundError()
        self.download(info.oid, io)

    # HELPER
    @classmethod
    def join(cls, *paths):
        res = ""
        for path in paths:
            if path is None or path == cls.sep:
                continue
            if isinstance(path, str):
                res = res + cls.sep + path.strip(cls.sep)
                continue
            for sub_path in path:
                if sub_path is None or sub_path == cls.sep:
                    continue
                res = res + cls.sep + sub_path.strip(cls.sep)
        return res or cls.sep

    def split(self, path):
        # todo cache regex
        index = path.rfind(self.sep)
        if index == -1 and self.alt_sep:
            index = path.rfind(self.alt_sep)
        if index == -1:
            return path, ""
        if index == 0:
            return self.sep, path[index+1:]
        return path[:index], path[index+1:]

    def normalize_path(self, path: str):
        norm_path = path.rstrip(self.sep)
        if self.sep in ["\\", "/"]:
            parts = re.split(r'[\\/]+', norm_path)
        else:
            parts = re.split(r'[%s]+' % self.sep, norm_path)
        norm_path = self.join(*parts)
        return norm_path

    def is_subpath(self, folder, target, sep=None, anysep=False, strict=False):
        if sep is None:
            if anysep:
                sep = "/"
                folder = folder.replace("\\", "/")
                target = target.replace("\\", "/")
            else:
                sep = self.sep

        # Will return True for is-same-path in addition to target
        folder_full = str(folder)
        folder_full = folder_full.rstrip(sep)
        target_full = str(target)
        target_full = target_full.rstrip(sep)

        # .lower() instead of normcase because normcase will also mess with separators
        if not self.case_sensitive:
            folder_full = folder_full.lower()
            target_full = target_full.lower()

        # target is same as folder, or target is a subpath (ensuring separator is there for base)
        if folder_full == target_full:
            return False if strict else sep
        elif len(target_full) > len(folder_full) and \
                target_full[len(folder_full)] == sep:
            if target_full.startswith(folder_full):
                return target_full.replace(folder_full, "", 1)
            else:
                return False
        return False

    def replace_path(self, path, from_dir, to_dir):
        relative = self.is_subpath(from_dir, path)
        if relative:
            return to_dir + relative
        raise ValueError("replace_path used without subpath")

    def paths_match(self, patha, pathb):
        pass

    def dirname(self, path: str):
        norm_path = self.normalize_path(path).lstrip(self.sep)
        parts = re.split(r'[%s]+' % self.sep, norm_path)
        retval = self.join(*parts[0:-1])
        return retval

    def _verify_parent_folder_exists(self, path):
        parent_path = self.dirname(path)
        if parent_path != self.sep:
            parent_obj = self.info_path(parent_path)
            if parent_obj is None:
                # perhaps this should separate "FileNotFound" and "non-folder parent exists"
                # and raise different exceptions
                raise CloudFileNotFoundError(parent_path)
            if parent_obj.otype != DIRECTORY:
                raise CloudFileExistsError(parent_path)


# cloudsync/runnable.py

import time
from abc import ABC, abstractmethod
import threading
import logging

log = logging.getLogger(__name__)


def time_helper(timeout, sleep=None, multiply=1):
    forever = not timeout
    end = forever or time.monotonic() + timeout
    while forever or end >= time.monotonic():
        yield True
        if sleep is not None:
            time.sleep(sleep)
            sleep = sleep * multiply


class Runnable(ABC):
    def run(self, *, timeout=None, until=None, sleep=0.01):
        self.stopped = False            # pylint: disable=attribute-defined-outside-init
        for _ in time_helper(timeout, sleep=sleep):
            if self.stopped or (until is not None and until()):
                break
            try:
                self.do()
            except Exception:
                log.exception("unhandled exception in %s", self.__class__)
        if self.stopped:
            self.done()

    @abstractmethod
    def do(self):
        ...
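    # Lifecycle, as implemented above: run() loops calling do() (logging and swallowing
    # any exception) until stop() sets self.stopped, the timeout expires, or the optional
    # until() callback returns true; done() is the cleanup hook invoked after a stop().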
    def stop(self):
        self.stopped = True             # pylint: disable=attribute-defined-outside-init

    def done(self):
        pass


def test_runnable():
    class TestRun(Runnable):
        def __init__(self):
            self.cleaned = False
            self.called = 0

        def do(self):
            self.called += 1

        def done(self):
            self.cleaned = True

    testrun = TestRun()
    testrun.run(timeout=0.02, sleep=0.001)
    assert testrun.called

    testrun.called = 0
    testrun.run(until=lambda: testrun.called == 1)
    assert testrun.called == 1

    thread = threading.Thread(target=testrun.run)
    thread.start()
    testrun.stop()
    thread.join(timeout=1)
    assert testrun.stopped == 1


# cloudsync/types.py

from typing import Optional
from dataclasses import dataclass
from enum import Enum


class OType(Enum):
    DIRECTORY = "dir"
    FILE = "file"


DIRECTORY = OType.DIRECTORY
FILE = OType.FILE


@dataclass
class OInfo:
    otype: OType                        # fsobject type (DIRECTORY or FILE)
    oid: str                            # fsobject id
    hash: Optional[bytes]               # fsobject hash (better name: ohash)
    path: Optional[str]                 # path


@dataclass
class DirInfo(OInfo):
    name: Optional[str] = None
    mtime: Optional[float] = None


# cloudsync/providers/__init__.py

from .gdrive import GDriveProvider
from .dropbox import DropboxProvider


# cloudsync/providers/dropbox.py

import io
import time
import logging
import threading
from typing import Generator, Optional

import requests
import arrow
import dropbox
from dropbox import Dropbox, exceptions, files

from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo
from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, \
    CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError

log = logging.getLogger(__name__)


class _FolderIterator:
    def __init__(self, api, path, *, recursive, cursor=None):
        self.api = api
        self.path = path
        self.ls_res = None
        if not cursor:
            self.ls_res = self.api('files_list_folder',
                                   path=self.path,
                                   recursive=recursive,
                                   limit=200)
        else:
            self.ls_res = self.api('files_list_folder_continue', cursor)

    def __iter__(self):
        return self

    def __next__(self):
        if self.ls_res:
            if not self.ls_res.entries and self.ls_res.has_more:
                self.ls_res = self.api(
                    'files_list_folder_continue', self.ls_res.cursor)

            if self.ls_res and self.ls_res.entries:
                ret = self.ls_res.entries.pop()
                ret.cursor = self.ls_res.cursor
                return ret
        raise StopIteration()

    @property
    def cursor(self):
        return self.ls_res and self.ls_res.cursor


class DropboxProvider(Provider):        # pylint: disable=too-many-public-methods, too-many-instance-attributes
    case_sensitive = False

    _max_simple_upload_size = 15 * 1024 * 1024
    _upload_block_size = 10 * 1024 * 1024

    def __init__(self):
        super().__init__()
        self.__root_id = None
        self.__cursor = None
        self.client = None
        self.api_key = None
        self.user_agent = 'cloudsync/1.0'
        self.mutex = threading.Lock()

    @property
    def connected(self):
        return self.client is not None

    def get_quota(self):
        space_usage = self._api('users_get_space_usage')
        account = self._api('users_get_current_account')
        if space_usage.allocation.is_individual():
            used = space_usage.used
            allocated = space_usage.allocation.get_individual().allocated
        else:
            team_allocation = space_usage.allocation.get_team()
            used, allocated = team_allocation.used, team_allocation.allocated

        res = {
            'used': used,
            'total': allocated,
            'login': account.email,
            'uid': account.account_id[len('dbid:'):]
        }
        return res

    def connect(self, creds):
        log.debug('Connecting to dropbox')
        if not self.client:
            self.api_key = creds.get('key', self.api_key)
            if not self.api_key:
                raise CloudTokenError()
            self.client = Dropbox(self.api_key)
        try:
            self.get_quota()
        except exceptions.AuthError:
self.disconnect() raise CloudTokenError() except Exception as e: log.debug("error connecting %s", e) self.disconnect() raise CloudDisconnectedError() def _api(self, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") log.debug("_api: %s (%s %s)", method, args, kwargs) with self.mutex: try: return getattr(self.client, method)(*args, **kwargs) except exceptions.ApiError as e: if isinstance(e.error, (files.ListFolderError, files.GetMetadataError, files.ListRevisionsError)): if e.error.is_path() and isinstance(e.error.get_path(), files.LookupError): inside_error: files.LookupError = e.error.get_path() if inside_error.is_malformed_path(): log.debug('Malformed path when executing %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'Malformed path when executing %s(%s)' % (method, kwargs)) if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.DeleteError): if e.error.is_path_lookup(): inside_error: files.LookupError = e.error.get_path_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.RelocationError): if e.error.is_from_lookup(): inside_error: files.LookupError = e.error.get_from_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if e.error.is_to(): inside_error: files.WriteError = e.error.get_to() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) log.debug("here") if isinstance(e.error, files.CreateFolderError): if e.error.is_path() and isinstance(e.error.get_path(), files.WriteError): inside_error: files.WriteError = e.error.get_path() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) except (exceptions.InternalServerError, exceptions.RateLimitError, requests.exceptions.ReadTimeout): raise CloudTemporaryError() except dropbox.stone_validators.ValidationError as e: log.debug("f*ed up api error: %s", e) if "never created" in str(e): raise CloudFileNotFoundError() if "did not match" in str(e): log.warning("oid error %s", e) raise CloudFileNotFoundError() raise except requests.exceptions.ConnectionError as e: log.exception('api error handled exception %s:%s', "dropbox", e.__class__.__name__) self.disconnect() raise CloudDisconnectedError() @property def root_id(self): return "" def disconnect(self): self.client = None @property def cursor(self): if not self.__cursor: res = self._api('files_list_folder_get_latest_cursor', self.root_id, recursive=True, include_deleted=True, limit=200) self.__cursor = res.cursor return self.__cursor def _events(self, cursor, path=None): if path and path != "/": info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) oid = info.oid else: oid = self.root_id for res in _FolderIterator(self._api, oid, recursive=True, cursor=cursor): exists = True log.debug("event %s", res) if isinstance(res, files.DeletedMetadata): # dropbox doesn't give you the id that was deleted # we need to get the ids 
of every revision # then find out which one was the latest before the deletion time # then get the oid for that revs = self._api('files_list_revisions', res.path_lower, limit=10) if revs is None: # dropbox will give a 409 conflict if the revision history was deleted # instead of raising an error, this gets converted to revs==None log.info("revs is none for %s %s", oid, path) continue log.debug("revs %s", revs) deleted_time = revs.server_deleted latest_time = None for ent in revs.entries: if ent.server_modified <= deleted_time and \ (latest_time is None or ent.server_modified >= latest_time): oid = ent.id latest_time = ent.server_modified if not oid: log.error( "skipping deletion %s, because we don't know the oid", res) continue exists = False otype = None ohash = None elif isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None oid = res.id else: otype = FILE ohash = res.content_hash oid = res.id path = res.path_display event = Event(otype, oid, path, ohash, exists, time.time()) yield event if getattr(res, "cursor", False): self.__cursor = res.cursor def events(self): # pylint: disable=too-many-locals yield from self._events(self.cursor) def walk(self, path, since=None): yield from self._events(None, path=path) def listdir(self, oid) -> Generator[DirInfo, None, None]: yield from self._listdir(oid, recursive=False) def _listdir(self, oid, *, recursive) -> Generator[DirInfo, None, None]: info = self.info_oid(oid) for res in _FolderIterator(self._api, oid, recursive=recursive): if isinstance(res, files.DeletedMetadata): continue if isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None else: otype = FILE ohash = res.content_hash path = res.path_display oid = res.id relative = self.is_subpath(info.path, path).lstrip("/") if relative: yield DirInfo(otype, oid, ohash, path, name=relative) def create(self, path: str, file_like, metadata=None) -> OInfo: self._verify_parent_folder_exists(path) if self.exists_path(path): raise CloudFileExistsError(path) return self._upload(path, file_like, metadata) def upload(self, oid: str, file_like, metadata=None) -> OInfo: if oid.startswith(self.sep): raise CloudFileNotFoundError("Called upload with a path instead of an OID: %s" % oid) if not self.exists_oid(oid): raise CloudFileNotFoundError(oid) return self._upload(oid, file_like, metadata) def _upload(self, oid, file_like, metadata=None) -> OInfo: res = None metadata = metadata or {} file_like.seek(0, io.SEEK_END) size = file_like.tell() file_like.seek(0) if size < self._max_simple_upload_size: res = self._api('files_upload', file_like.read(), oid, mode=files.WriteMode('overwrite')) else: cursor = None while True: data = file_like.read(self._upload_block_size) if not data: if cursor: local_mtime = arrow.get(metadata.get('mtime', time.time())).datetime commit = files.CommitInfo(path=oid, mode=files.WriteMode.overwrite, autorename=False, client_modified=local_mtime, mute=True) res = self._api( 'files_upload_session_finish', data, cursor, commit ) break if not cursor: self._api('files_upload_session_start', data) cursor = files.UploadSessionCursor( res.session_id, len(data)) else: self._api('files_upload_session_append_v2', data, cursor) cursor.offset += len(data) if res is None: raise CloudFileExistsError() return OInfo(otype=FILE, oid=res.id, hash=res.content_hash, path=res.path_display) def download(self, oid, file_like): ok = self._api('files_download', oid) if not ok: raise CloudFileNotFoundError() res, content = ok for data in content.iter_content(self._upload_block_size): 
file_like.write(data) return OInfo(otype=FILE, oid=oid, hash=res.content_hash, path=res.path_display) def _attempt_rename_folder_over_empty_folder(self, info: OInfo, path) -> None: if info.otype != DIRECTORY: raise CloudFileExistsError(path) possible_conflict = self.info_path(path) if possible_conflict.otype == DIRECTORY: try: next(self._listdir(possible_conflict.oid, recursive=False)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: pass # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) self._api('files_move_v2', info.oid, path) return else: # conflict is a file, and we already know that the rename is on a folder raise CloudFileExistsError(path) def rename(self, oid, path): try: self._api('files_move_v2', oid, path) except CloudFileExistsError: info = self.info_oid(oid) if info.otype == DIRECTORY: self._attempt_rename_folder_over_empty_folder(info, path) else: raise def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ, unused-argument # TODO: check if a regular filesystem lets you mkdir over a non-empty folder... self._verify_parent_folder_exists(path) if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError() log.debug("Skipped creating already existing folder: %s", path) return info.oid res = self._api('files_create_folder_v2', path) log.debug("dbx mkdir %s", res) res = res.metadata return res.id def delete(self, oid): info = self.info_oid(oid) if not info: return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self._listdir(oid, recursive=False)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files_delete_v2', oid) except CloudFileNotFoundError: # shouldn't happen because we are checking above... 
return def exists_oid(self, oid) -> bool: return bool(self.info_oid(oid)) def info_path(self, path) -> Optional[OInfo]: if path == "/": return OInfo(DIRECTORY, "", None, "/") try: log.debug("res info path %s", path) res = self._api('files_get_metadata', path) log.debug("res info path %s", res) oid = res.id if oid[0:3] != 'id:': log.warning("invalid oid %s from path %s", oid, path) if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash return OInfo(otype, oid, fhash, path) except CloudFileNotFoundError: return None def exists_path(self, path) -> bool: return self.info_path(path) is not None def info_oid(self, oid) -> Optional[OInfo]: if oid == "": otype = DIRECTORY fhash = None path = "/" else: try: res = self._api('files_get_metadata', oid) log.debug("res info oid %s", res) path = res.path_display if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash except CloudFileNotFoundError: return None return OInfo(otype, oid, fhash, path) PKoiO37W7Wcloudsync/providers/gdrive.pyimport time import logging import threading from ssl import SSLError import json from typing import Generator, Optional import arrow from apiclient.discovery import build # pylint: disable=import-error from apiclient.errors import HttpError # pylint: disable=import-error from httplib2 import Http, HttpLib2Error from oauth2client import client # pylint: disable=import-error from oauth2client.client import HttpAccessTokenRefreshError # pylint: disable=import-error from googleapiclient.http import _should_retry_response # This is necessary because google masks errors from apiclient.http import MediaIoBaseDownload, MediaIoBaseUpload # pylint: disable=import-error from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError log = logging.getLogger(__name__) class GDriveInfo(DirInfo): # pylint: disable=too-few-public-methods pids = [] def __init__(self, *a, pids=None, **kws): super().__init__(*a, **kws) if pids is None: pids = [] self.pids = pids class GDriveProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = True require_parent_folder = True _scope = "https://www.googleapis.com/auth/drive" _redir = 'urn:ietf:wg:oauth:2.0:oob' _token_uri = 'https://accounts.google.com/o/oauth2/token' _folder_mime_type = 'application/vnd.google-apps.folder' _io_mime_type = 'application/octet-stream' def __init__(self): super().__init__() self.__root_id = None self.__cursor = None self.client = None self.api_key = None self.refresh_token = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() self._ids = {"/": "root"} self._trashed_ids = {} @property def connected(self): return self.client is not None def get_quota(self): # https://developers.google.com/drive/api/v3/reference/about res = self._api('about', 'get', fields='storageQuota,user') quota = res['storageQuota'] user = res['user'] usage = int(quota['usage']) if 'limit' in quota and quota['limit']: limit = int(quota['limit']) else: # It is possible for an account to have unlimited space - pretend it's 1TB limit = 1024 * 1024 * 1024 * 1024 res = { 'used': usage, 'total': limit, 'login': user['emailAddress'], 'uid': user['permissionId'] } return res def connect(self, creds): log.debug('Connecting to googledrive') if not self.client: api_key = 
creds.get('api_key', self.api_key) refresh_token = creds.get('refresh_token', self.refresh_token) kwargs = {} try: with self.mutex: creds = client.GoogleCredentials(access_token=api_key, client_id=creds.get( 'client_id'), client_secret=creds.get( 'client_secret'), refresh_token=refresh_token, token_expiry=None, token_uri=self._token_uri, user_agent=self.user_agent) creds.refresh(Http()) self.client = build( 'drive', 'v3', http=creds.authorize(Http())) kwargs['api_key'] = creds.access_token if getattr(creds, 'refresh_token', None): refresh_token = creds.refresh_token self.refresh_token = refresh_token self.api_key = api_key try: self.get_quota() except SSLError: # pragma: no cover # Seeing some intermittent SSL failures that resolve on retry log.warning('Retrying intermittent SSLError') self.get_quota() except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() return self.client def _api(self, resource, method, *args, **kwargs): # pylint: disable=arguments-differ if not self.client: raise CloudDisconnectedError("currently disconnected") with self.mutex: try: res = getattr(self.client, resource)() meth = getattr(res, method)(*args, **kwargs) return meth.execute() except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() except HttpError as e: # gets a default something (actually the message, not the reason) using their secret interface reason = e._get_reason() # pylint: disable=protected-access # parses the JSON of the content to get the reason from where it really lives in the content try: # this code was copied from googleapiclient/http.py:_should_retry_response() data = json.loads(e.content.decode('utf-8')) if isinstance(data, dict): reason = data['error']['errors'][0]['reason'] else: reason = data[0]['error']['errors']['reason'] except (UnicodeDecodeError, ValueError, KeyError): log.warning('Invalid JSON content from response: %s', e.content) if str(e.resp.status) == '404': raise CloudFileNotFoundError('File not found when executing %s.%s(%s)' % ( resource, method, kwargs )) if str(e.resp.status) == '403' and str(reason) == 'parentNotAFolder': raise CloudFileExistsError("Parent Not A Folder") if (str(e.resp.status) == '403' and reason in ('userRateLimitExceeded', 'rateLimitExceeded', )) or str(e.resp.status) == '429': raise CloudTemporaryError("rate limit hit") # At this point, _should_retry_response() returns true for error codes >=500, 429, and 403 with # the reason 'userRateLimitExceeded' or 'rateLimitExceeded'. 403 without content, or any other # response is not retried. We have already taken care of some of those cases above, but we call this below # to catch the rest, and in case they improve their library with more conditions. If we called # meth.execute() above with a num_retries argument, all this retrying would happen in the google api # library, and we wouldn't have to think about retries. 
should_retry = _should_retry_response(e.resp.status, e.content) if should_retry: raise CloudTemporaryError("unknown error %s" % e) raise Exception("unknown error %s" % e) except (TimeoutError, HttpLib2Error): self.disconnect() raise CloudDisconnectedError("disconnected on timeout") @property def root_id(self): if not self.__root_id: res = self._api('files', 'get', fileId='root', fields='id', ) self.__root_id = res['id'] self._ids['/'] = self.__root_id return self.__root_id def disconnect(self): self.client = None @property def cursor(self): if not self.__cursor: res = self._api('changes', 'getStartPageToken') self.__cursor = res.get('startPageToken') return self.__cursor def events(self): # pylint: disable=too-many-locals page_token = self.cursor while page_token is not None: # log.debug("looking for events, timeout: %s", timeout) response = self._api('changes', 'list', pageToken=page_token, spaces='drive', includeRemoved=True, includeItemsFromAllDrives=True, supportsAllDrives=True) for change in response.get('changes'): log.debug("got event %s", change) # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T16:57:06.779Z', # 'removed': False, 'fileId': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'file': {'kind': 'drive#file', # 'id': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'name': 'dest', 'mimeType': 'application/octet-stream'}} # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T20:02:14.156Z', # 'removed': True, 'fileId': '1lhRe0nDplA6I5JS18642rg0KIbYN66lR'} ts = arrow.get(change.get('time')).float_timestamp oid = change.get('fileId') exists = not change.get('removed') fil = change.get('file') if fil: if fil.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE else: otype = None ohash = None path = self._path_oid(oid) event = Event(otype, oid, path, ohash, exists, ts) log.debug("converted event %s as %s", change, event) yield event page_token = response.get('nextPageToken') if 'newStartPageToken' in response: self.__cursor = response.get('newStartPageToken') def _walk(self, oid): for ent in self.listdir(oid): event = Event(ent.otype, ent.oid, ent.path, None, True, time.time()) log.debug("walk %s", event) yield event if ent.otype == DIRECTORY: if self.exists_oid(ent.oid): yield from self._walk(ent.oid) def walk(self, path, since=None): info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) yield from self._walk(info.oid) def __prep_upload(self, path, metadata): # modification time mtime = metadata.get("modifiedTime", time.time()) mtime = arrow.get(mtime).isoformat() gdrive_info = { 'modifiedTime': mtime } # mime type, if provided mime_type = metadata.get("mimeType", None) if mime_type: gdrive_info['mimeType'] = mime_type # path, if provided if path: _, name = self.split(path) gdrive_info['name'] = name # misc properties, if provided app_props = metadata.get("appProperties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['appProperties'] = app_props # misc properties, if provided app_props = metadata.get("properties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['properties'] = app_props log.debug("info %s", gdrive_info) return gdrive_info def upload(self, oid, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(None, metadata) ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' res = 
self._api('files', 'update', body=gdrive_info, fileId=oid, media_body=ul, fields=fields) log.debug("response from upload %s", res) if not res: raise CloudTemporaryError("unknown response from drive on upload") md5 = res.get('md5Checksum', None) # can be none if the user tries to upload to a folder if md5 is None: possible_conflict = self._info_oid(oid) if possible_conflict and possible_conflict.otype == DIRECTORY: raise CloudFileExistsError("Can only upload to a file: %s" % possible_conflict.path) return OInfo(otype=FILE, oid=res['id'], hash=md5, path=None) def create(self, path, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(path, metadata) if self.exists_path(path): raise CloudFileExistsError() ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' parent_oid = self.get_parent_id(path) gdrive_info['parents'] = [parent_oid] res = self._api('files', 'create', body=gdrive_info, media_body=ul, fields=fields) log.debug("response from create %s : %s", path, res) if not res: raise CloudTemporaryError("unknown response from drive on upload") self._ids[path] = res['id'] log.debug("path cache %s", self._ids) return OInfo(otype=FILE, oid=res['id'], hash=res['md5Checksum'], path=path) def download(self, oid, file_like): req = self.client.files().get_media(fileId=oid) dl = MediaIoBaseDownload(file_like, req, chunksize=4 * 1024 * 1024) done = False while not done: try: _, done = dl.next_chunk() except HttpError as e: if str(e.resp.status) == '416': log.debug("empty file downloaded") done = True elif str(e.resp.status) == '404': raise CloudFileNotFoundError("file %s not found" % oid) else: raise CloudTemporaryError("unknown response from drive") except (TimeoutError, HttpLib2Error): self.disconnect() raise CloudDisconnectedError("disconnected during download") def rename(self, oid, path): # pylint: disable=too-many-locals pid = self.get_parent_id(path) add_pids = [pid] if pid == 'root': add_pids = [self.root_id] info = self._info_oid(oid) if info is None: raise CloudFileNotFoundError(oid) remove_pids = info.pids old_path = info.path _, name = self.split(path) body = {'name': name} if self.exists_path(path): possible_conflict = self.info_path(path) if FILE in (info.otype, possible_conflict.otype): raise CloudFileExistsError(path) # because of the preceding if, we know that the source and target must both be folders try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: pass # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) if not old_path: for cpath, coid in list(self._ids.items()): if coid == oid: old_path = cpath if not old_path: old_path = self._path_oid(oid) self._api('files', 'update', body=body, fileId=oid, addParents=add_pids, removeParents=remove_pids, fields='id') for cpath, coid in list(self._ids.items()): if self.is_subpath(old_path, cpath): new_cpath = self.replace_path(cpath, old_path, path) self._ids.pop(cpath) self._ids[new_cpath] = oid log.debug("renamed %s", body) def listdir(self, oid) -> Generator[GDriveInfo, None, None]: query = f"'{oid}' in parents" try: res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, name, mimeType, trashed)', pageToken=None) except CloudFileNotFoundError: if self._info_oid(oid): return log.debug("listdir oid gone %s", oid) raise if not res or not res['files']: if 
self.exists_oid(oid): return raise CloudFileNotFoundError(oid) log.debug("listdir got res %s", res) for ent in res['files']: fid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent['name'] trashed = ent.get('trashed', False) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE if not trashed: yield GDriveInfo(otype, fid, fhash, None, pids=pids, name=name) def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError(path) log.debug("Skipped creating already existing folder: %s", path) return info.oid pid = self.get_parent_id(path) _, name = self.split(path) file_metadata = { 'name': name, 'parents': [pid], 'mimeType': self._folder_mime_type, } if metadata: file_metadata.update(metadata) res = self._api('files', 'create', body=file_metadata, fields='id') fileid = res.get('id') self._ids[path] = fileid return fileid def delete(self, oid): info = self.info_oid(oid) if not info: log.debug("deleted non-existing oid %s", oid) return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self.listdir(oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files', 'delete', fileId=oid) except CloudFileNotFoundError: log.debug("deleted non-existing oid %s", oid) for currpath, curroid in list(self._ids.items()): if curroid == oid: self._trashed_ids[currpath] = self._ids[currpath] del self._ids[currpath] def exists_oid(self, oid): return self._info_oid(oid) is not None def info_path(self, path) -> OInfo: if path == "/": return self.info_oid(self.root_id) try: parent_id = self.get_parent_id(path) _, name = self.split(path) query = f"'{parent_id}' in parents and name='{name}'" res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, mimeType)', pageToken=None) except CloudFileNotFoundError: return None if not res['files']: return None ent = res['files'][0] log.debug("res is %s", res) oid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE self._ids[path] = oid return GDriveInfo(otype, oid, fhash, path, pids=pids) def exists_path(self, path) -> bool: if path in self._ids: return True return self.info_path(path) is not None def get_parent_id(self, path): if not path: return None parent, _ = self.split(path) if parent == path: return self._ids.get(parent) if not self.exists_path(parent): raise CloudFileNotFoundError("parent %s must exist" % parent) return self._ids[parent] def _path_oid(self, oid, info=None) -> str: "convert oid to path" for p, pid in self._ids.items(): if pid == oid: return p for p, pid in self._trashed_ids.items(): if pid == oid: return p # todo, better cache, keep up to date, etc. 
if not info: info = self._info_oid(oid) if info and info.pids and info.name: ppath = self._path_oid(info.pids[0]) if ppath: path = self.join(ppath, info.name) self._ids[path] = oid return path return None def info_oid(self, oid) -> OInfo: info = self._info_oid(oid) if info is None: return None # expensive path = self._path_oid(oid, info) ret = OInfo(info.otype, info.oid, info.hash, path) log.debug("info oid ret: %s", ret) return ret def _info_oid(self, oid) -> Optional[GDriveInfo]: try: res = self._api('files', 'get', fileId=oid, fields='name, md5Checksum, parents, mimeType', ) except CloudFileNotFoundError: return None log.debug("info oid %s", res) pids = res.get('parents') fhash = res.get('md5Checksum') name = res.get('name') if res.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE return GDriveInfo(otype, oid, fhash, None, pids=pids, name=name) PKiVO_lcloudsync/sync/__init__.py__all__ = ['SyncManager', 'SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] from .manager import * from .state import * PKoiOq_XIXIcloudsync/sync/manager.pyimport os import logging import tempfile import shutil from typing import Tuple from cloudsync.provider import Provider __all__ = ['SyncManager'] from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError from cloudsync.types import DIRECTORY, FILE from cloudsync.runnable import Runnable from .state import SyncState, SyncEntry, TRASHED, EXISTS, LOCAL, REMOTE from .util import debug_sig log = logging.getLogger(__name__) # useful for converting oids and pointer nubmers into digestible nonces FINISHED = 1 REQUEUE = 0 def other_side(index): return 1-index class SyncManager(Runnable): def __init__(self, state, providers: Tuple[Provider, Provider], translate): self.state: SyncState = state self.providers = providers self.providers[LOCAL].debug_name = "local" self.providers[REMOTE].debug_name = "remote" self.translate = translate self.tempdir = tempfile.mkdtemp(suffix=".cloudsync") assert len(self.providers) == 2 def do(self): sync: SyncEntry = self.state.change() if sync: try: self.sync(sync) self.state.storage_update(sync) except Exception as e: log.exception( "exception %s[%s] while processing %s", type(e), e, sync) def done(self): log.info("cleanup %s", self.tempdir) shutil.rmtree(self.tempdir) def get_latest_state(self, ent): log.debug("before update state %s", ent) for i in (LOCAL, REMOTE): if not ent[i].changed: continue # get latest info from provider if ent[i].otype == FILE: ent[i].hash = self.providers[i].hash_oid(ent[i].oid) ent[i].exists = EXISTS if ent[i].hash else TRASHED else: ent[i].exists = self.providers[i].exists_oid(ent[i].oid) log.debug("after update state %s", self) def sync(self, sync): self.get_latest_state(sync) if sync.hash_conflict(): self.handle_hash_conflict(sync) return if sync.path_conflict(): self.handle_path_conflict(sync) return for i in (LOCAL, REMOTE): if sync[i].changed: response = self.embrace_change(sync, i, other_side(i)) if response == FINISHED: self.finished(i, sync) break log.debug("table\n%s", self.state.pretty_print()) def temp_file(self, ohash): # prefer big random name over NamedTemp which can infinite loop in odd situations! # Not a fan of importing Provider into sync.py for this... 
return Provider.join(self.tempdir, ohash) def finished(self, side, sync): sync[side].changed = None self.state.finished(sync) if sync.temp_file: try: os.unlink(sync.temp_file) except FileNotFoundError: pass except OSError: log.debug("exception unlinking %s", e) except Exception as e: # any exceptions here are pointless log.warning("exception unlinking %s", e) sync.temp_file = None def download_changed(self, changed, sync): sync.temp_file = sync.temp_file or self.temp_file( str(sync[changed].hash)) assert sync[changed].oid if os.path.exists(sync.temp_file): return True try: self.providers[changed].download( sync[changed].oid, open(sync.temp_file + ".tmp", "wb")) os.rename(sync.temp_file + ".tmp", sync.temp_file) return True except CloudFileNotFoundError: log.debug("download from %s failed fnf, switch to not exists", self.providers[changed].debug_name) sync[changed].exists = TRASHED return False def mkdirs(self, prov, path): log.debug("mkdirs %s", path) try: oid = prov.mkdir(path) # todo update state except CloudFileExistsError: # todo: mabye CloudFileExistsError needs to have an oid and/or path in it # at least optionally info = prov.info_path(path) if info: oid = info.oid else: raise except CloudFileNotFoundError: ppath, _ = prov.split(path) if ppath == path: raise log.debug("mkdirs parent, %s", ppath) oid = self.mkdirs(prov, ppath) try: oid = prov.mkdir(path) # todo update state except CloudFileNotFoundError: raise CloudFileExistsError("f'ed up mkdir") return oid def mkdir_synced(self, changed, sync, translated_path): synced = other_side(changed) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] if ents: for ent in ents: if ent[changed].otype == DIRECTORY: # these we can toss, they are other folders # keep the current one, since it exists for sure log.debug("discard %s", ent) ent.discard() self.state.storage_update(ent) ents = [ent for ent in ents if not ent.discarded] ents = [ent for ent in ents if TRASHED not in ( ent[changed].exists, ent[synced].exists)] if ents: raise NotImplementedError( "What to do if we create a folder when there's already a FILE") try: log.debug("translated %s as path %s", sync[changed].path, translated_path) # could have made a dir that already existed on my side or other side chents = list(self.state.lookup_path(changed, sync[changed].path)) syents = list(self.state.lookup_path(synced, translated_path)) ents = chents + syents notme_ents = [ent for ent in ents if ent != sync] conflicts = [] for ent in notme_ents: # any dup dirs on either side can be ignored if ent[synced].otype == DIRECTORY: log.debug("discard duplicate dir entry, caused by a mkdirs %s", ent) ent.discard() self.state.storage_update(ent) else: conflicts.append(ent) # if a file exists with the same name conflicts = [ent for ent in notme_ents if ent[synced].exists != TRASHED] if conflicts: log.warning("mkdir conflict %s letting other side handle it", sync) return # make the dir oid = self.mkdirs(self.providers[synced], translated_path) log.debug("mkdir %s as path %s oid %s", self.providers[synced].debug_name, translated_path, debug_sig(oid)) # did i already have that oid? 
if so, chuck it already_dir = self.state.lookup_oid(synced, oid) if already_dir and already_dir != sync and already_dir[synced].otype == DIRECTORY: log.debug("discard %s", already_dir) already_dir.discard() sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.state.update_entry( sync, synced, exists=True, oid=oid, path=translated_path) except CloudFileNotFoundError: log.debug("mkdir %s : %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].debug_name, translated_path) raise NotImplementedError("TODO mkdir, and make state etc") def upload_synced(self, changed, sync): synced = other_side(changed) try: info = self.providers[synced].upload( sync[synced].oid, open(sync.temp_file, "rb")) log.debug("upload to %s as path %s", self.providers[synced].debug_name, sync[synced].sync_path) sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = sync[synced].path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.state.update_entry( sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path) except CloudFileNotFoundError: log.debug("upload to %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].debug_name) raise NotImplementedError("TODO mkdir, and make state etc") except CloudFileExistsError: # this happens if the remote oid is a folder log.debug("split bc upload to folder") defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def _create_synced(self, changed, sync, translated_path): synced = other_side(changed) log.debug("create on %s as path %s", self.providers[synced].debug_name, translated_path) info = self.providers[synced].create( translated_path, open(sync.temp_file, "rb")) log.debug("created %s", info) sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = translated_path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.state.update_entry( sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path, hash=info.hash) def create_synced(self, changed, sync, translated_path): synced = other_side(changed) try: self._create_synced(changed, sync, translated_path) return FINISHED except CloudFileNotFoundError: log.debug("can't create %s, try mkdirs", translated_path) parent, _ = self.providers[synced].split(translated_path) self.mkdirs(self.providers[synced], parent) self._create_synced(changed, sync, translated_path) return FINISHED except CloudFileExistsError: # there's a folder in the way, let that resolve later log.debug("can't create %s, try punting", translated_path) if sync.punted > 1: self.rename_to_fix_conflict(sync, changed, synced) else: sync.punt() return REQUEUE def delete_synced(self, sync, changed, synced): log.debug("try sync deleted %s", sync[changed].path) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] if not ents: if sync[synced].oid: try: self.providers[synced].delete(sync[synced].oid) except CloudFileNotFoundError: pass else: log.debug("was never synced, ignoring deletion") sync[synced].exists = TRASHED sync.discard() else: has_log = False for ent in ents: if ent.is_creation(changed): log.debug("discard delete, pending create %s", sync) has_log = True if not 
has_log: log.warning("conflict delete %s <-> %s", ents, sync) log.debug("discard %s", sync) sync.discard() def check_disjoint_create(self, sync, changed, synced, translated_path): # check for creation of a new file with another in the table if sync[changed].otype != FILE: return False ents = list(self.state.lookup_path(synced, translated_path)) # filter for exists other_ents = [ent for ent in ents if ent != sync] if not other_ents: return False log.debug("found matching %s other ents %s", translated_path, other_ents) # ignoring trashed entries with different oids on the same path if all(TRASHED in (ent[synced].exists, ent[changed].exists) for ent in other_ents): return False other_untrashed_ents = [ent for ent in other_ents if TRASHED not in ( ent[synced].exists, ent[changed].exists)] assert len(other_untrashed_ents) == 1 log.debug("split conflict found") self.handle_split_conflict( other_untrashed_ents[0], synced, sync, changed) return True def handle_path_change_or_creation(self, sync, changed, synced): # pylint: disable=too-many-branches if not sync[changed].path: self.update_sync_path(sync, changed) log.debug("NEW SYNC %s", sync) if sync[changed].exists == TRASHED or sync.discarded: log.debug("requeue trashed event %s", sync) return REQUEUE translated_path = self.translate(synced, sync[changed].path) if translated_path is None: # ignore these return FINISHED if not sync[changed].path: log.debug("can't sync, no path %s", sync) if sync.is_creation(changed): # never synced this before, maybe there's another local path with # the same name already? if self.check_disjoint_create(sync, changed, synced, translated_path): return REQUEUE if sync.is_creation(changed): assert not sync[changed].sync_hash # looks like a new file if sync[changed].otype == DIRECTORY: self.mkdir_synced(changed, sync, translated_path) elif not self.download_changed(changed, sync): pass elif sync[synced].oid: self.upload_synced(changed, sync) else: return self.create_synced(changed, sync, translated_path) else: log.debug("rename %s %s", sync[synced].sync_path, translated_path) try: new_oid = self.providers[synced].rename( sync[synced].oid, translated_path) except CloudFileExistsError: log.debug("can't rename, file exists") if sync.punted > 1: # never punt twice self.rename_to_fix_conflict(sync, changed, synced) else: sync.punt() return REQUEUE sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.state.update_entry( sync, synced, path=translated_path, oid=new_oid) return FINISHED def rename_to_fix_conflict(self, sync, changed, _synced): print("renaming to fix conflict") conflict_name = sync[changed].path + ".conflicted" new_oid = self.providers[changed].rename(sync[changed].oid, conflict_name) self.state.update(changed, sync[changed].otype, oid=new_oid, path=conflict_name, prior_oid=sync[changed].oid) def embrace_change(self, sync, changed, synced): log.debug("embrace %s", sync) if sync[changed].exists == TRASHED: self.delete_synced(sync, changed, synced) return FINISHED if sync.is_path_change(changed) or sync.is_creation(changed): return self.handle_path_change_or_creation(sync, changed, synced) if sync[changed].hash != sync[changed].sync_hash: # not a new file, which means we must have last sync info log.debug("needs upload: %s index: %s", sync, synced) assert sync[synced].oid self.download_changed(changed, sync) self.upload_synced(changed, sync) return FINISHED log.info("nothing changed %s, but changed is true", sync) return FINISHED def update_sync_path(self, sync, changed): 
assert sync[changed].oid info = self.providers[changed].info_oid(sync[changed].oid) if not info: sync[changed].exists = TRASHED return if not info.path: assert False, "impossible sync, no path %s" % sync[changed] log.debug("UPDATE PATH %s->%s", sync, info.path) self.state.update_entry( sync, changed, sync[changed].oid, path=info.path, exists=True) def handle_hash_conflict(self, sync): log.debug("splitting hash conflict %s", sync) # split the sync in two defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def handle_split_conflict(self, defer_ent, defer_side, replace_ent, replace_side): log.info("BEFORE\n%s", self.state.pretty_print()) conflict_path = replace_ent[replace_side].path + ".conflicted" new_oid = self.providers[replace_side].rename( replace_ent[replace_side].oid, conflict_path) self.state.update_entry(replace_ent, replace_side, new_oid, path=conflict_path) log.debug("REPLACE %s", replace_ent) # force download of other side self.state.mark_changed(defer_side, defer_ent) defer_ent[defer_side].sync_path = None defer_ent[defer_side].sync_hash = None log.debug("SPLITTY\n%s", self.state.pretty_print()) def handle_path_conflict(self, sync): # consistent handling path1 = sync[0].path path2 = sync[1].path if path1 > path2: pick = 0 else: pick = 1 picked = sync[pick] other = sync[other_side(pick)] other_path = self.translate(other.side, picked.path) if other_path is None: return log.debug("renaming to handle path conflict: %s -> %s", other.oid, other_path) new_oid = self.providers[other.side].rename(other.oid, other_path) self.state.update_entry(sync, other.side, new_oid, path=other_path) PKoiO ^SScloudsync/sync/state.pyimport time import logging import json import copy from enum import Enum from typing import Union from abc import ABC, abstractmethod from typing import Optional, Tuple, Any, List, Dict, Set from cloudsync.types import OType from cloudsync.types import DIRECTORY, FILE from .util import debug_sig log = logging.getLogger(__name__) __all__ = ['SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] # adds a repr to some classes class Reprable: # pylint: disable=too-few-public-methods def __repr__(self): return self.__class__.__name__ + ":" + debug_sig(id(self)) + str(self.__dict__) # safe ternary, don't allow traditional comparisons class Exists(Enum): UNKNOWN = None EXISTS = True TRASHED = False def __bool__(self): raise ValueError("never bool enums") UNKNOWN = Exists.UNKNOWN EXISTS = Exists.EXISTS TRASHED = Exists.TRASHED # state of a single object class SideState(Reprable): # pylint: disable=too-few-public-methods, too-many-instance-attributes def __init__(self, side: int, otype: OType): self.side: int = side # just for assertions self.otype: OType = otype self.hash: Optional[bytes] = None # hash at provider # time of last change (we maintain this) self.changed: Optional[float] = None self.sync_hash: Optional[bytes] = None # hash at last sync self.sync_path: Optional[str] = None # path at last sync self.path: Optional[str] = None # path at provider self.oid: Optional[str] = None # oid at provider self._exists: Exists = UNKNOWN # exists at provider @property def exists(self): return self._exists # allow traditional sets of ternary @exists.setter def exists(self, val: Union[bool, Exists]): if val is False: val = TRASHED if val is True: val = EXISTS if val is None: val = UNKNOWN if type(val) != Exists: raise ValueError("use enum for exists") self._exists = 
val # these are not really local or remote # but it's easier to reason about using these labels LOCAL = 0 REMOTE = 1 def other_side(index): return 1-index class Storage(ABC): @abstractmethod def create(self, tag: str, serialization: bytes) -> Any: """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def update(self, tag: str, serialization: bytes, eid: Any): """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def delete(self, tag: str, eid: Any): """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def read_all(self, tag: str) -> Dict[Any, bytes]: """yield all the serialized strings in a generator""" ... # single entry in the syncs state collection class SyncEntry(Reprable): def __init__(self, otype: OType, storage_init: Optional[Tuple[Any, bytes]] = None): super().__init__() self.__states: List[SideState] = [SideState(0, otype), SideState(1, otype)] self.temp_file: Optional[str] = None self.discarded: bool = False self.storage_id: Any = None self.dirty: bool = True self.punted: int = 0 if storage_init is not None: self.storage_id = storage_init[0] self.deserialize(storage_init) self.dirty = False def serialize(self) -> bytes: """converts SyncEntry into a json str""" def side_state_to_dict(side_state: SideState) -> dict: ret = dict() ret['otype'] = side_state.otype.value ret['side'] = side_state.side ret['hash'] = side_state.hash.hex() if isinstance( side_state.hash, bytes) else None ret['changed'] = side_state.changed ret['sync_hash'] = side_state.sync_hash.hex() if isinstance( side_state.sync_hash, bytes) else None ret['path'] = side_state.path ret['sync_path'] = side_state.sync_path ret['oid'] = side_state.oid ret['exists'] = side_state.exists.value # storage_id does not get serialized, it always comes WITH a serialization when deserializing return ret ser = dict() ser['side0'] = side_state_to_dict(self.__states[0]) ser['side1'] = side_state_to_dict(self.__states[1]) ser['temp_file'] = self.temp_file ser['discarded'] = self.discarded return json.dumps(ser).encode('utf-8') def deserialize(self, storage_init: Tuple[Any, bytes]): """loads the values in the serialization dict into self""" def dict_to_side_state(side, side_dict: dict) -> SideState: otype = OType(side_dict['otype']) side_state = SideState(side, otype) side_state.side = side_dict['side'] side_state.hash = bytes.fromhex( side_dict['hash']) if side_dict['hash'] else None side_state.changed = side_dict['changed'] side_state.sync_hash = bytes.fromhex( side_dict['sync_hash']) if side_dict['sync_hash'] else None side_state.sync_path = side_dict['sync_path'] side_state.path = side_dict['path'] side_state.oid = side_dict['oid'] side_state.exists = side_dict['exists'] return side_state self.storage_id = storage_init[0] ser: dict = json.loads(storage_init[1].decode('utf-8')) self.__states = [dict_to_side_state(0, ser['side0']), dict_to_side_state(1, ser['side1'])] self.temp_file = ser['temp_file'] self.discarded = ser['discarded'] def __getitem__(self, i): return self.__states[i] def __setitem__(self, i, val): assert type(val) is SideState assert val.side is None or val.side == i self.__states[i] = val self.dirty = True def hash_conflict(self): if self[0].hash and self[1].hash: return self[0].hash != self[0].sync_hash and self[1].hash != self[1].sync_hash return False def path_conflict(self): if self[0].path and self[1].path: 
return self[0].path != self[0].sync_path and self[1].path != self[1].sync_path return False def is_path_change(self, changed): return self[changed].path != self[changed].sync_path def is_creation(self, changed): return not self[changed].sync_path def discard(self): self.discarded = True self.dirty = True def pretty(self, fixed=True, use_sigs=True): if self.discarded: return "DISCARDED" def secs(t): if t: return str(round(t % 300, 3)).replace(".", "") else: return 0 def abbrev_bool(b, tup=('T', 'F', '?')): idx = 1-int(bool(b)) if b is None: idx = 2 return tup[idx] lexv = abbrev_bool(self[LOCAL].exists.value, ("E", "X", "?")) rexv = abbrev_bool(self[REMOTE].exists.value, ("E", "X", "?")) lhma = abbrev_bool(self[LOCAL].sync_hash != self[LOCAL].hash, ("H", "=", "?")) rhma = abbrev_bool(self[REMOTE].sync_hash != self[REMOTE].hash, ("H", "=", "?")) if use_sigs: _sig = debug_sig else: _sig = lambda a: a otype = self[LOCAL].otype.value[0:3] if self[REMOTE].otype != self[LOCAL].otype: otype = self[LOCAL].otype.value[0] + "-" + self[REMOTE].otype.value[0] if not fixed: return str((_sig(id(self)), otype, (secs(self[LOCAL].changed), self[LOCAL].path, _sig( self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma), (secs(self[REMOTE].changed), self[REMOTE].path, _sig( self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + lexv + ":" + lhma), self.punted)) ret = "%3s S%3s %3s %6s %20s O%6s %22s -- %6s %20s O%6s %22s %s" % ( _sig(id(self)), _sig(self.storage_id), # S otype, secs(self[LOCAL].changed), self[LOCAL].path, _sig(self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, secs(self[REMOTE].changed), self[REMOTE].path, _sig(self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, self.punted or "" ) return ret def __str__(self): return self.pretty(fixed=False) def store(self, tag: str, storage: Storage): if not self.storage_id: self.storage_id = storage.create(tag, self.serialize()) else: storage.update(tag, self.serialize(), self.storage_id) def punt(self): # do this one later self.punted += 1 class SyncState: def __init__(self, storage: Optional[Storage] = None, tag: Optional[str] = None): self._oids = ({}, {}) self._paths = ({}, {}) self._changeset = set() self._storage: Optional[Storage] = storage self._tag = tag if self._storage: storage_dict = self._storage.read_all(tag) for eid, ent_ser in storage_dict.items(): ent = SyncEntry(None, (eid, ent_ser)) for side in [LOCAL, REMOTE]: path, oid = ent[side].path, ent[side].oid if path not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][oid] = ent self._oids[side][oid] = ent def _change_path(self, side, ent, path): assert type(ent) is SyncEntry assert ent[side].oid prior_ent = ent prior_path = ent[side].path if prior_path: if prior_path in self._paths[side]: prior_ent = self._paths[side][prior_path].pop(ent[side].oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] if path: if path not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][ent[side].oid] = ent ent[side].path = path ent.dirty = True if prior_ent and prior_ent in self._changeset and prior_ent is not ent: log.debug("alter changeset") self._changeset.remove(prior_ent) self._changeset.add(ent) assert ent in self.get_all() def _change_oid(self, side, ent, oid): assert type(ent) is SyncEntry prior_oid = ent[side].oid path = ent[side].path log.debug("side(%s) of %s oid -> %s", side, ent, debug_sig(oid)) other = other_side(side) if ent[other].path: assert ent in 
self.lookup_path(other, ent[other].path), ("%s %s path not indexed" % (other, ent)) prior_ent = None if prior_oid: prior_ent = self._oids[side].pop(prior_oid, None) if oid: ent[side].oid = oid ent.dirty = True self._oids[side][oid] = ent else: log.debug("removed oid from index") other = other_side(side) if ent[other].path: assert ent in self.lookup_path(other, ent[other].path), ("%s %s path not indexed" % (other, ent)) maybe_remove = set() if prior_ent and prior_ent is not ent and prior_ent in self._changeset: maybe_remove.add(prior_ent) self._changeset.add(ent) prior_ent = None if prior_oid and path and path in self._paths[side]: prior_ent = self._paths[side][path].pop(prior_oid, None) if oid and ent[side].path: if ent[side].path not in self._paths[side]: self._paths[side][ent[side].path] = {} self._paths[side][ent[side].path][oid] = ent if prior_ent and prior_ent is not ent and prior_ent in self._changeset: maybe_remove.add(prior_ent) for r in maybe_remove: if r in self.get_all(): continue log.debug("removing %s because oid and path not in index", r) self._changeset.remove(r) log.debug("side is %s, oid is %s", side, debug_sig(oid)) assert self._oids[side][oid] is ent def lookup_oid(self, side, oid): try: ret = self._oids[side][oid] if not ret.discarded: return ret return None except KeyError: return None def lookup_path(self, side, path): try: ret = self._paths[side][path].values() if ret: return [e for e in ret if not e.discarded] return [] except KeyError: return [] def rename_dir(self, side, from_dir, to_dir, is_subpath, replace_path): """ when a directory changes, utility to rename all kids """ remove = [] # TODO: refactor this so that a list of affected items is gathered, then the alterations happen to the final # list, which will avoid having to remove after adding, which feels mildly risky # TODO: is this function called anywhere? ATM, it looks like no... It should be called or removed # TODO: it looks like this loop has a bug... 
items() does not return path, sub it returns path, Dict[oid, sub] for path, sub in self._paths[side].items(): if is_subpath(from_dir, sub.path): sub.path = replace_path(sub.path, from_dir, to_dir) remove.append(path) self._paths[side][sub.path] = sub sub.dirty = True for path in remove: self._paths[side].pop(path) def update_entry(self, ent, side, oid, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin, too-many-arguments if oid is not None: self._change_oid(side, ent, oid) assert ent in self.get_all() if otype is not None: ent[side].otype = otype if path is not None: self._change_path(side, ent, path) assert ent in self.get_all() if hash is not None: ent[side].hash = hash ent.dirty = True if exists is not None and exists is not ent[side].exists: ent[side].exists = exists ent.dirty = True if changed: assert ent[side].path or ent[side].oid log.debug("add %s to changeset", ent) self.mark_changed(side, ent) assert ent in self.get_all() log.debug("updated %s", ent) def mark_changed(self, side, ent): ent[side].changed = time.time() self._changeset.add(ent) def storage_update(self, ent: SyncEntry): log.debug("storage_update eid%s", ent.storage_id) if self._storage is not None: if ent.storage_id is not None: if ent.discarded: log.debug("storage_update deleting eid%s", ent.storage_id) self._storage.delete(self._tag, ent.storage_id) else: self._storage.update(self._tag, ent.serialize(), ent.storage_id) else: assert not ent.discarded new_id = self._storage.create(self._tag, ent.serialize()) ent.storage_id = new_id log.debug("storage_update creating eid%s", ent.storage_id) ent.dirty = False def __len__(self): return len(self.get_all()) def update(self, side, otype, oid, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin, too-many-arguments log.debug("lookup %s", debug_sig(oid)) ent = self.lookup_oid(side, oid) prior_ent = None if prior_oid and prior_oid != oid: prior_ent = self.lookup_oid(side, prior_oid) if not ent: ent = prior_ent prior_ent = None if ent and prior_ent: # oid_is_path conflict # the new entry has the same name as an old entry log.debug("discarding old entry in favor of new %s", prior_ent) prior_ent.discard() if prior_oid and prior_oid != oid: # this is an oid_is_path provider path_ents = self.lookup_path(side, path) if path_ents: if not ent: ent = path_ents[0] log.debug("matched existing entry %s:%s", debug_sig(oid), path) elif ent is not path_ents[0]: path_ents[0].discard() log.debug("discarded existing entry %s:%s", debug_sig(oid), path) if not ent: log.debug("creating new entry because %s not found in %s", debug_sig(oid), side) ent = SyncEntry(otype) self.update_entry(ent, side, oid, path, hash, exists, changed=True, otype=otype) self.storage_update(ent) def change(self): # for now just get a random one for e in self._changeset: if not e.discarded and not e.punted: return e for e in list(self._changeset): if e.discarded: self._changeset.remove(e) else: return e return None def has_changes(self): return bool(self._changeset) def finished(self, ent): if ent[1].changed or ent[0].changed: log.info("not marking finished: %s", ent) return self._changeset.remove(ent) for e in self._changeset: e.punted = 0 def pretty_print(self, use_sigs=True): ret = "" for e in self.get_all(): e: SyncEntry ret += e.pretty(fixed=True, use_sigs=use_sigs) + "\n" return ret def _assert_index_is_correct(self): for ent in self._changeset: if not ent.discarded: assert ent in self.get_all(), ("%s in changeset, not in index" % 
ent) for ent in self.get_all(): assert ent if ent[LOCAL].path: assert ent in self.lookup_path(LOCAL, ent[LOCAL].path), ("%s local path not indexed" % ent) if ent[REMOTE].path: assert ent in self.lookup_path(REMOTE, ent[REMOTE].path), ("%s remote path not indexed" % ent) if ent[LOCAL].oid: assert ent is self.lookup_oid(LOCAL, ent[LOCAL].oid), ("%s local oid not indexed" % ent) if ent[REMOTE].oid: assert ent is self.lookup_oid(REMOTE, ent[REMOTE].oid), ("%s local oid not indexed" % ent) if ent[LOCAL].changed or ent[REMOTE].changed: if ent not in self._changeset: assert False, ("changeset missing %s" % ent) def get_all(self, discarded=False) -> Set['SyncState']: ents = set() for ent in self._oids[LOCAL].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) for ent in self._oids[REMOTE].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) return ents def entry_count(self): return len(self.get_all()) def split(self, ent): log.debug("splitting %s", ent) defer = REMOTE replace = LOCAL defer_ent = ent replace_ent = SyncEntry(ent[replace].otype) replace_ent[replace] = copy.copy(ent[replace]) # copy in the replace state defer_ent[replace] = SideState(replace, ent[replace].otype) # clear out # fix indexes, so the defer ent no longer has replace stuff self.update_entry(defer_ent, replace, oid=None, path=None, exists=UNKNOWN) self.update_entry(defer_ent, defer, oid=defer_ent[defer].oid, changed=True) # add to index assert replace_ent[replace].oid self.update_entry( replace_ent, replace, oid=replace_ent[replace].oid, path=replace_ent[replace].path, changed=True) assert replace_ent[replace].oid # we aren't synced replace_ent[replace].sync_path = None replace_ent[replace].sync_hash = None # never synced defer_ent[defer].sync_path = None defer_ent[defer].sync_hash = None log.debug("split: %s", defer_ent) log.debug("split: %s", replace_ent) log.info("SPLIT\n%s", self.pretty_print()) assert replace_ent[replace].oid self.storage_update(defer_ent) self.storage_update(replace_ent) return defer_ent, defer, replace_ent, replace PKoiOeEDcloudsync/sync/util.pyfrom hashlib import md5 from base64 import b64encode # useful for converting oids and pointer nubmers into digestible nonces def debug_sig(t, size=3): if not t: return 0 return b64encode(md5(str(t).encode()).digest()).decode()[0:size] PKiVO==cloudsync/tests/__init__.pyfrom .fixtures.util import util from .test_provider import * PKoiOPt cloudsync/tests/conftest.pyfrom .fixtures import * # pylint: disable=unused-import def pytest_addoption(parser): parser.addoption( "--provider", action="append", default=[], help="provider(s) to run tests for" ) PK|}NRlcloudsync/tests/pytest.ini[pytest] log_format = %(asctime)s p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s: %(message)s log_date_format = %Y-%m-%d %H:%M:%S log_level = debug log_cli = true log_cli_level = error PKoiO;<%~/~/cloudsync/tests/test_cs.pyfrom io import BytesIO import logging import pytest from typing import List from .fixtures import MockProvider, MockStorage from cloudsync import CloudSync, SyncState, SyncEntry, LOCAL, REMOTE, FILE, DIRECTORY from .test_sync import WaitFor, RunUntilHelper log = logging.getLogger(__name__) @pytest.fixture(name="cs") def fixture_cs(mock_provider_generator): def translate(to, path): if to == LOCAL: return "/local" + path.replace("/remote", "") if to == REMOTE: return "/remote" + path.replace("/local", "") raise ValueError() class CloudSyncMixin(CloudSync, RunUntilHelper): pass cs = 
CloudSyncMixin((mock_provider_generator(), mock_provider_generator()), translate) yield cs cs.done() @pytest.fixture(name="multi_cs") def fixture_multi_cs(mock_provider_generator): storage_dict = dict() storage = MockStorage(storage_dict) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = mock_provider_generator() p2 = mock_provider_generator() p3 = mock_provider_generator() def translate1(to, path): if to == LOCAL: return "/local1" + path.replace("/remote", "") if to == REMOTE: if "/local1" in path: return "/remote" + path.replace("/local1", "") return None raise ValueError() def translate2(to, path): if to == LOCAL: return "/local2" + path.replace("/remote", "") if to == REMOTE: if "/local2" in path: return "/remote" + path.replace("/local2", "") return None raise ValueError() cs1 = CloudSyncMixin((p1, p2), translate1, storage, "tag1") cs2 = CloudSyncMixin((p1, p3), translate2, storage, "tag2") yield cs1, cs2 cs1.done() cs2.done() def test_sync_multi(multi_cs): cs1, cs2 = multi_cs local_parent1 = "/local1" local_parent2 = "/local2" remote_parent1 = "/remote" remote_parent2 = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path11 = "/local1/stuff1" local_path21 = "/local2/stuff1" local_path12 = "/local1/stuff2" local_path22 = "/local2/stuff2" cs1.providers[LOCAL].mkdir(local_parent1) cs1.providers[REMOTE].mkdir(remote_parent1) cs2.providers[LOCAL].mkdir(local_parent2) cs2.providers[REMOTE].mkdir(remote_parent2) linfo1 = cs1.providers[LOCAL].create(local_path11, BytesIO(b"hello1"), None) linfo2 = cs2.providers[LOCAL].create(local_path21, BytesIO(b"hello2"), None) rinfo1 = cs1.providers[REMOTE].create(remote_path2, BytesIO(b"hello3"), None) rinfo2 = cs2.providers[REMOTE].create(remote_path2, BytesIO(b"hello4"), None) cs1.run_until_found( (LOCAL, local_path11), (LOCAL, local_path21), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) cs1.run(until=lambda: not cs1.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs1.state.pretty_print()) assert len(cs1.state) == 5 # two dirs, 3 files, 1 never synced (local2 file) try: cs2.run_until_found( (LOCAL, local_path12), (LOCAL, local_path22), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("TABLE\n%s", cs2.state.pretty_print()) raise linfo12 = cs1.providers[LOCAL].info_path(local_path12) rinfo11 = cs1.providers[REMOTE].info_path(remote_path1) linfo22 = cs2.providers[LOCAL].info_path(local_path22) rinfo21 = cs2.providers[REMOTE].info_path(remote_path1) assert linfo12.oid assert linfo22.oid assert rinfo11.oid assert rinfo21.oid assert linfo12.hash == rinfo1.hash assert linfo22.hash == rinfo2.hash # let cleanups/discards/dedups happen if needed cs2.run(until=lambda: not cs2.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs2.state.pretty_print()) assert len(cs2.state) == 6 # two dirs, 4 files, 2 never synced (local1 files) def test_sync_basic(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/stuff2" local_path2 = "/local/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"hello2"), None) cs.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) linfo2 = cs.providers[LOCAL].info_path(local_path2) rinfo1 = 
cs.providers[REMOTE].info_path(remote_path1) assert linfo2.oid assert rinfo1.oid assert linfo2.hash == rinfo2.hash assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") # let cleanups/discards/dedups happen if needed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs.state.pretty_print()) assert len(cs.state) == 3 assert not cs.state.has_changes() def setup_remote_local(cs, *names): remote_parent = "/remote" local_parent = "/local" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) found = [] for name in names: remote_path1 = "/remote/" + name local_path1 = "/local/" + name linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) found.append((REMOTE, remote_path1)) cs.run_until_found(*found) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) def test_sync_create_delete_same_name(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) cs.run(until=lambda: not cs.state.has_changes(), timeout=2) rinfo = cs.providers[REMOTE].info_path(remote_path1) cs.emgrs[LOCAL].do() cs.providers[LOCAL].delete(linfo1.oid) cs.emgrs[LOCAL].do() linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) # run local event manager only... not sync cs.emgrs[LOCAL].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: assert(len(cs.state) == 2) else: assert(len(cs.state) == 3) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 2) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") def test_sync_two_conflicts(cs): remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"world")) # run event managers only... 
not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # the local delete/create doesn't add entries assert(len(cs.state) == 2) else: assert(len(cs.state) == 4) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 3) assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") b1 = BytesIO() b2 = BytesIO() cs.providers[LOCAL].download_path(local_path1, b1) cs.providers[LOCAL].download_path(local_path1 + ".conflicted", b2) assert b1.getvalue() in (b'goodbye', b'world') assert b2.getvalue() in (b'goodbye', b'world') assert b1.getvalue() != b2.getvalue() # this test is sensitive to the order in which things are processed # so run it a few times @pytest.mark.repeat(10) def test_sync_folder_conflicts_file(cs): remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff1/under" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].mkdir(remote_path1) linfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"world")) # run event managers only... not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # there won't be 2 rows for /local/stuff1 is oid_is_path assert(len(cs.state) == 3) locs = cs.state.lookup_path(LOCAL, local_path1) assert locs and len(locs) == 1 loc = locs[0] assert loc[LOCAL].otype == FILE assert loc[REMOTE].otype == DIRECTORY else: # deleted /local/stuff, remote/stuff, remote/stuff/under, lcoal/stuff, /local assert(len(cs.state) == 5) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 4) local_conf = cs.providers[LOCAL].info_path(local_path1 + ".conflicted") remote_conf = cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") def test_storage(): def translate(to, path): (old, new) = ("/local", "/remote") if to == REMOTE else ("/remote", "/local") return new + path.replace(old, "") class CloudSyncMixin(CloudSync, RunUntilHelper): pass storage_dict = dict() p1 = MockProvider(oid_is_path=False, case_sensitive=True) p2 = MockProvider(oid_is_path=False, case_sensitive=True) storage1 = MockStorage(storage_dict) cs1: CloudSync = CloudSyncMixin((p1, p2), translate, storage1, "tag") test_sync_basic(cs1) # do some syncing, to get some entries into the state table storage2 = MockStorage(storage_dict) cs2: CloudSync = CloudSyncMixin((p1, p2), translate, storage2, "tag") print(f"state1 = {cs1.state.entry_count()}\n{cs1.state.pretty_print()}") print(f"state2 = {cs2.state.entry_count()}\n{cs2.state.pretty_print()}") def not_dirty(s: SyncState): for se in s.get_all(): se: SyncEntry assert not se.dirty def compare_states(s1: SyncState, s2: SyncState) -> List[SyncEntry]: ret = [] found = False for e1 in s1.get_all(): e1: SyncEntry for e2 in s2.get_all(): e2: SyncEntry if e1.serialize() == e2.serialize(): found = True if not found: ret.append(e1) return 
ret missing1 = compare_states(cs1.state, cs2.state) missing2 = compare_states(cs2.state, cs1.state) for e in missing1: print(f"entry in 1 not found in 2 {e.pretty()}") for e in missing2: print(f"entry in 2 not found in 1 {e.pretty()}") assert not missing1 assert not missing2 not_dirty(cs1.state) PKoiO@Lcloudsync/tests/test_events.pyimport os from io import BytesIO import pytest from cloudsync import EventManager, SyncState, LOCAL @pytest.fixture(name="manager") def fixture_manager(mock_provider_generator): # TODO extend this to take any provider state = SyncState() provider = mock_provider_generator() yield EventManager(provider, state, LOCAL) def test_event_basic(util, manager): provider = manager.provider state = manager.state info = provider.create("/dest", BytesIO(b'hello')) assert not state.lookup_path(LOCAL, "/dest") oid = None # this is normally a blocking function that runs forever def done(): nonlocal oid states = state.get_all() if states: oid = list(states)[0][LOCAL].oid return state.lookup_oid(LOCAL, oid) # loop the sync until the file is found manager.run(timeout=1, until=done) assert oid info = provider.info_oid(oid) assert info.path == "/dest" PKoiOtocloudsync/tests/test_mux.pyimport threading from cloudsync.muxer import Muxer def test_simple_mux(): def gen(): yield from range(4) m1 = Muxer(gen) m2 = Muxer(gen) assert len(list(m1)) == 4 assert len(list(m2)) == 4 def test_thready_mux(): threads = 10 count = 1000 def gen(): yield from range(count) def counter(m): def inner(): inner.count = 0 for _ in m: inner.count += 1 return inner m = [None] * threads c = [None] * threads t = [None] * threads for i in range(threads): m[i] = Muxer(gen) c[i] = counter(m[i]) t[i] = threading.Thread(target=c[i]) for i in range(threads): t[i].start() for i in range(threads): t[i].join() assert c[i].count == count def test_later_mux(): def gen(): yield from range(4) m1 = Muxer(gen) assert next(m1) == 0 m2 = Muxer(gen) assert len(m1.listeners) == 2 assert len(list(m1)) == 3 assert len(list(m2)) == 3 def test_restart_mux(): def gen(): yield from range(4) m1 = Muxer(gen, restart=True) m2 = Muxer(gen, restart=True) assert len(m1.listeners) == 2 assert len(list(m1)) == 4 assert len(list(m2)) == 8 assert len(list(m1)) == 8 assert len(list(m2)) == 8 def test_del(): def gen(): yield from range(4) m1 = Muxer(gen) i = next(m1) m2 = Muxer(gen) lm2 = list(m2) assert len(m2.listeners) == 2 assert len(lm2) == 3 m2.__del__() assert len(m1.listeners) == 1 j = list(m1) assert gen in Muxer.already m1.__del__() assert len(m1.listeners) == 0 assert gen not in Muxer.already PKoiO2ַ cloudsync/tests/test_provider.pyimport os import time import logging import pytest from io import BytesIO from unittest.mock import patch from typing import Union, NamedTuple import cloudsync from cloudsync import Event, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError, FILE from cloudsync.tests.fixtures import Provider, mock_provider_instance from cloudsync.runnable import time_helper from cloudsync.types import OInfo from cloudsync.providers import GDriveProvider, DropboxProvider log = logging.getLogger(__name__) ProviderMixin = Union[Provider, "ProviderHelper"] class ProviderHelper(Provider): def __init__(self, prov): self.api_retry = True self.prov = prov # need to copy in all attrs that are defined in the ABC self.oid_is_path = prov.oid_is_path self.case_sensitive = prov.case_sensitive self.test_root = getattr(self.prov, "test_root", None) self.event_timeout = getattr(self.prov, "event_timeout", 20) self.event_sleep = 
getattr(self.prov, "event_sleep", 1) self.creds = getattr(self.prov, "creds", {}) self.prov_api_func = self.prov._api self.prov._api = lambda *ar, **kw: self.__api_retry(self._api, *ar, **kw) self.prov.connect(self.creds) if not self.test_root: # if the provider class doesn't specify a testing root # then just make one up self.test_root = "/" + os.urandom(16).hex() self.prov.mkdir(self.test_root) def _api(self, *ar, **kw): return self.prov_api_func(*ar, **kw) def __api_retry(self: ProviderMixin, func, *ar, **kw): # the cloud providers themselves should *not* have their own backoff logic # rather, they should punt rate limit and temp errors to the sync system # since we're not testing the sync system here, we need to make our own if not self.api_retry: return func(*ar, **kw) for _ in time_helper(timeout=self.event_timeout, sleep=self.event_sleep, multiply=2): try: return func(*ar, **kw) except CloudTemporaryError: log.info("api retry %s %s %s", func, ar, kw) # TEST-ROOT WRAPPER def __getattr__(self, k): return getattr(self.prov, k) def events(self): for e in self.prov.events(): if self.__filter_root(e): yield e def walk(self, path): path = self.__add_root(path) log.debug("WALK %s", path) for e in self.prov.walk(path): if self.__filter_root(e): yield e def download(self, *args, **kwargs): return self.__strip_root(self.prov.download(*args, **kwargs)) def create(self, path, file_like, metadata=None): path = self.__add_root(path) log.debug("CREATE %s", path) return self.__strip_root(self.prov.create(path, file_like, metadata)) def upload(self, *args, **kwargs): return self.__strip_root(self.prov.upload(*args, **kwargs)) def rename(self, oid, path): path = self.__add_root(path) return self.__strip_root(self.prov.rename(oid, path)) def mkdir(self, path): path = self.__add_root(path) return self.__strip_root(self.prov.mkdir(path)) def delete(self, *args, **kwargs): return self.__strip_root(self.prov.delete(*args, **kwargs)) def exists_oid(self, oid): return self.prov.exists_oid(oid) def exists_path(self, path): path = self.__add_root(path) return self.prov.exists_path(path) def info_path(self, path): path = self.__add_root(path) return self.__strip_root(self.prov.info_path(path)) def info_oid(self, oid): return self.__strip_root(self.prov.info_oid(oid)) def listdir(self, oid): for e in self.prov.listdir(oid): if self.__filter_root(e): yield e def __add_root(self, path): return self.join(self.test_root, path) def __filter_root(self, obj): if hasattr(obj, "path"): raw_path = obj.path if not raw_path: info = self.prov.info_oid(obj.oid) if info: raw_path = info.path if not raw_path: # pathless objects always get passed through # so isolation is not perfect return True if not raw_path.startswith(self.test_root): return False self.__strip_root(obj) return True def __strip_root(self, obj): if hasattr(obj, "path"): path = obj.path if path: assert path.startswith(self.test_root) path = obj.path[len(self.test_root):] if not path.startswith("/"): path = "/" + path obj.path = path return obj # HELPERS def temp_name(self: ProviderMixin, name="tmp", *, folder=None): fname = self.join(folder or "/", os.urandom(16).hex() + "." 
+ name) return fname def events_poll(self: ProviderMixin, timeout=None, until=None): if timeout is None: timeout = self.event_timeout if timeout == 0: yield from self.events() return for _ in time_helper(timeout, sleep=self.event_sleep, multiply=2): got = False for e in self.events(): yield e got = True if not until and got: break elif until and until(): break def __cleanup(self: ProviderMixin, oid): try: for info in self.prov.listdir(oid): if info.otype == FILE: log.debug("cleaning %s", info) self.delete(info.oid) else: self.__cleanup(info.oid) log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileNotFoundError: pass def test_cleanup(self: ProviderMixin, timeout=None, until=None): info = self.prov.info_path(self.test_root) self.__cleanup(info.oid) info = self.prov.info_path(self.test_root) if info: try: log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileExistsError: # deleting the root might now be supported pass def mixin_provider(prov): assert prov assert isinstance(prov, Provider) prov = ProviderHelper(prov) yield prov prov.test_cleanup() @pytest.fixture def provider_params(): return None class ProviderConfig: def __init__(self, name, param=(), param_id=None): if param_id is None: param_id = name self.name = name if name == "mock": assert param self.param = param self.param_id = param_id def __repr__(self): return "%s(%s)" % (type(self), self.__dict__) @pytest.fixture def config_provider(request, provider_config): try: request.raiseerror("foo") except Exception as e: FixtureLookupError = type(e) if provider_config.name == "external": # if there's a fixture available, use it return request.getfixturevalue("cloudsync_provider") # deferring imports to prevent needing deps we don't want to require for everyone elif provider_config.name == "mock": return mock_provider_instance(*provider_config.param) elif provider_config.name == "gdrive": from .providers.gdrive import gdrive_provider return gdrive_provider() elif provider_config.name == "dropbox": from .providers.dropbox import dropbox_provider return dropbox_provider() else: assert False, "Must provide a valid --provider name or use the -p " known_providers = ('gdrive', 'external', 'dropbox', 'mock') def configs_from_name(name): provs = [] if name == "mock": provs += [ProviderConfig("mock", (False, True), "mock_oid_cs")] provs += [ProviderConfig("mock", (True, True), "mock_path_cs")] else: provs += [ProviderConfig(name)] return provs def configs_from_keyword(kw): provs = [] # crappy approximation of pytest evaluation routine, because false = {} for known_prov in known_providers: false[known_prov] = False for known_prov in known_providers: if known_prov == kw or '[' + known_prov + ']' == kw: ok = True else: ids = false.copy() ids[known_prov] = True try: ok = eval(kw, {}, ids) except NameError as e: ok = False except Exception as e: log.error("%s %s", type(e), e) ok = False if type(ok) is list: ok = any(ok) if ok: provs += configs_from_name(known_prov) return provs _registered = False def pytest_generate_tests(metafunc): global _registered if not _registered: for known_prov in known_providers: metafunc.config.addinivalue_line( "markers", known_prov ) _registered = True if "provider_config" in metafunc.fixturenames: provs = [] for e in metafunc.config.getoption("provider", []): for n in e.split(","): provs += configs_from_name(n) if not provs: kw = metafunc.config.getoption("keyword", "") if kw: provs += configs_from_keyword(kw) if not provs: provs += configs_from_name("mock") ids = [p.param_id for p in provs] 
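# each config below is wrapped in pytest.param() with a marker named after the provider (mock, gdrive, dropbox, external), so a run can be narrowed with `pytest -m gdrive` and similar expressions, while param_id keeps the generated test ids readable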
marks = [pytest.param(p, marks=[getattr(pytest.mark, p.name)]) for p in provs] metafunc.parametrize("provider_config", marks, ids=ids) @pytest.fixture def provider(config_provider): yield from mixin_provider(config_provider) def test_join(mock_provider): assert "/a/b/c" == mock_provider.join("a", "b", "c") assert "/a/c" == mock_provider.join("a", None, "c") assert "/a/b/c" == mock_provider.join("/a", "/b", "/c") assert "/a/c" == mock_provider.join("a", "/", "c") def test_connect(provider): assert provider.connected def test_create_upload_download(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) info2 = provider.upload(info1.oid, data()) assert info1.oid == info2.oid assert info1.hash == info2.hash assert provider.exists_path(dest) dest = BytesIO() provider.download(info2.oid, dest) dest.seek(0) assert dest.getvalue() == dat def test_rename(provider: ProviderMixin): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) dest2 = provider.temp_name("dest2") provider.rename(info1.oid, dest2) assert provider.exists_path(dest2) assert not provider.exists_path(dest) # test that renaming a folder renames the children folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() file_name = os.urandom(16).hex() file_path1 = provider.join(folder_name1, file_name) file_path2 = provider.join(folder_name2, file_name) sub_folder_name = os.urandom(16).hex() sub_folder_path1 = provider.join(folder_name1, sub_folder_name) sub_folder_path2 = provider.join(folder_name2, sub_folder_name) sub_file_name = os.urandom(16).hex() sub_file_path1 = provider.join(sub_folder_path1, sub_file_name) sub_file_path2 = provider.join(sub_folder_path2, sub_file_name) folder_oid = provider.mkdir(folder_name1) sub_folder_oid = provider.mkdir(sub_folder_path1) file_info = provider.create(file_path1, data()) sub_file_info = provider.create(sub_file_path1, data()) assert provider.exists_path(file_path1) assert not provider.exists_path(file_path2) assert provider.exists_path(sub_file_path1) assert not provider.exists_path(sub_file_path2) assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) provider.rename(folder_oid, folder_name2) assert provider.exists_path(file_path2) assert not provider.exists_path(file_path1) assert provider.exists_path(sub_file_path2) assert not provider.exists_path(sub_file_path1) if not provider.oid_is_path: assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) assert provider.info_oid(file_info.oid).path == file_path2 assert provider.info_oid(sub_file_info.oid).path == sub_file_path2 else: assert not provider.exists_oid(file_info.oid) assert not provider.exists_oid(sub_file_info.oid) def test_mkdir(provider: ProviderMixin): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") provider.mkdir(dest) info = provider.info_path(dest) assert info.otype == cloudsync.DIRECTORY sub_f = provider.temp_name("dest", folder=dest) log.debug("parent = %s, sub = %s", dest, sub_f) with pytest.raises(CloudFileExistsError): provider.create(dest, data(), None) assert provider.exists_path(dest) log.debug("folder %s exists", dest) provider.create(sub_f, data(), None) def test_walk(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") info = provider.create(dest, temp, None) got_event = False for e in provider.walk("/"): if e.otype 
== cloudsync.DIRECTORY: continue log.debug("WALK %s", e) assert e.oid == info.oid path = e.path if path is None: path = provider.info_oid(e.oid).path assert path == dest assert e.mtime assert e.exists got_event = True assert got_event def check_event_path(event: Event, provider: ProviderMixin, target_path): # confirms that the path in the event matches the target_path # if the provider doesn't provide the path in the event, look it up by the oid in the event # if we can't get the path, that's OK if the file doesn't exist event_path = event.path if event_path is None: try: event_path = provider.info_oid(event.oid).path assert event_path == target_path except CloudFileNotFoundError: if event.exists: raise def test_event_basic(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep, 1)): log.debug("event %s", e) wait_sleep_cycles = 10 info1 = provider.create(dest, temp, None) assert info1 is not None # TODO: check info1 for more things received_event = None event_count = 0 done = False waiting = None wait_secs = min(provider.event_sleep * wait_sleep_cycles, 2) for e in provider.events_poll(until=lambda: done): log.debug("got event %s", e) # you might get events for the root folder here or other setup stuff if e.exists: if not e.path: info = provider.info_oid(e.oid) if info: e.path = info.path if e.path == dest: received_event = e event_count += 1 log.debug("%s vs %s", e.path, dest) if e.path == dest and not waiting: waiting = time.monotonic() + wait_secs if waiting and time.monotonic() > waiting: # wait for extra events up to 10 sleep cycles, or 2 seconds done = True assert event_count == 1 assert received_event is not None assert received_event.oid path = received_event.path if path is None: path = provider.info_oid(received_event.oid).path assert path == dest assert received_event.mtime assert received_event.exists deleted_oid = received_event.oid provider.delete(oid=deleted_oid) provider.delete(oid=deleted_oid) # Tests that deleting a non-existing file does not raise a FNFE received_event = None event_count = 0 for e in provider.events_poll(): log.debug("event %s", e) if not e.exists or path in e.path: received_event = e event_count += 1 assert event_count == 1 assert received_event is not None assert received_event.oid assert not received_event.exists if received_event.path is not None: assert received_event.path == dest assert received_event.oid == deleted_oid assert received_event.mtime def test_event_del_create(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) temp2 = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) provider.delete(info1.oid) provider.create(dest, temp2) last_event = None saw_delete = False done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path if path == dest or e.exists is False: last_event = e if e.exists is True and saw_delete: log.debug("done, we saw the delete and got a create after") done = True if e.exists is False: saw_delete = True # the important thing is that we always get a create after the delete event assert last_event assert 
last_event.exists is True def test_event_rename(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") dest2 = provider.temp_name("dest") dest3 = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) oid2 = provider.rename(info1.oid, dest2) if provider.oid_is_path: info1.oid = provider.info_path(dest2).oid oid3 = provider.rename(info1.oid, dest3) if provider.oid_is_path: info1.oid = provider.info_path(dest3).oid seen = set() last_event = None second_to_last = None done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): if provider.oid_is_path: assert e.path log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path last_event = e seen.add(e.oid) if provider.oid_is_path: # 2 and 3 are in order if path == dest2: second_to_last = True if path == dest3 and (second_to_last or not provider.oid_is_path): done = True else: done = info1.oid in seen if provider.oid_is_path: # providers with path based oids need to send intermediate renames accurately and in order assert len(seen) > 2 assert last_event.path == dest3 assert last_event.prior_oid == oid2 else: # oid based providers just need to let us know something happend to that oid assert info1.oid in seen def test_api_failure(provider): # assert that the cloud # a) uses an api function # b) does not trap CloudTemporaryError's def side_effect(*a, **k): raise CloudTemporaryError("fake disconnect") with patch.object(provider, "_api", side_effect=side_effect): with patch.object(provider, "api_retry", False): with pytest.raises(CloudTemporaryError): provider.exists_path("/notexists") def test_file_not_found(provider: ProviderMixin): # Test that operations on nonexistent file system objects raise CloudFileNotFoundError # when appropriate, and don't when inappropriate dat = os.urandom(32) def data(): return BytesIO(dat) test_file_deleted_path = provider.temp_name("dest1") # Created, then deleted test_file_deleted_info = provider.create(test_file_deleted_path, data(), None) test_file_deleted_oid = test_file_deleted_info.oid provider.delete(test_file_deleted_oid) test_folder_deleted_path = provider.temp_name("dest1") # Created, then deleted test_folder_deleted_oid = provider.mkdir(test_folder_deleted_path) provider.delete(test_folder_deleted_oid) test_path_made_up = provider.temp_name("dest2") # Never created test_oid_made_up = "never created" # TODO: consider mocking info_path to always return None, and then call all the provider methods # to see if they are handling the None, and not raising exceptions other than FNF # Tests: # exists_path # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_path(test_file_deleted_path) is False assert provider.exists_path(test_folder_deleted_path) is False assert provider.exists_path(test_path_made_up) is False # exists_oid # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_oid(test_folder_deleted_oid) is False assert provider.exists_oid(test_oid_made_up) is False # info_path # deleted file returns None # 
deleted folder returns None # never existed fsobj returns None assert provider.info_path(test_file_deleted_path) is None assert provider.info_path(test_folder_deleted_path) is None assert provider.info_path(test_path_made_up) is None # info_oid # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_oid(test_file_deleted_oid) is None assert provider.info_oid(test_folder_deleted_oid) is None assert provider.info_oid(test_oid_made_up) is None # hash_oid # deleted file returns None # never existed file returns None # if getattr(provider, "hash_oid", False): # TODO implement hash_oid in gdrive, then don't have this be conditional # assert provider.hash_oid(test_file_deleted_oid) is None # assert provider.hash_oid(test_oid_made_up) is None # upload # to a deleted file raises FNF, or untrashes the file, either is OK # to a made up oid raises FNF # TODO: uploading to a deleted file might not raise an FNF, it might just untrash the file assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_path(test_file_deleted_path) is False try: info = provider.upload(test_file_deleted_oid, data(), None) # This succeeded so the file must exist now, at the same oid as before assert info.oid == test_file_deleted_oid assert provider.exists_path(test_file_deleted_path) is True assert provider.exists_oid(test_file_deleted_oid) is True re_delete = True except CloudFileNotFoundError: re_delete = False pass if re_delete: provider.delete(test_file_deleted_oid) with pytest.raises(CloudFileNotFoundError): provider.upload(test_oid_made_up, data(), None) # create # to a non-existent folder, raises FNF # to a previously deleted folder, raises FNF with pytest.raises(CloudFileNotFoundError): provider.create(test_path_made_up + "/junk", data(), None) with pytest.raises(CloudFileNotFoundError): provider.create(test_folder_deleted_path + "/junk", data(), None) # upload: to the OID of a deleted folder, raises FNFE with pytest.raises(CloudFileNotFoundError): provider.upload(test_folder_deleted_oid, data(), None) # download # on a deleted oid raises FNF # on a made up oid raises FNF with pytest.raises(CloudFileNotFoundError): provider.download(test_file_deleted_oid, data()) with pytest.raises(CloudFileNotFoundError): provider.download(test_oid_made_up, data()) # rename # from a deleted oid raises FNF # from a made up oid raises FNF # to a non-existent folder raises [something], conditionally # to a previously deleted folder raises # check the rename source to see if there are others with pytest.raises(CloudFileNotFoundError): provider.rename(test_file_deleted_oid, test_file_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_folder_deleted_oid, test_folder_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_oid_made_up, test_path_made_up) # mkdir # to a non-existent folder raises FNF # to a previously deleted folder as parent folder raises FNF # to a previously deleted file as parent folder raises FNF with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_path_made_up + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_folder_deleted_path + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_file_deleted_path + "/junk") # delete # on a deleted file oid does not raise # on a deleted folder oid does not raise # on a made up oid does not raise provider.delete(test_file_deleted_oid) provider.delete(test_folder_deleted_oid) 
provider.delete(test_oid_made_up) # delete: create a file, delete it, then create a new file at that path, then re-delete the deleted oid, raises FNFE temp_path = provider.temp_name() info1 = provider.create(temp_path, BytesIO(b"Hello")) provider.delete(info1.oid) info2 = provider.create(temp_path, BytesIO(b"world")) if provider.oid_is_path: assert provider.exists_oid(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) else: assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) provider.delete(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) # listdir # on a deleted file raises FNF # on a deleted folder raises FNF # on a made up path raises FNF with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_file_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_folder_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_oid_made_up)) # TODO: Google drive raises FNF when it can't find the root... can we test for that here? def test_file_exists(provider: ProviderMixin): dat = os.urandom(32) def data(da=dat): return BytesIO(da) def create_and_delete_file(): create_and_delete_file_name = provider.temp_name() file_info = provider.create(create_and_delete_file_name, data(), None) provider.delete(file_info.oid) return create_and_delete_file_name, file_info.oid def create_and_delete_folder(): create_and_delete_folder_name = provider.temp_name() create_and_delete_folder_oid = provider.mkdir(create_and_delete_folder_name) provider.delete(create_and_delete_folder_oid) return create_and_delete_folder_name, create_and_delete_folder_oid def create_and_rename_file(): file_name1 = provider.temp_name() file_name2 = provider.temp_name() assert file_name1 != file_name2 file_info1 = provider.create(file_name1, data(), None) provider.rename(file_info1.oid, file_name2) return file_name1, file_info1.oid def create_and_rename_folder(): folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() assert folder_name1 != folder_name2 folder_oid1 = provider.mkdir(folder_name1) provider.rename(folder_oid1, folder_name2) return folder_name1, folder_oid1 def create_file(create_file_name=None): if create_file_name is None: create_file_name = provider.temp_name() file_info = provider.create(create_file_name, data(), None) return create_file_name, file_info.oid def create_folder(create_folder_name=None): if create_folder_name is None: create_folder_name = provider.temp_name() create_folder_oid = provider.mkdir(create_folder_name) return create_folder_name, create_folder_oid # Test that operations on existent file system objects raise CloudExistsError # when appropriate, and don't when inappropriate # api methods to check for FileExists: # vulnerable to existing paths: # mkdir, create, rename # Possible issues to potentially check each of the vulnerable api methods: # target path has a component in the parent folder that already exists as a file # target path exists # target path exists, but the type of the existing object at that location is different from expected # target path exists, but the type of the existing object at that location is what was expected # target path existed, but was deleted, different type as source # target path existed, but was deleted, same type as source # target path existed, but was renamed, different type as source # target path existed, but was renamed, same type as source # # vulnerable to existing OIDs: # 
upload, delete # Possible issues to potentially check each of the vulnerable api methods: # target OID exists, but the type of the existing object at that location is different from expected # target OID existed, but was trashed, should un-trash the object # target OID is a non-empty folder, delete should raise FEx # # The enumerated tests: # mkdir: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name + "/junk") # mkdir: where target path exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name) # mkdir: where target path exists as a folder, does not raise name1, oid1 = create_folder() oid2 = provider.mkdir(name1) assert oid1 == oid2 # mkdir: creating a file, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: creating a folder, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as file, but was renamed name1, oid1 = create_and_rename_file() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as folder, but was renamed name1, oid1 = create_and_rename_folder() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # upload: where target OID is a folder, raises FEx _, oid = create_folder() with pytest.raises(CloudFileExistsError): provider.upload(oid, data(), None) # delete: a non-empty folder, raises FEx name1, oid1 = create_folder() create_file(name1 + "/junk") with pytest.raises(CloudFileExistsError): provider.delete(oid1) # create: where target path exists, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name) def get_contents(oid): temp_contents = BytesIO() provider.download(oid, temp_contents) temp_contents.seek(0) return temp_contents.getvalue() # create: creating a file, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() _, oid2 = create_file(name1) assert oid1 != oid2 or provider.oid_is_path if provider.oid_is_path: assert provider.exists_oid(oid1) else: assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) # piggyback test -- uploading to the deleted oid should not step on the file that replaced it at that path if not provider.oid_is_path: try: new_contents = b"yo" # bytes(os.urandom(16).hex(), 'utf-8') new_info = provider.upload(oid1, BytesIO(new_contents)) assert new_info.oid != oid1 assert new_info.oid != oid2 assert not provider.exists_oid(oid1) contents2 = get_contents(oid2) assert contents2 == new_contents contents1 = get_contents(oid1) assert contents1 == new_contents except CloudFileNotFoundError: pass # create: creating a folder, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() _, oid2 = create_file(name1) assert oid1 != oid or provider.oid_is_path # create: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name + "/junk") # create: target path existed as folder, but was renamed name, _ = create_and_rename_folder() create_file(name) 
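# note: the rename-away create cases just above and just below carry no explicit assert; the check is simply that create_file() at the vacated path does not raise CloudFileExistsError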
# create: target path existed as file, but was renamed name, _ = create_and_rename_file() create_file(name) # rename: rename folder over empty folder succeeds name1, oid1 = create_folder() create_file(name1 + "/junk") name2, oid2 = create_folder() assert oid1 != oid2 contents1 = [x.name for x in provider.listdir(oid1)] provider.rename(oid1, name2) if provider.oid_is_path: log.debug("oid1 %s, oid2 %s", oid1, oid2) assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid2)] else: assert provider.exists_oid(oid1) assert not provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid1)] assert contents1 == contents2 # rename: rename folder over non-empty folder raises FEx _, oid1 = create_folder() name2, oid2 = create_folder() assert oid1 != oid2 create_file(name2 + "/junk") with pytest.raises(CloudFileExistsError): provider.rename(oid1, name2) # rename: target has a parent folder that already exists as a file, raises FEx folder_name, _ = create_file() # notice that I am creating a file, and calling it a folder with pytest.raises(CloudFileExistsError): create_file(folder_name + "/junk") # rename: renaming file over empty folder, raises FEx folder_name, folder_oid = create_folder() file_name, file_oid = create_file() other_file_name, other_file_oid = create_file() with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # rename: renaming file over non-empty folder, raises FEx create_file(folder_name + "/test") with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(folder_oid, file_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(file_oid, other_file_name) # reuse the same file and folder from the last test # rename: create a file, delete it, then rename a file to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_file() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename file to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_file() provider.rename(oid2, deleted_folder_name) # rename: create a file, delete it, then rename a folder to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_folder() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename folder to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_folder() provider.rename(oid2, deleted_folder_name) # rename: target folder path existed, but was renamed away, folder type as source name1, oid1 = create_and_rename_folder() name2, oid2 = create_folder() provider.rename(oid2, name1) # rename: target folder path existed, but was renamed away, file type as source name1, oid1 = create_folder() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, folder type as source name1, oid1 = create_file() name2, oid2 = create_folder() temp 
= provider.temp_name()
    provider.rename(oid1, temp)
    provider.rename(oid2, name1)

    # rename: target file path existed, but was renamed away, file type as source
    name1, oid1 = create_file()
    name2, oid2 = create_file()
    temp = provider.temp_name()
    provider.rename(oid1, temp)
    provider.rename(oid2, name1)

    # TODO: test that renaming A over B replaces B's OID with A's OID, and B's OID is trashed


def test_listdir(provider: ProviderMixin):
    outer = provider.temp_name()
    root = provider.dirname(outer)
    temp_name = provider.is_subpath(root, outer)
    outer_oid_rm = provider.mkdir(outer)
    assert [] == list(provider.listdir(outer_oid_rm))
    provider.delete(outer_oid_rm)
    outer_oid = provider.mkdir(outer)
    assert provider.exists_path(outer)
    assert provider.exists_oid(outer_oid)
    inner = outer + temp_name
    inner_oid = provider.mkdir(inner)
    assert provider.exists_oid(inner_oid)
    provider.create(outer + "/file1", BytesIO(b"hello"))
    provider.create(outer + "/file2", BytesIO(b"there"))
    provider.create(inner + "/file3", BytesIO(b"world"))
    contents = [x.name for x in provider.listdir(outer_oid)]
    assert len(contents) == 3
    expected = ["file1", "file2", temp_name[1:]]
    # list.sort() returns None, so comparing .sort() results was always true; compare sorted copies
    assert sorted(contents) == sorted(expected)


def test_upload_to_a_path(provider: ProviderMixin):
    temp_name = provider.temp_name()
    provider.create(temp_name, BytesIO(b"test"))
    # test uploading to a path instead of an OID. should raise something
    # This test will need to flag off whether the provider uses paths as OIDs or not
    with pytest.raises(Exception):
        provider.upload(temp_name, BytesIO(b"test2"))


def test_delete_doesnt_cross_oids(provider: ProviderMixin):
    temp_name = provider.temp_name()
    info1 = provider.create(temp_name, BytesIO(b"test1"))
    provider.delete(info1.oid)
    info2 = provider.create(temp_name, BytesIO(b"test2"))
    if not provider.oid_is_path:
        assert info1.oid != info2.oid
        assert not provider.exists_oid(info1.oid)
    assert provider.exists_oid(info2.oid)
    if not provider.oid_is_path:
        provider.delete(info1.oid)
        assert not provider.exists_oid(info1.oid)
    assert provider.exists_oid(info2.oid)
    # test uploading to a path instead of an OID.
should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): provider.upload(temp_name, BytesIO(b"test2")) PKoiOOG=G=cloudsync/tests/test_sync.pyimport logging from io import BytesIO import pytest from typing import NamedTuple from cloudsync import SyncManager, SyncState, CloudFileNotFoundError, LOCAL, REMOTE, FILE, DIRECTORY from cloudsync.provider import Provider class WaitFor(NamedTuple): side: int = None path: str = None hash: bytes = None oid: str = None exists: bool = True log = logging.getLogger(__name__) TIMEOUT = 2 class RunUntilHelper: def run_until_found(self: SyncManager, *files, timeout=TIMEOUT): log.debug("running until found") last_error = None def found(): ok = True for info in files: if type(info) is tuple: info = WaitFor(side=info[0], path=info[1]) try: other_info = self.providers[info.side].info_path(info.path) except CloudFileNotFoundError: other_info = None if other_info is None: nonlocal last_error if info.exists is False: log.debug("waiting not exists %s", info.path) continue log.debug("waiting exists %s", info.path) last_error = CloudFileNotFoundError(info.path) ok = False break if info.exists is False: ok = False break if info.hash and info.hash != other_info.hash: log.debug("waiting hash %s", info.path) ok = False break return ok self.run(timeout=timeout, until=found) if not found(): if last_error: raise TimeoutError("timed out while waiting: %s" % last_error) else: raise TimeoutError("timed out while waiting") class SyncMgrMixin(SyncManager, RunUntilHelper): pass @pytest.fixture(name="sync") def fixture_sync(mock_provider_generator): state = SyncState() def translate(to, path): if to == LOCAL: return "/local" + path.replace("/remote", "") if to == REMOTE: return "/remote" + path.replace("/local", "") raise ValueError("bad path: %s", path) # two providers and a translation function that converts paths in one to paths in the other sync = SyncMgrMixin(state, (mock_provider_generator(), mock_provider_generator(oid_is_path=False)), translate) yield sync sync.done() def test_sync_state_basic(): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="123") state._assert_index_is_correct() def test_sync_state_rename(): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state._assert_index_is_correct() def test_sync_state_rename2(): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") state.update(LOCAL, FILE, path="foo2", oid="456", prior_oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="456") assert not state.lookup_oid(LOCAL, oid="123") state._assert_index_is_correct() def test_sync_state_rename3(): state = SyncState() ahash = "ah" bhash = "bh" state.update(LOCAL, FILE, path="a", oid="a", hash=ahash) state.update(LOCAL, FILE, path="b", oid="b", hash=bhash) infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") assert infoa[LOCAL].hash == ahash assert infob[LOCAL].hash == bhash # rename in a circle state.update(LOCAL, FILE, path="c", oid="c", prior_oid="a") log.debug("TABLE 0:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="a", oid="a", prior_oid="b") 
log.debug("TABLE 1:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="b", oid="b", prior_oid="c") log.debug("TABLE 2:\n%s", state.pretty_print(use_sigs=False)) assert state.lookup_path(LOCAL, "a") assert state.lookup_path(LOCAL, "b") infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") # hashes should be flipped assert infoa[LOCAL].hash == bhash assert infob[LOCAL].hash == ahash state._assert_index_is_correct() def test_sync_state_multi(): state = SyncState() state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state._assert_index_is_correct() def test_sync_basic(sync: "SyncMgrMixin"): remote_parent = "/remote" local_parent = "/local" remote_path1 = Provider.join(remote_parent, "stuff1") local_path1 = sync.translate(LOCAL, remote_path1) local_path1.replace("\\", "/") assert local_path1 == "/local/stuff1" Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.state.update(LOCAL, FILE, oid=linfo.oid, exists=True) assert sync.state.entry_count() == 1 rinfo = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello2")) # inserts info about some cloud path sync.state.update(REMOTE, FILE, oid=rinfo.oid, path=remote_path2, hash=rinfo.hash) def done(): has_info = [None] * 4 try: has_info[0] = sync.providers[LOCAL].info_path("/local/stuff1") has_info[1] = sync.providers[LOCAL].info_path("/local/stuff2") has_info[2] = sync.providers[REMOTE].info_path("/remote/stuff2") has_info[3] = sync.providers[REMOTE].info_path("/remote/stuff2") except CloudFileNotFoundError as e: log.debug("waiting for %s", e) pass return all(has_info) # loop the sync until the file is found sync.run(timeout=TIMEOUT, until=done) assert done() info = sync.providers[LOCAL].info_path("/local/stuff2") assert info.hash == sync.providers[LOCAL].hash_oid(info.oid) assert info.oid log.debug("all state %s", sync.state.get_all()) sync.state._assert_index_is_correct() def test_sync_rename(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" local_path2 = Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) new_oid = sync.providers[LOCAL].rename(linfo.oid, local_path2) sync.state.update(LOCAL, FILE, path=local_path2, oid=new_oid, hash=linfo.hash, prior_oid=linfo.oid) sync.run_until_found((REMOTE, remote_path2)) assert sync.providers[REMOTE].info_path("/remote/stuff") is None sync.state._assert_index_is_correct() def test_sync_hash(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" remote_path1 = "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) 
sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) linfo = sync.providers[LOCAL].upload(linfo.oid, BytesIO(b"hello2")) sync.state.update(LOCAL, FILE, linfo.oid, hash=linfo.hash) sync.run_until_found(WaitFor(REMOTE, remote_path1, hash=linfo.hash)) info = sync.providers[REMOTE].info_path(remote_path1) check = BytesIO() sync.providers[REMOTE].download(info.oid, check) assert check.getvalue() == b"hello2" sync.state._assert_index_is_correct() def test_sync_rm(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) sync.providers[LOCAL].delete(linfo.oid) sync.state.update(LOCAL, FILE, linfo.oid, exists=False) sync.run_until_found(WaitFor(REMOTE, remote_path1, exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state._assert_index_is_correct() def test_sync_mkdir(sync): local_dir1 = "/local" local_path1 = "/local/stuff" remote_dir1 = "/remote" remote_path1 = "/remote/stuff" local_dir_oid1 = sync.providers[LOCAL].mkdir(local_dir1) local_path_oid1 = sync.providers[LOCAL].mkdir(local_path1) # inserts info about some local path sync.state.update(LOCAL, DIRECTORY, path=local_dir1, oid=local_dir_oid1) sync.state.update(LOCAL, DIRECTORY, path=local_path1, oid=local_path_oid1) sync.run_until_found((REMOTE, remote_dir1)) sync.run_until_found((REMOTE, remote_path1)) log.debug("BEFORE DELETE\n %s", sync.state.pretty_print()) sync.providers[LOCAL].delete(local_path_oid1) sync.state.update(LOCAL, FILE, local_path_oid1, exists=False) log.debug("AFTER DELETE\n %s", sync.state.pretty_print()) log.debug("wait for delete") sync.run_until_found(WaitFor(REMOTE, remote_path1, exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state._assert_index_is_correct() def test_sync_conflict_simul(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo = sync.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.state.update(REMOTE, FILE, path=remote_path1, oid=rinfo.oid, hash=rinfo.hash) sync.run_until_found( (REMOTE, "/remote/stuff1.conflicted"), (LOCAL, "/local/stuff1.conflicted"), (REMOTE, "/remote/stuff1"), (LOCAL, "/local/stuff1") ) sync.providers[LOCAL].log_debug_state("LOCAL") sync.providers[REMOTE].log_debug_state("REMOTE") b1 = BytesIO() b2 = BytesIO() sync.providers[REMOTE].download_path("/remote/stuff1.conflicted", b1) sync.providers[REMOTE].download_path("/remote/stuff1", b2) # both files are intact assert b1.getvalue() != b2.getvalue() assert 
b1.getvalue() in (b"hello", b"goodbye") assert b2.getvalue() in (b"hello", b"goodbye") sync.state._assert_index_is_correct() def test_sync_conflict_path(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff" remote_path1 = "/remote/stuff" local_path2 = "/local/stuff-l" remote_path2 = "/remote/stuff-r" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) rinfo = sync.providers[REMOTE].info_path(remote_path1) assert len(sync.state.get_all()) == 1 ent = sync.state.get_all().pop() sync.providers[REMOTE].log_debug_state("BEFORE") new_oid_l = sync.providers[LOCAL].rename(linfo.oid, local_path2) new_oid_r = sync.providers[REMOTE].rename(rinfo.oid, remote_path2) sync.providers[REMOTE].log_debug_state("AFTER") sync.state.update(LOCAL, FILE, path=local_path2, oid=new_oid_l, hash=linfo.hash, prior_oid=linfo.oid) assert len(sync.state.get_all()) == 1 assert ent[REMOTE].oid == new_oid_r sync.state.update(REMOTE, FILE, path=remote_path2, oid=new_oid_r, hash=rinfo.hash, prior_oid=rinfo.oid) assert len(sync.state.get_all()) == 1 log.debug("TABLE 0:\n%s", sync.state.pretty_print()) # currently defers to the alphabetcially greater name, rather than conflicting sync.run_until_found((LOCAL, "/local/stuff-r")) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) assert not sync.providers[LOCAL].exists_path(local_path1) assert not sync.providers[LOCAL].exists_path(local_path2) sync.state._assert_index_is_correct() def test_sync_conflict_path_combine(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path3 = "/local/stuff" remote_path3 = "/remote/stuff" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo2 = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello")) # inserts info about some local path sync.state.update(LOCAL, FILE, path=local_path1, oid=linfo1.oid, hash=linfo1.hash) sync.state.update(REMOTE, FILE, path=remote_path2, oid=rinfo2.oid, hash=rinfo2.hash) sync.run_until_found((REMOTE, remote_path1), (LOCAL, local_path2)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) new_oid1 = sync.providers[LOCAL].rename(linfo1.oid, local_path3) prior_oid = sync.providers[LOCAL].oid_is_path and linfo1.oid sync.state.update(LOCAL, FILE, path=local_path3, oid=new_oid1, prior_oid=prior_oid) new_oid2 = sync.providers[REMOTE].rename(rinfo2.oid, remote_path3) prior_oid = sync.providers[REMOTE].oid_is_path and rinfo2.oid sync.state.update(REMOTE, FILE, path=remote_path3, oid=new_oid2, prior_oid=prior_oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((LOCAL, "/local/stuff.conflicted")) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) PKiVOMM$cloudsync/tests/fixtures/__init__.pyfrom .util import * from .mock_provider import * from .mock_storage import * PKoiOξ88)cloudsync/tests/fixtures/mock_provider.pyimport pytest import time import copy import logging from hashlib import md5 from typing import Dict, List, Any, Optional, Generator from re import split from cloudsync.event import Event from cloudsync.provider import Provider from cloudsync.types 
import OInfo, OType, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError log = logging.getLogger(__name__) class MockFSObject: # pylint: disable=too-few-public-methods FILE = 'mock file' DIR = 'mock dir' def __init__(self, path, object_type, oid_is_path, contents=None, mtime=None): # self.display_path = path # TODO: used for case insensitive file systems if contents is None and type == MockFSObject.FILE: contents = b"" self.path = path.rstrip("/") self.contents = contents self.oid = path if oid_is_path else str(id(self)) self.exists = True self.type = object_type self.mtime = mtime or time.time() @property def otype(self): if self.type == self.FILE: return OType.FILE else: return OType.DIRECTORY def hash(self) -> Optional[bytes]: if self.type == self.DIR: return None return md5(self.contents).digest() def update(self): self.mtime = time.time() def copy(self): return copy.copy(self) class MockEvent: # pylint: disable=too-few-public-methods ACTION_CREATE = "provider create" ACTION_RENAME = "provider rename" ACTION_UPDATE = "provider modify" ACTION_DELETE = "provider delete" def __init__(self, action, target_object: MockFSObject, prior_oid=None): self._target_object = copy.copy(target_object) self._action = action self._prior_oid = prior_oid self._timestamp = time.time() def serialize(self): ret_val = {"action": self._action, "id": self._target_object.oid, "object type": self._target_object.type, "path": self._target_object.path, "mtime": self._target_object.mtime, "prior_oid": self._prior_oid, "trashed": not self._target_object.exists, } return ret_val class MockProvider(Provider): connected = True # TODO: normalize names to get rid of trailing slashes, etc. def __init__(self, oid_is_path, case_sensitive): super().__init__() log.debug("mock mode: o:%s, c:%s", oid_is_path, case_sensitive) self.oid_is_path = oid_is_path self.case_sensitive = case_sensitive self._fs_by_path: Dict[str, MockFSObject] = {} self._fs_by_oid: Dict[str, MockFSObject] = {} self._events: List[MockEvent] = [] self._latest_event = -1 self._cursor = -1 self._type_map = { MockFSObject.FILE: OType.FILE, MockFSObject.DIR: OType.DIRECTORY, } def _register_event(self, action, target_object, prior_oid=None): event = MockEvent(action, target_object, prior_oid) self._events.append(event) target_object.update() self._latest_event = len(self._events) - 1 def _get_by_oid(self, oid): # TODO: normalize the path, support case insensitive lookups, etc self._api() return self._fs_by_oid.get(oid, None) def _get_by_path(self, path): # TODO: normalize the path, support case insensitive lookups, etc self._api() return self._fs_by_path.get(path, None) def _store_object(self, fo: MockFSObject): # TODO: support case insensitive storage assert fo.path == fo.path.rstrip("/") self._fs_by_path[fo.path] = fo self._fs_by_oid[fo.oid] = fo def _unstore_object(self, fo: MockFSObject): # TODO: do I need to check if the path and ID exist before del to avoid a key error, # or perhaps just catch and swallow that exception? 
del self._fs_by_path[fo.path] del self._fs_by_oid[fo.oid] def _translate_event(self, pe: MockEvent) -> Event: event = pe.serialize() provider_type = event.get("object type", None) standard_type = self._type_map.get(provider_type, None) assert standard_type oid = event.get("id", None) mtime = event.get("mtime", None) trashed = event.get("trashed", None) prior_oid = event.get("prior_oid", None) path = None if self.oid_is_path: path = event.get("path") retval = Event(standard_type, oid, path, None, not trashed, mtime, prior_oid) return retval def _api(self, *args, **kwargs): pass def events(self): self._api() done = False found = False while self._cursor < self._latest_event: self._cursor += 1 pe = self._events[self._cursor] yield self._translate_event(pe) def walk(self, path, since=None): # TODO: implement "since" parameter self._api() now = time.time() for obj in self._fs_by_oid.values(): if self.is_subpath(path, obj.path, strict=False): yield Event(obj.otype, obj.oid, obj.path, obj.hash(), obj.exists, obj.mtime) def upload(self, oid, file_like, metadata=None) -> OInfo: self._api() file = self._fs_by_oid.get(oid, None) if file is None or not file.exists: raise CloudFileNotFoundError(oid) if file.type != MockFSObject.FILE: raise CloudFileExistsError("Only files may be uploaded, and %s is not a file" % file.path) contents = file_like.read() file.contents = contents self._register_event(MockEvent.ACTION_UPDATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def listdir(self, oid) -> Generator[DirInfo, None, None]: folder_obj = self._get_by_oid(oid) if not (folder_obj and folder_obj.exists and folder_obj.type == MockFSObject.DIR): raise CloudFileNotFoundError(oid) path = folder_obj.path for obj in self._fs_by_oid.values(): if obj.exists: relative = self.is_subpath(path, obj.path, strict=True) if relative: relative = relative.lstrip("/") if "/" not in relative: yield DirInfo(otype=obj.otype, oid=obj.oid, hash=obj.hash(), path=obj.path, name=relative) def create(self, path, file_like, metadata=None) -> OInfo: # TODO: store the metadata self._api() file = self._get_by_path(path) if file is not None and file.exists: raise CloudFileExistsError("Cannot create, '%s' already exists" % file.path) self._verify_parent_folder_exists(path) if file is None or not file.exists: file = MockFSObject(path, MockFSObject.FILE, self.oid_is_path) self._store_object(file) file.contents = file_like.read() file.exists = True log.debug("created %s %s", file.oid, file.type) self._register_event(MockEvent.ACTION_CREATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def download(self, oid, file_like): self._api() file = self._fs_by_oid.get(oid, None) if file is None or file.exists is False: raise CloudFileNotFoundError(oid) file_like.write(file.contents) def rename(self, oid, new_path) -> str: log.debug("renaming %s -> %s", oid, new_path) self._api() # TODO: folders are implied by the path of the file... 
# actually check to make sure the folder exists and raise a FileNotFound if not object_to_rename = self._fs_by_oid.get(oid, None) if not (object_to_rename and object_to_rename.exists): raise CloudFileNotFoundError(oid) possible_conflict = self._get_by_path(new_path) self._verify_parent_folder_exists(new_path) if possible_conflict and possible_conflict.exists: if possible_conflict.type != object_to_rename.type: log.debug("rename %s:%s conflicts with existing object of another type", oid, object_to_rename.path) raise CloudFileExistsError(new_path) if possible_conflict.type == MockFSObject.DIR: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError(new_path) except StopIteration: pass # Folder is empty, rename over it no problem else: raise CloudFileExistsError(new_path) log.debug("secretly deleting folder%s", new_path) self.delete(possible_conflict.oid) if object_to_rename.path == new_path: return oid prior_oid = None if self.oid_is_path: prior_oid = object_to_rename.oid if object_to_rename.type == MockFSObject.FILE: self._rename_single_object(object_to_rename, new_path) else: # object to rename is a directory old_path = object_to_rename.path for obj in list(self._fs_by_oid.values()): if self.is_subpath(old_path, obj.path): new_obj_path = self.replace_path(obj.path, old_path, new_path) self._rename_single_object(obj, new_obj_path) assert NotImplementedError() if self.oid_is_path: assert object_to_rename.oid != prior_oid, "rename %s to %s" % (prior_oid, new_path) self._register_event(MockEvent.ACTION_RENAME, object_to_rename, prior_oid) return object_to_rename.oid def _rename_single_object(self, source_object: MockFSObject, destination_path): destination_path = destination_path.rstrip("/") # This will assume all validation has already been done, and just rename the thing # without trying to rename contents of folders, just rename the object itself log.debug("renaming %s to %s", source_object.path, destination_path) prior_oid = source_object.oid if self.oid_is_path else None self._unstore_object(source_object) source_object.path = destination_path if self.oid_is_path: source_object.oid = destination_path self._store_object(source_object) self._register_event(MockEvent.ACTION_RENAME, source_object, prior_oid) log.debug("rename complete %s", source_object.path) self.log_debug_state() def mkdir(self, path) -> str: self._api() self._verify_parent_folder_exists(path) file = self._get_by_path(path) if file and file.exists: if file.type == MockFSObject.FILE: raise CloudFileExistsError(path) else: log.debug("Skipped creating already existing folder: %s", path) return file.oid new_fs_object = MockFSObject(path, MockFSObject.DIR, self.oid_is_path) self._store_object(new_fs_object) self._register_event(MockEvent.ACTION_CREATE, new_fs_object) return new_fs_object.oid def delete(self, oid): log.debug("delete %s", oid) self._api() file = self._fs_by_oid.get(oid, None) log.debug("got %s", file) if file and file.exists: if file.otype == OType.DIRECTORY: try: next(self.listdir(file.oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, file.path)) except StopIteration: pass # Folder is empty, delete it no problem else: path = file.path if file else "" log.debug("Deleting non-existent oid %s:%s ignored", oid, path) return None file.exists = False self._register_event(MockEvent.ACTION_DELETE, file) def exists_oid(self, oid): self._api() file = self._fs_by_oid.get(oid, None) return file is not None and file.exists def exists_path(self, path) -> bool: self._api() file = 
self._get_by_path(path) return file is not None and file.exists def hash_oid(self, oid) -> Any: file = self._fs_by_oid.get(oid, None) if file and file.exists: return file.hash() else: return None def info_path(self, path): self._api() file: MockFSObject = self._get_by_path(path) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def info_oid(self, oid): self._api() file: MockFSObject = self._fs_by_oid.get(oid, None) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) # @staticmethod # def _slurp(path): # with open(path, "rb") as x: # return x.read() # # @staticmethod # def _burp(path, contents): # with open(path, "wb") as x: # x.write(contents) def log_debug_state(self, msg=""): log.debug("%s: mock provider state %s", msg, list(self.walk("/"))) ################### def mock_provider_instance(oid_is_path, case_sensitive): prov = MockProvider(oid_is_path, case_sensitive) prov.event_timeout = 1 prov.event_sleep = 0.001 prov.creds = {} return prov @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider(request): return mock_provider_instance(*request.param) @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider_generator(request): return lambda oid_is_path=None, case_sensitive=None: \ mock_provider_instance( request.param[0] if oid_is_path is None else oid_is_path, request.param[1] if case_sensitive is None else case_sensitive) def test_mock_basic(): """ basic spot-check, more tests are in test_providers with mock as one of the providers """ from io import BytesIO m = MockProvider() info = m.create("/hi.txt", BytesIO(b'hello')) assert info.hash assert info.oid b = BytesIO() m.download(info.oid, b) assert b.getvalue() == b'hello' PKoiOG5S(cloudsync/tests/fixtures/mock_storage.pyfrom threading import Lock from typing import Dict, Any, Tuple import logging from cloudsync import CloudSync, SyncState, Storage, LOCAL, REMOTE log = logging.getLogger(__name__) class MockStorage(Storage): # Does not actually persist the data... 
but it's just a mock top_lock = Lock() lock_dict = dict() def __init__(self, storage_dict: Dict[str, Dict[int, bytes]]): self.storage_dict = storage_dict self.cursor: int = 0 # the next eid def _get_internal_storage(self, tag: str) -> Tuple[Lock, Dict[int, bytes]]: with self.top_lock: lock: Lock = self.lock_dict.setdefault(tag, Lock()) return lock, self.storage_dict.setdefault(tag, dict()) def create(self, tag: str, serialization: bytes) -> Any: lock, storage = self._get_internal_storage(tag) with lock: current_index = self.cursor self.cursor += 1 storage[current_index] = serialization return current_index def update(self, tag: str, serialization: bytes, eid: Any): lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) storage[eid] = serialization def delete(self, tag: str, eid: Any): lock, storage = self._get_internal_storage(tag) log.debug("deleting eid%s", eid) with lock: if eid not in storage: log.debug("ignoring delete: id %s doesn't exist" % eid) return del storage[eid] def read_all(self, tag: str) -> Dict[Any, bytes]: lock, storage = self._get_internal_storage(tag) with lock: ret: Dict[Any, bytes] = storage.copy() return ret PKiVOoo cloudsync/tests/fixtures/util.pyimport pytest import os import tempfile import shutil from inspect import getframeinfo, stack import logging from cloudsync.provider import Provider log = logging.getLogger(__name__) log.setLevel(logging.INFO) class Util: def __init__(self): self.base = tempfile.mkdtemp(suffix=".cloudsync") log.debug("temp files will be in: %s", self.base) @staticmethod def get_context(level): caller = getframeinfo(stack()[level+1][0]) return caller def temp_file(self, *, fill_bytes=None): # pretty names for temps caller = self.get_context(1) fn = os.path.basename(caller.filename) if not fn: fn = "unk" else: fn = os.path.splitext(fn)[0] func = caller.function name = fn + '-' + func + "." 
+ os.urandom(16).hex()
        fp = Provider.join(self.base, name)
        if fill_bytes is not None:
            with open(fp, "wb") as f:
                f.write(os.urandom(fill_bytes))
        log.debug("temp file %s", fp)
        return fp

    def do_cleanup(self):
        shutil.rmtree(self.base)


@pytest.fixture(scope="module")
def util(request):
    # user can override at the module level or class level
    # if they want to look at the temp files made
    cleanup = getattr(getattr(request, "cls", None), "util_cleanup", True)
    if cleanup:
        cleanup = getattr(request.module, "util_cleanup", True)
    u = Util()
    yield u
    if cleanup:
        u.do_cleanup()


def test_util(util):
    log.setLevel(logging.DEBUG)
    f = util.temp_file(fill_bytes=32)
    assert len(open(f, "rb").read()) == 32

cloudsync/tests/providers/__init__.py

cloudsync/tests/providers/dropbox.py
import os
import random

import pytest

from cloudsync.exceptions import CloudFileNotFoundError
from cloudsync.providers.dropbox import DropboxProvider


def dropbox_creds():
    token_set = os.environ.get("DROPBOX_TOKEN")
    if not token_set:
        return None
    tokens = token_set.split(",")
    creds = {
        "key": tokens[random.randrange(0, len(tokens))],
    }
    return creds


def dropbox_provider():
    cls = DropboxProvider
    cls.event_timeout = 20
    cls.event_sleep = 2
    cls.creds = dropbox_creds()
    return cls()


@pytest.fixture
def cloudsync_provider():
    return dropbox_provider()


def test_connect():
    creds = dropbox_creds()
    if not creds:
        pytest.skip('requires dropbox token and client secret')
    sync_root = "/" + os.urandom(16).hex()
    gd = DropboxProvider(sync_root)
    gd.connect(creds)
    assert gd.client
    quota = gd.get_quota()
    try:
        info = gd.info_path(sync_root)
        if info and info.oid:
            gd.delete(info.oid)
    except CloudFileNotFoundError:
        pass

cloudsync/tests/providers/gdrive.py
import os
import random

import pytest

from cloudsync.exceptions import CloudFileNotFoundError
from cloudsync.providers.gdrive import GDriveProvider


# move this to provider ci_creds() function?
def gdrive_creds():
    token_set = os.environ.get("GDRIVE_TOKEN")
    cli_sec = os.environ.get("GDRIVE_CLI_SECRET")
    if not token_set or not cli_sec:
        return None
    tokens = token_set.split(",")
    creds = {
        "refresh_token": tokens[random.randrange(0, len(tokens))],
        "client_secret": cli_sec,
        "client_id": '433538542924-ehhkb8jn358qbreg865pejbdpjnm31c0.apps.googleusercontent.com',
    }
    return creds


def gdrive_provider():
    cls = GDriveProvider
    cls.event_timeout = 60
    cls.event_sleep = 2
    cls.creds = gdrive_creds()
    return cls()


@pytest.fixture
def cloudsync_provider():
    # the fixture must return the provider it builds
    return gdrive_provider()


def test_connect():
    creds = gdrive_creds()
    if not creds:
        pytest.skip('requires gdrive token and client secret')
    sync_root = "/" + os.urandom(16).hex()
    gd = GDriveProvider(sync_root)
    gd.connect(creds)
    assert gd.client
    quota = gd.get_quota()
    try:
        info = gd.info_path(sync_root)
        if info and info.oid:
            gd.delete(info.oid)
    except CloudFileNotFoundError:
        pass

cloudsync/tests/providers/mock.py
import os
import random

import pytest

from ..fixtures.mock_provider import MockProvider


@pytest.fixture
def cloudsync_provider(request):
    cls = MockProvider
    cls.event_timeout = 20
    cls.event_sleep = 2
    cls.creds = {}
    return cls

cloudsync-0.1.3.dist-info/entry_points.txt (compressed archive member; contents not recoverable from this dump)

cloudsync-0.1.3.dist-info/LICENSE
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007

Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>

Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below.

0. Additional Definitions.
As used herein, “this License” refers to version 3 of the GNU Lesser General Public License, and the “GNU GPL” refers to version 3 of the GNU General Public License. “The Library” refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An “Application” is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A “Combined Work” is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the “Linked Version”. The “Minimal Corresponding Source” for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The “Corresponding Application Code” for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the Combined Work with a copy of the GNU GPL and this license document. 
c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License “or any later version” applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library. 
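The provider-contract tests earlier in this archive (cloudsync/tests/test_provider.py) can also be exercised directly against the in-memory MockProvider fixture. The following is an illustrative sketch, not part of the package: the test name, paths, and payloads are made up, and it assumes the import path cloudsync.tests.fixtures.mock_provider implied by the archive layout. It mirrors the "rename folder over empty folder succeeds / over non-empty folder raises" cases shown above.

import pytest
from io import BytesIO

from cloudsync.exceptions import CloudFileExistsError
from cloudsync.tests.fixtures.mock_provider import MockProvider  # path assumed from this archive


def test_rename_folder_semantics_sketch():
    # oid_is_path=False, case_sensitive=True -- the "mock_oid_cs" parametrization used by the fixtures
    prov = MockProvider(False, True)

    src_oid = prov.mkdir("/src")
    prov.create("/src/junk.txt", BytesIO(b"junk"))
    prov.mkdir("/dst")  # empty target folder

    # Renaming a folder over an *empty* folder succeeds and carries its contents along;
    # for a non-path-oid provider the source keeps its oid.
    prov.rename(src_oid, "/dst")
    assert [d.name for d in prov.listdir(src_oid)] == ["junk.txt"]

    # Renaming anything over a *non-empty* folder refuses with CloudFileExistsError,
    # exactly as the contract tests above expect.
    other_oid = prov.mkdir("/other")
    with pytest.raises(CloudFileExistsError):
        prov.rename(other_oid, "/dst")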
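MockStorage above implements the Storage interface purely in memory, keyed by tag, with integer entry ids handed out by create(). A minimal round-trip sketch follows; it is not part of the package, the tag string and payloads are made up for illustration, and the import path is assumed from the archive layout.

from cloudsync.tests.fixtures.mock_storage import MockStorage  # path assumed from this archive

backing = {}                      # caller-supplied dict; MockStorage does not persist anything itself
store = MockStorage(backing)

# create() returns the new entry id; update() overwrites an existing id or raises ValueError
eid = store.create("my-sync-label", b"serialized entry v1")      # hypothetical tag/payload
store.update("my-sync-label", b"serialized entry v2", eid)
assert store.read_all("my-sync-label") == {eid: b"serialized entry v2"}

# delete() of a missing id is logged and ignored rather than raising
store.delete("my-sync-label", eid)
store.delete("my-sync-label", eid)
assert store.read_all("my-sync-label") == {}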