cloudsync/__init__.py

"""
cloudsync enables simple cloud file-level sync with a variety of cloud providers.

External modules:
    cloudsync.Event
    cloudsync.Provider
    cloudsync.Sync

Example:
    import cloudsync

    prov = cloudsync.Provider('GDrive', token="237846234236784283")
    info = prov.upload(file, "/dest")

    print("id of /dest is %s, hash of /dest is %s" % (info.id, info.hash))

Command-line example:
    cloudsync -p gdrive --token "236723782347823642786" -f ~/gdrive-folder --daemon
"""

__version__ = "0.2.16"

from pystrict import strict, StrictError

# must be imported before other cloudsync imports
from .log import logger

# import modules into top level for convenience
from .provider import *
from .event import *
from .sync import *
from .exceptions import *
from .types import *
from .cs import *

from .command import main

if __name__ == "__main__":
    main()


cloudsync/command.py

# import argparse
import os


def main():
    # parser = argparse.ArgumentParser(description='Process some integers.')
    # parser.add_argument('command', type=str, help='Command to run')
    # parser.add_argument('args', nargs=argparse.REMAINDER)
    # args = parser.parse_args()
    raise NotImplementedError()


if __name__ == "__main__":
    import sys
    sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
    main()
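# Usage sketch (not part of the original package sources): a minimal, hedged example of
# wiring two providers from this package into a CloudSync instance, syncing a Google Drive
# folder with a Dropbox folder. The app ids, secrets, credential dicts, and root paths are
# placeholders; in practice, credentials come from your own OAuth application and storage.

from cloudsync import CloudSync
from cloudsync.oauth import OAuthConfig
from cloudsync.providers import GDriveProvider, DropboxProvider

gdrive = GDriveProvider(OAuthConfig(app_id="YOUR_GDRIVE_APP_ID", app_secret="YOUR_GDRIVE_SECRET"))
dbx = DropboxProvider(OAuthConfig(app_id="YOUR_DROPBOX_APP_ID", app_secret="YOUR_DROPBOX_SECRET"))

# connect_or_authenticate() only pops an OAuth flow if connecting with the given creds fails
gdrive_creds = gdrive.connect_or_authenticate({})   # placeholder creds
dbx_creds = dbx.connect_or_authenticate({})         # placeholder creds

sync = CloudSync(providers=(gdrive, dbx), roots=("/gdrive-folder", "/dropbox-folder"))
sync.start()     # runs the sync manager and both event managers in daemon threads
# ... later ...
sync.stop()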
cloudsync/cs.py

import threading
import logging
from typing import Optional, Tuple, List, IO, Any

from pystrict import strict

from .sync import SyncManager, SyncState, Storage
from .runnable import Runnable
from .event import EventManager
from .provider import Provider
from .log import TRACE

log = logging.getLogger(__name__)


@strict         # pylint: disable=too-many-instance-attributes
class CloudSync(Runnable):
    def __init__(self,
                 providers: Tuple[Provider, Provider],
                 roots: Optional[Tuple[str, str]] = None,
                 storage: Optional[Storage] = None,
                 sleep: Optional[Tuple[float, float]] = None,
                 ):
        if not roots and self.translate == CloudSync.translate:    # pylint: disable=comparison-with-callable
            raise ValueError("Either override the translate() function, or pass in a pair of roots")

        self.providers = providers
        self.roots = roots

        if sleep is None:
            sleep = (providers[0].default_sleep, providers[1].default_sleep)
        self.sleep = sleep

        # the tag for the SyncState isolates the state of a pair of providers along with the sync roots
        state = SyncState(providers, storage, tag=self.storage_label(), shuffle=False)

        smgr = SyncManager(state, providers, self.translate, self.resolve_conflict, sleep=sleep)

        # for tests, make these accessible
        self.state = state
        self.smgr = smgr

        # the label for each event manager isolates the cursor to the provider/login combo for that side
        _roots: Tuple[Optional[str], Optional[str]]
        if not roots:
            _roots = (None, None)
        else:
            _roots = roots

        self.emgrs: Tuple[EventManager, EventManager] = (
            EventManager(smgr.providers[0], state, 0, _roots[0]),
            EventManager(smgr.providers[1], state, 1, _roots[1])
        )

        log.info("initialized sync: %s", self.storage_label())

        self.sthread = None
        self.ethreads = (None, None)

        self.test_mgr_iter = None
        self.test_mgr_order: List[int] = []

    @property
    def aging(self) -> float:
        """float: The number of seconds to wait before syncing a file.

        Reduces storage provider traffic at the expense of increased conflict risk.
        Default is based on the max(provider.default_sleep) value.
        """
        return self.smgr.aging

    @aging.setter
    def aging(self, secs: float):
        self.smgr.aging = secs

    def prioritize(self, side: int, path: str) -> None:
        """Move the path and all children of the path to be synchronized first.

        Args:
            side (int): by convention, 0 is local and 1 is remote
            path (str): the full path to the file
        """
        paths: List[str] = [None, None]
        paths[side] = path
        paths[1 - side] = self.translate(1 - side, path)

        # forge events and fill the state table with the latest info
        # mark all events as "aged" by setting the times to low values
        # to prevent parent-folder punting, dfs is used as the time
        for i, fp in enumerate(paths):
            self.emgrs[i].prioritize(fp)

    def storage_label(self):
        """
        Returns:
            str: a unique label representing this paired, translated sync

        Override this if you are re-using storage and are using a rootless translate.
        """
        # if you're using a pure translate, and not roots, you don't have to override the storage label
        # just don't reuse storage for the same pair of providers
        roots = self.roots or ('?', '?')
        assert self.providers[0].connection_id is not None
        assert self.providers[1].connection_id is not None
        return f"{self.providers[0].name}:{self.providers[0].connection_id}:{roots[0]}."\
               f"{self.providers[1].name}:{self.providers[1].connection_id}:{roots[1]}"

    def walk(self):
        roots = self.roots or ('/', '/')
        for index, provider in enumerate(self.providers):
            for event in provider.walk(roots[index]):
                self.emgrs[index].process_event(event)

    def translate(self, side: int, path: str):
        """Override this method to translate between local and remote paths.

        By default, uses `self.roots` to strip the provided path and join the result to the
        root of the other side. If `self.roots` is None, this function must be overridden.

        Example:
            translate(REMOTE, "/home/full/local/path.txt") -> "/cloud/path.txt"

        Args:
            side: either 0 (LOCAL) or 1 (REMOTE)
            path: a path valid in the (1-side) provider

        Returns:
            The path, valid for provider[side], or None to mean "don't sync".
        """
        if not self.roots:
            raise ValueError("Override the translate function or provide root paths")

        relative = self.providers[1-side].is_subpath(self.roots[1-side], path)
        if not relative:
            log.log(TRACE, "%s is not a subpath of %s", path, self.roots[1-side])
            return None

        return self.providers[side].join(self.roots[side], relative)

    def resolve_conflict(self, f1: IO, f2: IO) -> Tuple[Any, bool]:    # pylint: disable=no-self-use, unused-argument
        """Override this method to handle conflict resolution of files.

        Note:
            - f1 and f2 are file-likes that will block on read, and can possibly pull data
              from the network, internet, etc.
            - f1 and f2 also support the .path property to get a relative path to the file
            - f1 and f2 also support the .side property

        Returns:
            A tuple of (result, keep), or None, meaning there is no good resolution.

            result is one of:
                - a "merged" file-like, which should be used as the data to replace both f1/f2 with
                - one of f1 or f2, which is selected as the correct version

            keep is True if we want to keep the old version of the file around as a
            .conflicted file, else False.
        """
        return None

    @property
    def change_count(self):
        return self.smgr.change_count

    def start(self, **kwargs):
        # override Runnable start/stop so that events can be processed in separate threads
        self.sthread = threading.Thread(target=self.smgr.run, kwargs={'sleep': 0.1, **kwargs}, daemon=True)
        self.ethreads = (
            threading.Thread(target=self.emgrs[0].run, kwargs={'sleep': self.sleep[0], **kwargs}, daemon=True),
            threading.Thread(target=self.emgrs[1].run, kwargs={'sleep': self.sleep[1], **kwargs}, daemon=True)
        )
        self.sthread.start()
        self.ethreads[0].start()
        self.ethreads[1].start()

    def stop(self, forever=True):
        log.info("stopping sync: %s", self.storage_label())
        self.smgr.stop(forever=forever)
        self.emgrs[0].stop(forever=forever)
        self.emgrs[1].stop(forever=forever)
        if self.sthread:
            self.sthread.join()
            self.ethreads[0].join()
            self.ethreads[1].join()
            self.sthread = None

    # for tests, make this manually runnable
    def do(self):
        # imports are in the body of this test-only function
        import random       # pylint: disable=import-outside-toplevel
        mgrs = [*self.emgrs, self.smgr]
        random.shuffle(mgrs)
        for m in mgrs:
            m.do()

        # conceptually this should work, but our tests rely on changeset_len
        # instead we need to expose a stuff-to-do property in cs
        # if self.test_mgr_iter:
        #     try:
        #         r = next(self.test_mgr_iter)
        #     except StopIteration:
        #         r = random.randint(0, 2)
        # else:
        #     r = random.randint(0, 2)
        # self.test_mgr_order.append(r)
        # mgrs[r].do()

    def done(self):
        self.smgr.done()
        self.emgrs[0].done()
        self.emgrs[1].done()
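# Sketch (not part of the original sources): a rootless CloudSync subclass, illustrating the
# overrides described in the docstrings above. The class name, folder paths ("/home/me/files",
# "/cloud"), and storage label are illustrative assumptions, not values from this package.

from cloudsync import CloudSync

LOCAL, REMOTE = 0, 1


class MySync(CloudSync):
    def translate(self, side, path):
        # map "/cloud/<rest>" <-> "/home/me/files/<rest>"; returning None means "don't sync"
        if side == REMOTE:
            relative = self.providers[LOCAL].is_subpath("/home/me/files", path)
            return self.providers[REMOTE].join("/cloud", relative) if relative else None
        relative = self.providers[REMOTE].is_subpath("/cloud", path)
        return self.providers[LOCAL].join("/home/me/files", relative) if relative else None

    def storage_label(self):
        # needed when re-using storage with a rootless translate (see the docstring above)
        return "my-sync-pair"

    def resolve_conflict(self, f1, f2):
        # pick the remote copy and keep the loser around as a .conflicted file
        return (f2 if f2.side == REMOTE else f1), True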
cloudsync/event.py

import logging
import time
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass

from pystrict import strict

from .exceptions import CloudTemporaryError, CloudDisconnectedError, CloudCursorError
from .runnable import Runnable
from .muxer import Muxer
from .types import OType

if TYPE_CHECKING:
    from cloudsync.sync import SyncState
    from cloudsync import Provider

log = logging.getLogger(__name__)


@dataclass
class Event:
    otype: OType                        # fsobject type (DIRECTORY or FILE)
    oid: str                            # fsobject id
    path: Optional[str]                 # path
    hash: Optional[bytes]               # fsobject hash (better name: ohash)
    exists: Optional[bool]
    mtime: Optional[float] = None
    prior_oid: Optional[str] = None     # path-based
systems use this on renames new_cursor: Optional[str] = None @strict # pylint: disable=too-many-instance-attributes class EventManager(Runnable): def __init__(self, provider: "Provider", state: "SyncState", side: int, walk_root: Optional[str] = None): log.debug("provider %s, root %s", provider.name, walk_root) self.provider = provider assert self.provider.connection_id self.label = f"{self.provider.name}:{self.provider.connection_id}" self.events = Muxer(provider.events, restart=True) self.state = state self.side = side self._cursor_tag = self.label + "_cursor" self.walk_root = None self._walk_tag = None self.cursor = self.state.storage_get_data(self._cursor_tag) if self.cursor is not None: log.debug("retrieved existing cursor %s for %s", self.cursor, self.provider.name) try: self.provider.current_cursor = self.cursor except CloudCursorError as e: log.exception(e) self.cursor = None if walk_root: self._walk_tag = self.label + "_walked_" + walk_root if self.cursor is None or self.state.storage_get_data(self._walk_tag) is None: self.walk_root = walk_root if self.cursor is None: self.cursor = provider.current_cursor if self.cursor is not None: self.state.storage_update_data(self._cursor_tag, self.cursor) self.min_backoff = provider.default_sleep / 10 self.max_backoff = provider.default_sleep * 4 self.mult_backoff = 2 self.backoff = self.min_backoff def do(self): self.events.shutdown = False try: if self.walk_root: log.debug("walking all %s/%s files as events, because no working cursor on startup", self.provider.name, self.walk_root) for event in self.provider.walk(self.walk_root): self.process_event(event) self.state.storage_update_data(self._walk_tag, time.time()) self.walk_root = None self.backoff = self.min_backoff for event in self.events: if not event: log.error("%s got BAD event %s", self.label, event) continue self.process_event(event) current_cursor = self.provider.current_cursor if current_cursor != self.cursor: self.state.storage_update_data(self._cursor_tag, current_cursor) self.cursor = current_cursor except CloudDisconnectedError: try: time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) log.info("reconnect to %s", self.provider.name) # TODO: this will pop an oauth if there is a CloudTokenError on reconnect. 
create a mechanism # to pass authentication problems to the consumer and allow them to decide what to do self.provider.reconnect() except Exception as e: log.error("can't reconnect to %s: %s", self.provider.name, e) def _drain(self): # for tests, delete events for _ in self.events: pass def process_event(self, event: Event): with self.state.lock: log.debug("%s got event %s", self.label, event) path = event.path exists = event.exists otype = event.otype if not event.path and not self.state.lookup_oid(self.side, event.oid): try: info = self.provider.info_oid(event.oid) if info: if info.otype != event.otype: log.warning("provider %s gave a bad event: %s != %s, using %s", self.provider.name, info.path, event.otype, info.otype) path = info.path otype = info.otype except CloudTemporaryError: pass self.state.update(self.side, otype, event.oid, path=path, hash=event.hash, exists=exists, prior_oid=event.prior_oid) def prioritize(self, path): # depth first search index, so parent folder happen first priority_order = 1 check_parent = path while check_parent: parent = self.provider.dirname(check_parent) if path == parent or not parent: break ents = self.state.lookup_path(self.side, parent) if not ents: break for e in ents: if e[self.side].changed: e[self.side].changed = priority_order priority_order += 1 check_parent = parent for event in self.provider.walk(path): self.process_event(event) ent = self.state.lookup_oid(self.side, event.oid) # move up to process now... ent[self.side].changed = priority_order priority_order += 1 def stop(self, forever=True): if forever: self.events.shutdown = True super().stop(forever=forever) PKL9O`#JJcloudsync/exceptions.pyclass CloudException(Exception): def __init__(self, *args, original_exception=None): super().__init__(*args) self.original_exception = original_exception class CloudFileNotFoundError(CloudException): # ENOENT pass class CloudTemporaryError(CloudException): pass class CloudOutOfSpaceError(CloudTemporaryError): # ENOSPC pass class CloudFileExistsError(CloudException): pass class CloudTokenError(CloudException): pass class CloudDisconnectedError(CloudException): pass class CloudCursorError(CloudException): pass PKL9OÖ&##cloudsync/log.py# add TRACE named level, because libraries need it import logging logger = logging.getLogger(__package__) if isinstance(logging.getLevelName('TRACE'), str): logging.addLevelName(5, 'TRACE') # ses docs, this actually gets a number, because reasons TRACE = logging.getLevelName('TRACE') PKL9Omccloudsync/muxer.pyimport queue from threading import Lock from collections import namedtuple from typing import Dict, Any, Callable class Muxer: Entry = namedtuple('Entry', 'genref listeners, lock') already: Dict[Any, Entry] = {} top_lock = Lock() def __init__(self, func, key=None, restart=False): self.restart = restart self.func: Callable = func self.queue: queue.Queue = queue.Queue() self.shutdown = False self.key = key or func with self.top_lock: if self.key not in self.already: self.already[self.key] = self.Entry([func()], [], Lock()) ent = self.already[self.key] self.genref = ent.genref self.lock = ent.lock self.listeners = ent.listeners self.listeners.append(self) def __iter__(self): return self def __next__(self): try: e = self.queue.get_nowait() except queue.Empty: with self.lock: try: e = self.queue.get_nowait() except queue.Empty: try: e = next(self.genref[0]) for other in self.listeners: if other is not self: other.queue.put(e) except StopIteration: if self.restart and not self.shutdown: self.genref[0] = self.func() raise 
return e def __del__(self): with self.top_lock: try: self.listeners.remove(self) except ValueError: pass if not self.listeners and self.key in self.already: del self.already[self.key] PKL9O$$cloudsync/provider.pyfrom abc import ABC, abstractmethod import os import re import logging from typing import TYPE_CHECKING, Generator, Optional, Union, List, Any from cloudsync.types import OInfo, DIRECTORY, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError, CloudTokenError if TYPE_CHECKING: from cloudsync.event import Event log = logging.getLogger(__name__) class Provider(ABC): # pylint: disable=too-many-public-methods sep: str = '/' # path delimiter alt_sep: str = '\\' # alternate path delimiter oid_is_path: bool = False case_sensitive: bool = True win_paths: bool = False connection_id: Optional[str] = None default_sleep: float = 0.01 __creds: None @abstractmethod def _api(self, *args, **kwargs): ... def connect(self, creds): # some providers don't need connections, so just don't implement/overload this method # providers who implement connections need to set the connection_id to a value # that is unique to each connection, so that connecting to this provider # under multiple userid's will produce different connection_id's. One # suggestion is to just set the connection_id to the user's login_id self.connection_id = os.urandom(16).hex() self.__creds = creds def reconnect(self): # reuse existing credentials and reconnect # raises: CloudDisconnectedError on failure if not self.connected: self.connect(self.__creds) def disconnect(self): # disconnect from cloud self.connection_id = None @property def connected(self): return self.connection_id is not None def authenticate(self): # implement this method for providers that need authentication pass def connect_or_authenticate(self, creds): # This won't attempt oauth unless the specific failure to connect is an authentication error try: self.connect(creds) except CloudTokenError: creds = self.authenticate() # pylint: disable=assignment-from-no-return self.connect(creds) return creds @property @abstractmethod def name(self): ... @property @abstractmethod def latest_cursor(self): ... @property @abstractmethod def current_cursor(self) -> Any: ... @current_cursor.setter def current_cursor(self, val: Any) -> None: # pylint: disable=no-self-use, unused-argument ... @abstractmethod def events(self) -> Generator["Event", None, None]: ... @abstractmethod def walk(self, path, since=None): # Test that the root path does not show up in the walk ... @abstractmethod def upload(self, oid, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def create(self, path, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def download(self, oid, file_like): ... @abstractmethod def rename(self, oid, path) -> str: # TODO: test that a renamed file can be renamed again # TODO: test that renaming a folder renames the children in the state file ... @abstractmethod def mkdir(self, path) -> str: ... @abstractmethod def delete(self, oid): ... @abstractmethod def exists_oid(self, oid): ... @abstractmethod def exists_path(self, path) -> bool: ... @abstractmethod def listdir(self, oid) -> Generator[DirInfo, None, None]: ... def hash_oid(self, oid) -> Optional[bytes]: # TODO add a test to FNFE info = self.info_oid(oid) return info.hash if info else None @staticmethod @abstractmethod def hash_data(file_like) -> Union[str, bytes]: ... @abstractmethod def info_path(self, path: str) -> Optional[OInfo]: ... 
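# Usage sketch (illustrative only, not part of the Provider interface): most Provider calls are
# oid-based, so paths are typically resolved through info_path() first; `prov` below stands for
# any concrete Provider implementation.
#
#     import io
#     info = prov.info_path("/some/path")     # -> OInfo or None
#     if info:
#         buf = io.BytesIO()
#         prov.download(info.oid, buf)        # oid-based download into a file-like
#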
@abstractmethod def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: ... # CONVENIENCE def download_path(self, path, io): info = self.info_path(path) if not info or not info.oid: raise CloudFileNotFoundError() self.download(info.oid, io) # HELPER @classmethod def join(cls, *paths): res = "" rl: List[str] = [] for path in paths: if not path or path == cls.sep: continue if isinstance(path, str): rl = rl + [path.strip(cls.sep).strip(cls.alt_sep)] continue for sub_path in path: if sub_path is None or sub_path == cls.sep or sub_path == cls.alt_sep: continue rl = rl + [sub_path.strip(cls.sep)] if not rl: return cls.sep res = cls.sep.join(rl) if not cls.win_paths or res[1] != ':': res = cls.sep + res return res def split(self, path): # todo cache regex index = path.rfind(self.sep) if self.alt_sep: index = max(index, path.rfind(self.alt_sep)) if index == -1: return path, "" if index == 0: return self.sep, path[index+1:] return path[:index], path[index+1:] def normalize_path(self, path: str): norm_path = path.rstrip(self.sep) if self.sep in ["\\", "/"]: parts = re.split(r'[\\/]+', norm_path) else: parts = re.split(r'[%s]+' % self.sep, norm_path) norm_path = self.join(*parts) if not self.case_sensitive: norm_path = norm_path.lower() return norm_path def is_subpath(self, folder, target, sep=None, alt_sep=None, strict=False): sep = sep or self.sep alt_sep = alt_sep or self.alt_sep if alt_sep: folder = folder.replace(alt_sep, sep) target = target.replace(alt_sep, sep) # Will return True for is-same-path in addition to target folder_full = str(folder) folder_full = folder_full.rstrip(sep) target_full = str(target) target_full = target_full.rstrip(sep) # .lower() instead of normcase because normcase will also mess with separators if not self.case_sensitive: folder_full_case = folder_full.lower() target_full_case = target_full.lower() else: folder_full_case = folder_full target_full_case = target_full # target is same as folder, or target is a subpath (ensuring separator is there for base) if folder_full_case == target_full_case: return False if strict else sep elif len(target_full) > len(folder_full) and target_full[len(folder_full)] == sep: if target_full_case.startswith(folder_full_case): return target_full[len(folder_full):] else: return False return False def replace_path(self, path, from_dir, to_dir): relative = self.is_subpath(from_dir, path) if relative: retval = to_dir + (relative if relative != self.sep else "") return retval if relative != "" else self.sep raise ValueError("replace_path used without subpath") def paths_match(self, patha, pathb): if patha is None and pathb is None: return True elif patha is None or pathb is None: return False return self.normalize_path(patha) == self.normalize_path(pathb) def dirname(self, path: str): ret, _ = self.split(path) return ret def basename(self, path: str): _, ret = self.split(path) return ret def _verify_parent_folder_exists(self, path): parent_path = self.dirname(path) if parent_path != self.sep: parent_obj = self.info_path(parent_path) if parent_obj is None: # perhaps this should separate "FileNotFound" and "non-folder parent exists" # and raise different exceptions raise CloudFileNotFoundError(parent_path) if parent_obj.otype != DIRECTORY: raise CloudFileExistsError(parent_path) def mkdirs(self, path): log.debug("mkdirs %s", path) try: oid = self.mkdir(path) # todo update state except CloudFileExistsError: # todo: mabye CloudFileExistsError needs to have an oid and/or path in it # at least optionally info = self.info_path(path) if info and 
info.otype == DIRECTORY: oid = info.oid else: raise except CloudFileNotFoundError: ppath, _ = self.split(path) if ppath == path: raise log.debug("mkdirs parent, %s", ppath) unused_oid = self.mkdirs(ppath) try: oid = self.mkdir(path) # todo update state except CloudFileNotFoundError: # when syncing, file exists seems to hit better conflict code for these sorts of situations # but this is a guess. todo: scenarios that make this happen raise CloudFileExistsError("f'ed up mkdir") return oid PKL9Ocloudsync/py.typedPKL9O`X cloudsync/runnable.pyimport time from abc import ABC, abstractmethod import threading import logging log = logging.getLogger(__name__) def time_helper(timeout, sleep=None, multiply=1): forever = not timeout end = forever or time.monotonic() + timeout while forever or end >= time.monotonic(): yield True if sleep is not None: time.sleep(sleep) sleep = sleep * multiply raise TimeoutError() class Runnable(ABC): stopped = False shutdown = False wakeup = False thread = None def run(self, *, timeout=None, until=None, sleep=0.01): self.stopped = False self.wakeup = False endtime = sleep + time.monotonic() for _ in time_helper(timeout, sleep=.01): while time.monotonic() < endtime and not self.stopped and not self.wakeup: time.sleep(max(0, min(.01, endtime - time.monotonic()))) self.wakeup = False if self.stopped: break try: self.do() except Exception: log.exception("unhandled exception in %s", self.__class__) if self.stopped or (until is not None and until()): break endtime = sleep + time.monotonic() if self.shutdown: self.done() def wake(self): self.wakeup = True def start(self, **kwargs): self.thread = threading.Thread(target=self.run, kwargs=kwargs, daemon=True) self.thread.start() @abstractmethod def do(self): ... def stop(self, forever=True): self.stopped = True self.shutdown = forever if self.thread: self.thread.join() self.thread = None elif forever: self.done() def done(self): # cleanup code goes here pass def test_runnable(): class TestRun(Runnable): def __init__(self): self.cleaned = False self.called = 0 def do(self): self.called += 1 def done(self): self.cleaned = True testrun = TestRun() testrun.run(timeout=0.02, sleep=0.001) assert testrun.called testrun.called = 0 testrun.run(until=lambda: testrun.called == 1) assert testrun.called == 1 thread = threading.Thread(target=testrun.run) thread.start() testrun.stop() thread.join(timeout=1) assert testrun.stopped == 1 PKL9Odmcloudsync/scramble.pyimport random def scramble(gen, buffer_size): buf = [] i = iter(gen) while True: try: e = next(i) buf.append(e) if len(buf) >= buffer_size: choice = random.randint(0, len(buf)-1) buf[-1], buf[choice] = buf[choice], buf[-1] yield buf.pop() except StopIteration: random.shuffle(buf) yield from buf return PKL9O2cloudsync/types.pyfrom typing import Optional from enum import Enum from dataclasses import dataclass class OType(Enum): DIRECTORY = "dir" FILE = "file" NOTKNOWN = "trashed" class IgnoreReason(Enum): NONE = "none" TRASHED = "trashed" CONFLICT = "conflict" TEMP_RENAME = "temp rename" IRRELEVANT = "irrelevant" DIRECTORY = OType.DIRECTORY FILE = OType.FILE NOTKNOWN = OType.NOTKNOWN # only allowed for deleted files! 
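# Usage sketch (illustrative only): these module-level aliases are re-exported at the package
# top level (via `from .types import *`), so callers can compare object types directly;
# `info` below stands for any OInfo/DirInfo instance.
#
#     from cloudsync import DIRECTORY, FILE
#
#     if info.otype == DIRECTORY:
#         ...   # treat as a folder
#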
@dataclass class OInfo: otype: OType # fsobject type (DIRECTORY or FILE) oid: str # fsobject id hash: Optional[bytes] # fsobject hash (better name: ohash) path: Optional[str] # path @dataclass class DirInfo(OInfo): name: Optional[str] = None mtime: Optional[float] = None shared: bool = False readonly: bool = False PKL9OTաcloudsync/utils.pyimport logging from hashlib import md5 from base64 import b64encode from typing import Any, Union, List, Dict log = logging.getLogger(__name__) MAX_DEBUG_STR = 64 def _debug_arg(val: Any): ret: Any = val if isinstance(val, dict): r: Dict[Any, Any] = {} for k, v in val.items(): r[k] = _debug_arg(v) ret = r elif isinstance(val, str): if len(val) > 64: ret = val[0:61] + "..." elif isinstance(val, bytes): if len(val) > 64: ret = val[0:61] + b"..." else: try: rlist: List[Any] = [] for v in iter(val): rlist.append(_debug_arg(v)) ret = rlist except TypeError: pass return ret # prevents very long arguments from getting logged def debug_args(*stuff: Any): if log.isEnabledFor(logging.DEBUG): r = _debug_arg(stuff) if len(r) == 1: r = r[0] return r return "N/A" # useful for converting oids and pointer nubmers into digestible nonces def debug_sig(t: Any, size: int = 3) -> Union[str, int]: if not t: return 0 return b64encode(md5(str(t).encode()).digest()).decode()[0:size] PKL9Oc`88cloudsync/oauth/__init__.pyfrom .redir_server import * from .oauth_config import * PKL9Oh'&&cloudsync/oauth/apiserver.pyimport re import json import traceback import time import urllib.parse as urlparse import threading import logging from enum import Enum from typing import Callable, Dict import unittest import requests # TODO: this is an inappropriate default server, default should be wsgiref builtin import waitress # TODO: caller should specify the mechanism for channel empty detection from waitress.channel import HTTPChannel log = logging.getLogger(__name__) class ApiServerLogLevel(Enum): NONE = 0 # do not log calls CALLS = 1 # log calls but not their args ARGS = 2 # log calls with args class ApiError(Exception): def __init__(self, code, msg=None, desc=None): super().__init__() self.code = code self.msg = str(msg) or "UNKNOWN" self.desc = desc def __str__(self): return f"{self.code}, {self.msg}" @classmethod def from_json(cls, error_json): return cls(error_json.get('code', 500), msg=error_json.get('msg', None), desc=error_json.get('desc', None)) def api_route(path): def outer(func): if not hasattr(func, "_routes"): setattr(func, "_routes", []) func._routes += [path] return func return outer def sanitize_for_status(e): e = e.replace("\r\n", " ") e = e.replace("\n", " ") e = e.replace("\r", " ") e = e[0:100] return e class ApiServer: def __init__(self, addr, port, headers=None, log_level=ApiServerLogLevel.ARGS): """ Create a new server on address, port. Port can be zero. from apiserver import ApiServer, ApiError, api_route Create your handlers by inheriting from ApiServer and tagging them with @api_route("/path"). 
Alternately you can use the ApiServer() directly, and call add_handler("path", function) Raise errors by raising ApiError(code, message, description=None) Return responses by simply returning a dict() or str() object Parameter to handlers is a dict() Query arguments are shoved into the dict via urllib.parse_qs """ self.__addr = addr self.__port = port self.__headers = headers if headers else [] self.__log_level = log_level self.__server = waitress.server.create_server(self, host=self.__addr, port=self.__port, clear_untrusted_proxy_headers=False) self.__started = False self.__routes: Dict[str, Callable] = {} self.__shutting_down = False self.__shutdown_lock = threading.Lock() # routed methods map into handler for meth in type(self).__dict__.values(): if hasattr(meth, "_routes"): for route in meth._routes: # pylint: disable=protected-access self.add_route(route, meth) def add_route(self, path, meth, content_type='application/json'): self.__routes[path] = (meth, content_type) def port(self): """Get my port""" sa = self.__server.socket.getsockname() return sa[1] def address(self): """Get my ip address""" sa = self.__server.socket.getsockname() return sa[0] def uri(self, path): """Make a URI pointing at myself""" if path[0] == "/": path = path[1:] uri = "http://" + self.__addr + ":" + str(self.port()) + "/" + path return uri def serve_forever(self): self.__started = True try: self.__server.run() except OSError: pass def __del__(self): self.shutdown() def shutdown(self): try: if self.__started: with self.__shutdown_lock: if not self.__shutting_down: self.__shutting_down = True timeout = time.time() + 2 channel: HTTPChannel for channel in list(self.__server.active_channels.values()): # Convert to a list to make a copy while channel.total_outbufs_len > 0 and time.time() < timeout: time.sleep(.01) # give any connections with a non-empty output buffer a chance to drain self.__server.socket.close() self.__server.asyncore.close_all() except Exception: log.exception("exception during shutdown") def __call__(self, env, start_response): # pylint: disable=too-many-locals, too-many-branches, too-many-statements with self.__shutdown_lock: if self.__shutting_down: raise ConnectionAbortedError('Cannot handle request while shutting down') content = b"{}" length = env.get("CONTENT_LENGTH", 0) if length: content = env['wsgi.input'].read(int(length)) if content: try: info = json.loads(content) except Exception: raise ApiError(400, "Invalid JSON " + str(content, "utf-8")) else: info = {} url = '' try: url = env.get('PATH_INFO', '/') if self.__log_level == ApiServerLogLevel.CALLS or self.__log_level == ApiServerLogLevel.ARGS: log.debug('Processing URL %s', url) handler_tmp = self.__routes.get(url) if not handler_tmp: if url[-1] == "/": tmp = url[0:-1] handler_tmp = self.__routes.get(tmp) if not handler_tmp: m = re.match(r"(.*?/)[^/]+$", url) if m: # adding a route "/" handles /foo # adding a route "/foo/bar/" handles /foo/bar/baz handler_tmp = self.__routes.get(m[1]) query = env.get('QUERY_STRING') if query: params = urlparse.parse_qs(query) else: params = {} info.update(params) if handler_tmp: handler, content_type = handler_tmp try: response = handler(env, info) if response is None: response = "" if isinstance(response, dict): response = json.dumps(response) response = bytes(str(response), "utf-8") headers = self.__headers + [('Content-Type', content_type), ("Content-Length", str(len(response)))] start_response('200 OK', headers) yield response except ApiError: raise except ConnectionAbortedError as e: 
log.error("GET %s : ERROR : %s", url, e) except Exception as e: raise ApiError(500, type(e).__name__ + " : " + str(e), traceback.format_exc()) else: raise ApiError(404, f"No handler for {url}") except ApiError as e: try: log.error("GET %s : ERROR : %s", url, e) response = json.dumps({"code": e.code, "msg": e.msg, "desc": e.desc}) start_response(str(e.code) + ' ' + sanitize_for_status(e.msg), [('Content-Type', 'application/json'), ("Content-Length", str(len(response)))]) yield bytes(response, "utf-8") except ConnectionAbortedError as e: log.error("GET %s : ERROR : %s", url, e) class TestApiServer(unittest.TestCase): @staticmethod def test_nostart(): httpd = ApiServer('127.0.0.1', 0) httpd.shutdown() def test_basic(self): class MyServer(ApiServer): @api_route("/popup") def popup(ctx, req): # pylint: disable=no-self-argument,no-self-use return "HERE" + str(req) @api_route("/json") def json(ctx, req): # pylint: disable=no-self-argument,no-self-use _ = req return {"obj": 1} httpd = MyServer('127.0.0.1', 0) httpd.add_route("/foo", lambda ctx, x: "FOO" + x["x"][0]) try: print("serving on ", httpd.address(), httpd.port()) threading.Thread(target=httpd.serve_forever, daemon=True).start() response = requests.post(httpd.uri("/popup"), data='{}') self.assertEqual(response.text, "HERE{}") response = requests.post(httpd.uri("/notfound"), data='{}') self.assertEqual(response.status_code, 404) response = requests.get(httpd.uri("/foo?x=4")) self.assertEqual(response.text, "FOO4") finally: httpd.shutdown() def test_error(self): class MyServer(ApiServer): @api_route("/popup") def popup(ctx, unused_req): # pylint: disable=no-self-argument,no-self-use raise ApiError(501, "BLAH") httpd = MyServer('127.0.0.1', 0) try: print("serving on ", httpd.address(), httpd.port()) thread = threading.Thread(target=httpd.serve_forever, daemon=True) thread.start() response = requests.post(httpd.uri("/popup"), data='{}') self.assertEqual(response.status_code, 501) finally: httpd.shutdown() if thread: thread.join(timeout=2) assert not thread.is_alive() if __name__ == "__main__": unittest.main() PKL9OW{|cloudsync/oauth/oauth_config.pyimport logging from typing import Optional from .redir_server import OAuthRedirServer __all__ = ["OAuthConfig"] log = logging.getLogger(__name__) # this class delibarately not strict, since it can contain provider-specific configuration # applications can derive from this class and provide appropriate defaults class OAuthConfig: def __init__(self, *, app_id: str = None, app_secret: str = None, manual_mode: bool = False, oauth_redir_server: Optional[OAuthRedirServer] = None): """ There are two ways to create an OAuthConfig object: by providing a OAuthRedirServer or by providing the success and failure callbacks, as well as changing the `use_predefined_ports` parameter if desired. 
:param manual_mode: :param oauth_redir_server: """ self.app_id = app_id self.app_secret = app_secret self.manual_mode = manual_mode self._oauth_redir_server = oauth_redir_server if self.manual_mode and self._oauth_redir_server: raise ValueError('Cannot use both manual mode and an oauth server') if not self.manual_mode and not self._oauth_redir_server: self._oauth_redir_server = OAuthRedirServer(html_generator=self._gen_html_response) @property def oauth_redir_server(self) -> Optional[OAuthRedirServer]: return self._oauth_redir_server def _gen_html_response(self, success: bool, err_msg: str): if success: return self.success_message() else: return self.failure_message(err_msg) @staticmethod def success_message() -> str: return 'OAuth succeeded!' @staticmethod def failure_message(error_str: str) -> str: return 'OAuth failed: {}'.format(error_str) PKL9OrZZcloudsync/oauth/redir_server.pyimport logging import sys import socket import threading import errno from typing import Callable, Any, Optional # from src import config from .apiserver import ApiServer log = logging.getLogger(__name__) __all__ = ['OAuthFlowException', 'OAuthBadTokenException', 'OAuthRedirServer'] # todo: use https://requests-oauthlib.readthedocs.io/en/latest/oauth2_workflow.html#web-application-flow # then provide tools to that provider-writers don't have to do much to get their app-specific oauth to work other than # providing the resource name, auth url and any app-specific parameters def _is_windows(): return sys.platform in ("win32", "cygwin") class OAuthFlowException(Exception): pass class OAuthBadTokenException(Exception): pass class OAuthRedirServer: PORT_MIN = 52400 PORT_MAX = 52450 GUI_TIMEOUT = 15 def __init__(self, *, html_generator: Callable[[bool, str], str] = None): self.__html_response_generator = html_generator self.__on_success: Optional[Callable[[Any], None]] = None self.__on_failure: Optional[Callable[[str], None]] = None self.__api_server: Optional[ApiServer] = None self.__thread: Optional[threading.Thread] = None self.__running = False @property def running(self): return self.__running def run(self, on_success: Callable[[Any], None], on_failure: Callable[[str], None], use_predefined_ports=False): if self.__running: raise RuntimeError('OAuth server was run() twice') if not on_success: raise ValueError('A valid on_success(...) callback is required') if not on_failure: raise ValueError('A valid on_failure(...) callback is required') self.__on_success = on_success self.__on_failure = on_failure log.debug('Creating oauth redir server') self.__running = True if use_predefined_ports: # Some providers (Dropbox) don't allow us to just use localhost # redirect. For these providers, we define a range of # 127.0.0.1:(PORT_MIN..PORT_MAX) as valid redir URLs for port in range(self.PORT_MIN, self.PORT_MAX): try: if _is_windows(): # Windows is dumb and will be weird if we just try to # connect to the port directly. 
Check to see if the # port is responsive before claiming it as our own try: socket.create_connection(('127.0.0.1', port), 0.001) continue except socket.timeout: pass log.debug('Attempting to start api server on port %d', port) self.__api_server = ApiServer('127.0.0.1', port) break except OSError: pass else: self.__api_server = ApiServer('127.0.0.1', 0) if not self.__api_server: raise OSError(errno.EADDRINUSE, "Unable to open any port in range 52400-52405") self.__api_server.add_route('/auth/', self.auth_redir_success, content_type='text/html') self.__api_server.add_route('/favicon.ico', lambda x, y: "", content_type='text/html') self.__thread = threading.Thread(target=self.__api_server.serve_forever, daemon=True) self.__thread.start() log.info('Listening on %s', self.__api_server.uri('/auth/')) def auth_redir_success(self, env, info): log.debug('In auth_redir_success with env=%s, info=%s', env, info) err = "" if info and ('error' in info or 'error_description' in info): err = info['error'] if 'error' in info else \ info['error_description'][0] if isinstance(err, list): err = err[0] self.__on_failure(err) return self.auth_failure(err) try: self.__on_success(info) except OAuthFlowException: log.warning('Got a page request when not in flow', exc_info=True) err = "No pending OAuth. This can happen if you refreshed this tab. " except Exception as e: log.exception('Failed to authenticate') err = 'Unknown error: %s' % e return self.auth_failure(err) if err else self.auth_success() def auth_success(self): if self.__html_response_generator: return self.__html_response_generator(True, '') return "OAuth Success" def auth_failure(self, msg): if self.__html_response_generator: return self.__html_response_generator(False, msg) return "OAuth Failure:" + msg def shutdown(self): if self.__api_server and self.__running: self.__api_server.shutdown() self.__running = False self.__on_success = None self.__on_failure = None self.__thread = None def uri(self, *args, **kwargs): return self.__api_server.uri(*args, **kwargs) def port(self): return self.__api_server.port() PKL9O 牀cloudsync/providers/__init__.pyfrom .gdrive import GDriveProvider from ..tests.fixtures.mock_provider import MockProvider from .dropbox import DropboxProvider PKL9O;N__cloudsync/providers/dropbox.pyimport io import os import time import logging import threading from hashlib import sha256 import webbrowser from typing import Generator, Optional, Dict, Any, Union from os import urandom from base64 import urlsafe_b64encode as u_b64enc import requests import arrow import dropbox from dropbox import Dropbox, exceptions, files, DropboxOAuth2Flow from dropbox.oauth import OAuth2FlowResult from cloudsync.utils import debug_args from cloudsync.oauth import OAuthConfig from cloudsync import Provider, OInfo, DIRECTORY, FILE, NOTKNOWN, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, CloudOutOfSpaceError, \ CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError, CloudCursorError log = logging.getLogger(__name__) logging.getLogger('dropbox').setLevel(logging.INFO) class _FolderIterator: def __init__(self, api, path, *, recursive, cursor=None): self.api = api self.path = path self.ls_res = None if not cursor: self.ls_res = self.api('files_list_folder', path=self.path, recursive=recursive, limit=200) else: self.ls_res = self.api('files_list_folder_continue', cursor) def __iter__(self): return self def __next__(self): if self.ls_res: if not self.ls_res.entries and self.ls_res.has_more: self.ls_res = 
self.api( 'files_list_folder_continue', self.ls_res.cursor) if self.ls_res and self.ls_res.entries: ret = self.ls_res.entries.pop() ret.cursor = self.ls_res.cursor return ret raise StopIteration() @property def cursor(self): return self.ls_res and self.ls_res.cursor class DropboxProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 _max_simple_upload_size = 15 * 1024 * 1024 _upload_block_size = 10 * 1024 * 1024 name = "Dropbox" _redir = 'urn:ietf:wg:oauth:2.0:oob' def __init__(self, oauth_config: Optional[OAuthConfig] = None, app_id=None, app_secret=None): super().__init__() self.__root_id = None self.__cursor = None self.__creds = None self.client = None self.api_key = None self._csrf = None self._flow = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() self._session: Dict[Any, Any] = {} self._oauth_config = oauth_config if oauth_config else OAuthConfig(app_id=app_id, app_secret=app_secret) self._oauth_done = threading.Event() @property def connected(self): return self.client is not None def get_display_name(self): return self.name def initialize(self): self._csrf = u_b64enc(urandom(32)) key = self._oauth_config.app_id secret = self._oauth_config.app_secret log.debug('Initializing Dropbox with manual mode=%s', self._oauth_config.manual_mode) if not self._oauth_config.manual_mode: try: self._oauth_config.oauth_redir_server.run( on_success=self._on_oauth_success, on_failure=self._on_oauth_failure, use_predefined_ports=True, ) if not key and secret: raise ValueError("require app key and secret") self._flow = DropboxOAuth2Flow(consumer_key=key, consumer_secret=secret, redirect_uri=self._oauth_config.oauth_redir_server.uri('/auth/'), session=self._session, csrf_token_session_key=self._csrf, locale=None) except OSError: log.exception('Unable to use redir server. 
Falling back to manual mode') self._oauth_config.manual_mode = False if self._oauth_config.manual_mode: if not key and secret: raise ValueError("require app key and secret") self._flow = DropboxOAuth2Flow(consumer_key=key, consumer_secret=secret, redirect_uri=self._redir, session=self._session, csrf_token_session_key=self._csrf, locale=None) url = self._flow.start() self._oauth_done.clear() webbrowser.open(url) return url def interrupt_oauth(self): if not self._oauth_config.manual_mode: self._oauth_config.oauth_redir_server.shutdown() # ApiServer shutdown does not throw exceptions self._flow = None self._oauth_done.clear() def _on_oauth_success(self, auth_dict): if auth_dict and 'state' in auth_dict and isinstance(auth_dict['state'], list): auth_dict['state'] = auth_dict['state'][0] try: res: OAuth2FlowResult = self._flow.finish(auth_dict) self.api_key = res.access_token self._oauth_done.set() except Exception: log.exception('Authentication failed') raise def _on_oauth_failure(self, err): log.error("oauth failure: %s", err) self._oauth_done.set() def authenticate(self): try: self.initialize() self._oauth_done.wait() return {"key": self.api_key, } finally: if not self._oauth_config.manual_mode: self._oauth_config.oauth_redir_server.shutdown() def get_quota(self): space_usage = self._api('users_get_space_usage') account = self._api('users_get_current_account') if space_usage.allocation.is_individual(): used = space_usage.used allocated = space_usage.allocation.get_individual().allocated else: team_allocation = space_usage.allocation.get_team() used, allocated = team_allocation.used, team_allocation.allocated res = { 'used': used, 'total': allocated, 'login': account.email, 'uid': account.account_id[len('dbid:'):] } return res def reconnect(self): self.connect_or_authenticate(self.__creds) def connect(self, creds): log.debug('Connecting to dropbox') if not self.client: self.__creds = creds api_key = creds.get('key', self.api_key) if not api_key: raise CloudTokenError() with self.mutex: self.client = Dropbox(api_key) try: quota = self.get_quota() self.connection_id = quota['login'] except Exception as e: self.disconnect() if isinstance(e, exceptions.AuthError): log.debug("auth error connecting %s", e) raise CloudTokenError() log.exception("error connecting %s", e) raise CloudDisconnectedError() self.api_key = api_key def _api(self, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") log.debug("_api: %s (%s)", method, debug_args(args, kwargs)) with self.mutex: try: return getattr(self.client, method)(*args, **kwargs) except exceptions.ApiError as e: inside_error: Union[files.LookupError, files.WriteError] if isinstance(e.error, (files.ListFolderError, files.GetMetadataError, files.ListRevisionsError)): if e.error.is_path() and isinstance(e.error.get_path(), files.LookupError): inside_error = e.error.get_path() if inside_error.is_malformed_path(): log.debug('Malformed path when executing %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'Malformed path when executing %s(%s)' % (method, kwargs)) if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.UploadError): if e.error.is_path() and isinstance(e.error.get_path(), files.UploadWriteFailed): inside_error = e.error.get_path() write_error = 
inside_error.reason if write_error.is_insufficient_space(): log.debug('out of space %s(%s %s) : %s', method, args, kwargs, e) raise CloudOutOfSpaceError( 'Out of space when executing %s(%s)' % (method, kwargs)) if write_error.is_conflict(): raise CloudFileExistsError( 'Conflict when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.DeleteError): if e.error.is_path_lookup(): inside_error = e.error.get_path_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.RelocationError): if e.error.is_from_lookup(): inside_error = e.error.get_from_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s,%s)' % (method, args, kwargs)) if e.error.is_to(): inside_error = e.error.get_to() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) log.debug("here") if isinstance(e.error, files.CreateFolderError): if e.error.is_path() and isinstance(e.error.get_path(), files.WriteError): inside_error = e.error.get_path() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) except (exceptions.InternalServerError, exceptions.RateLimitError, requests.exceptions.ReadTimeout): raise CloudTemporaryError() except dropbox.stone_validators.ValidationError as e: log.debug("f*ed up api error: %s", e) if "never created" in str(e): raise CloudFileNotFoundError() if "did not match" in str(e): log.warning("oid error %s", e) raise CloudFileNotFoundError() raise except requests.exceptions.ConnectionError as e: log.exception('api error handled exception %s:%s', "dropbox", e.__class__.__name__) self.disconnect() raise CloudDisconnectedError() @property def root_id(self): return "" def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('files_list_folder_get_latest_cursor', self.root_id, recursive=True, include_deleted=True, limit=200) if res: return res.cursor else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor @current_cursor.setter def current_cursor(self, val): if val is None: val = self.latest_cursor if not isinstance(val, str) and val is not None: raise CloudCursorError(val) self.__cursor = val def _events(self, cursor, path=None): # pylint: disable=too-many-branches if path and path != "/": info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) oid = info.oid else: oid = self.root_id for res in _FolderIterator(self._api, oid, recursive=True, cursor=cursor): exists = True log.debug("event %s", res) if isinstance(res, files.DeletedMetadata): # dropbox doesn't give you the id that was deleted # we need to get the ids of every revision # then find out which one was the latest before the deletion time # then get the oid for that revs = self._api('files_list_revisions', res.path_lower, limit=10) if revs is None: # dropbox will give a 409 conflict if the revision history was deleted # instead of raising an error, this gets converted to revs==None log.info("revs is none for %s %s", oid, path) continue log.debug("revs %s", revs) deleted_time = revs.server_deleted if deleted_time is None: # not really sure why this happens, but this event isn't useful without 
it log.error("revs %s has no deleted time?", revs) continue latest_time = None for ent in revs.entries: assert ent.server_modified is not None if ent.server_modified <= deleted_time and \ (latest_time is None or ent.server_modified >= latest_time): oid = ent.id latest_time = ent.server_modified if not oid: log.error( "skipping deletion %s, because we don't know the oid", res) continue exists = False otype = NOTKNOWN ohash = None elif isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None oid = res.id else: otype = FILE ohash = res.content_hash oid = res.id path = res.path_display event = Event(otype, oid, path, ohash, exists, time.time()) yield event if getattr(res, "cursor", False): self.__cursor = res.cursor def events(self) -> Generator[Event, None, None]: yield from self._events(self.current_cursor) def walk(self, path, since=None): yield from self._events(None, path=path) def listdir(self, oid) -> Generator[DirInfo, None, None]: yield from self._listdir(oid, recursive=False) def _listdir(self, oid, *, recursive) -> Generator[DirInfo, None, None]: info = self.info_oid(oid) for res in _FolderIterator(self._api, oid, recursive=recursive): if isinstance(res, files.DeletedMetadata): continue if isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None else: otype = FILE ohash = res.content_hash path = res.path_display oid = res.id relative = self.is_subpath(info.path, path).lstrip("/") if relative: yield DirInfo(otype, oid, ohash, path, name=relative) def create(self, path: str, file_like, metadata=None) -> OInfo: self._verify_parent_folder_exists(path) if self.exists_path(path): raise CloudFileExistsError(path) return self._upload(path, file_like, metadata) def upload(self, oid: str, file_like, metadata=None) -> OInfo: if oid.startswith(self.sep): raise CloudFileNotFoundError("Called upload with a path instead of an OID: %s" % oid) if not self.exists_oid(oid): raise CloudFileNotFoundError(oid) return self._upload(oid, file_like, metadata) def _upload(self, oid, file_like, metadata=None) -> OInfo: res = None metadata = metadata or {} file_like.seek(0, io.SEEK_END) size = file_like.tell() file_like.seek(0) if size < self._max_simple_upload_size: res = self._api('files_upload', file_like.read(), oid, mode=files.WriteMode('overwrite')) else: cursor = None while True: data = file_like.read(self._upload_block_size) if not data: if cursor: local_mtime = arrow.get(metadata.get('mtime', time.time())).datetime commit = files.CommitInfo(path=oid, mode=files.WriteMode.overwrite, autorename=False, client_modified=local_mtime, mute=True) res = self._api( 'files_upload_session_finish', data, cursor, commit ) break if not cursor: self._api('files_upload_session_start', data) cursor = files.UploadSessionCursor( res.session_id, len(data)) else: self._api('files_upload_session_append_v2', data, cursor) cursor.offset += len(data) if res is None: raise CloudFileExistsError() ret = OInfo(otype=FILE, oid=res.id, hash=res.content_hash, path=res.path_display) log.debug('upload result is %s', ret) return ret def download(self, oid, file_like): ok = self._api('files_download', oid) if not ok: raise CloudFileNotFoundError() res, content = ok for data in content.iter_content(self._upload_block_size): file_like.write(data) return OInfo(otype=FILE, oid=oid, hash=res.content_hash, path=res.path_display) def _attempt_rename_folder_over_empty_folder(self, info: OInfo, path: str) -> None: if info.otype != DIRECTORY: raise CloudFileExistsError(path) possible_conflict = self.info_path(path) if 
possible_conflict.otype == DIRECTORY: try: next(self._listdir(possible_conflict.oid, recursive=False)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: pass # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) self._api('files_move_v2', info.oid, path) return else: # conflict is a file, and we already know that the rename is on a folder raise CloudFileExistsError(path) def rename(self, oid, path): try: self._api('files_move_v2', oid, path) except CloudFileExistsError: old_info = self.info_oid(oid) if self.paths_match(old_info.path, path): new_info = self.info_path(path) if oid == new_info.oid and old_info.path != path: temp_path = path + "." + os.urandom(16).hex() self._api('files_move_v2', oid, temp_path) self.rename(oid, path) return oid if old_info.otype == DIRECTORY: self._attempt_rename_folder_over_empty_folder(old_info, path) else: raise return oid @staticmethod def hash_data(file_like) -> str: # get a hash from a filelike that's the same as the hash i natively use binstr = b'' while True: data = file_like.read(4 * 1024 * 1024) if not data: break binstr += sha256(data).digest() return sha256(binstr).hexdigest() def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ, unused-argument # TODO: check if a regular filesystem lets you mkdir over a non-empty folder... self._verify_parent_folder_exists(path) if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError() log.debug("Skipped creating already existing folder: %s", path) return info.oid res = self._api('files_create_folder_v2', path) log.debug("dbx mkdir %s", res) res = res.metadata return res.id def delete(self, oid): info = self.info_oid(oid) if not info: return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self._listdir(oid, recursive=False)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files_delete_v2', oid) except CloudFileNotFoundError: # shouldn't happen because we are checking above... 
return def exists_oid(self, oid) -> bool: return bool(self.info_oid(oid)) def info_path(self, path: str) -> Optional[OInfo]: if path == "/": return OInfo(DIRECTORY, "", None, "/") try: log.debug("res info path %s", path) res = self._api('files_get_metadata', path) log.debug("res info path %s", res) oid = res.id if oid[0:3] != 'id:': log.warning("invalid oid %s from path %s", oid, path) if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash path = res.path_display or path return OInfo(otype, oid, fhash, path) except CloudFileNotFoundError: return None def exists_path(self, path) -> bool: return self.info_path(path) is not None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: if oid == "": otype = DIRECTORY fhash = None path = "/" else: try: res = self._api('files_get_metadata', oid) log.debug("res info oid %s", res) path = res.path_display if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash except CloudFileNotFoundError: return None return OInfo(otype, oid, fhash, path) PKL9Oa8hcxcxcloudsync/providers/gdrive.pyimport io import time import logging import threading import webbrowser import hashlib from ssl import SSLError import json from typing import Generator, Optional, List, Dict, Any import arrow from googleapiclient.discovery import build # pylint: disable=import-error from googleapiclient.errors import HttpError # pylint: disable=import-error from httplib2 import Http, HttpLib2Error from oauth2client import client # pylint: disable=import-error from oauth2client.client import OAuth2WebServerFlow, HttpAccessTokenRefreshError, OAuth2Credentials # pylint: disable=import-error from googleapiclient.http import _should_retry_response # This is necessary because google masks errors from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload # pylint: disable=import-error from cloudsync.utils import debug_args from cloudsync import Provider, OInfo, DIRECTORY, FILE, NOTKNOWN, Event, DirInfo, OType from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, CloudFileNotFoundError, CloudTemporaryError, \ CloudFileExistsError, CloudCursorError, CloudOutOfSpaceError from cloudsync.oauth import OAuthConfig class GDriveFileDoneError(Exception): pass log = logging.getLogger(__name__) logging.getLogger('googleapiclient').setLevel(logging.INFO) logging.getLogger('googleapiclient.discovery').setLevel(logging.WARN) class GDriveInfo(DirInfo): # pylint: disable=too-few-public-methods pids: List[str] = [] oid: str hash: Any otype: OType path: str def __init__(self, *a, pids=None, **kws): super().__init__(*a, **kws) if pids is None: pids = [] self.pids = pids class GDriveProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 provider = 'googledrive' name = 'Google Drive' _scope = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.activity.readonly' ] _redir = 'urn:ietf:wg:oauth:2.0:oob' _token_uri = 'https://accounts.google.com/o/oauth2/token' _folder_mime_type = 'application/vnd.google-apps.folder' _io_mime_type = 'application/octet-stream' def __init__(self, oauth_config: Optional[OAuthConfig] = None, app_id: str = None, app_secret: str = None): super().__init__() self.__root_id = None self.__cursor = None self.__creds = None self.client = None self.api_key = None self.refresh_token = None self.user_agent = 'cloudsync/1.0' self.mutex = 
threading.Lock() self._ids = {"/": "root"} self._trashed_ids: Dict[str, str] = {} self._flow = None self._oauth_config = oauth_config if oauth_config else OAuthConfig(app_id=app_id, app_secret=app_secret) self._oauth_done = threading.Event() @property def connected(self): return self.client is not None def get_display_name(self): return self.name def initialize(self): if not self._oauth_config.manual_mode: try: self._oauth_config.oauth_redir_server.run( on_success=self._on_oauth_success, on_failure=self._on_oauth_failure, ) self._flow = OAuth2WebServerFlow(client_id=self._oauth_config.app_id, client_secret=self._oauth_config.app_secret, scope=self._scope, redirect_uri=self._oauth_config.oauth_redir_server.uri('/auth/')) except OSError: log.exception('Unable to use redir server. Falling back to manual mode') self._oauth_config.manual_mode = False if self._oauth_config.manual_mode: self._flow = OAuth2WebServerFlow(client_id=self._oauth_config.app_id, client_secret=self._oauth_config.app_secret, scope=self._scope, redirect_uri=self._redir) url = self._flow.step1_get_authorize_url() self._oauth_done.clear() webbrowser.open(url) return url def interrupt_oauth(self): if not self._oauth_config.manual_mode: self._oauth_config.oauth_redir_server.shutdown() # ApiServer shutdown does not throw exceptions self._flow = None self._oauth_done.clear() def _on_oauth_success(self, auth_dict): auth_string = auth_dict['code'][0] try: res: OAuth2Credentials = self._flow.step2_exchange(auth_string) self.refresh_token = res.refresh_token self.api_key = res.access_token self._oauth_done.set() except Exception: log.exception('Authentication failed') raise def _on_oauth_failure(self, err): log.error("oauth failure: %s", err) self._oauth_done.set() def authenticate(self): try: self.initialize() self._oauth_done.wait() return {"refresh_token": self.refresh_token, "api_key": self.api_key, } finally: if not self._oauth_config.manual_mode: self._oauth_config.oauth_redir_server.shutdown() def get_quota(self): # https://developers.google.com/drive/api/v3/reference/about res = self._api('about', 'get', fields='storageQuota,user') quota = res['storageQuota'] user = res['user'] usage = int(quota['usage']) if 'limit' in quota and quota['limit']: limit = int(quota['limit']) else: # It is possible for an account to have unlimited space - pretend it's 1TB limit = 1024 * 1024 * 1024 * 1024 res = { 'used': usage, 'total': limit, 'login': user['emailAddress'], 'uid': user['permissionId'] } return res def reconnect(self): self.connect(self.__creds) def connect(self, creds): log.debug('Connecting to googledrive') if not self.client: api_key = creds.get('api_key', self.api_key) refresh_token = creds.get('refresh_token', self.refresh_token) self.__creds = creds if not refresh_token: new_creds = self.authenticate() self.__creds = new_creds api_key = new_creds.get('api_key', None) refresh_token = new_creds.get('refresh_token', None) kwargs = {} if (not self._oauth_config.app_id or not self._oauth_config.app_secret) and not api_key: raise ValueError("require app_id/secret or api_key") try: with self.mutex: creds = client.GoogleCredentials(access_token=api_key, client_id=self._oauth_config.app_id, client_secret=self._oauth_config.app_secret, refresh_token=refresh_token, token_expiry=None, token_uri=self._token_uri, user_agent=self.user_agent) creds.refresh(Http()) self.client = build( 'drive', 'v3', http=creds.authorize(Http())) kwargs['api_key'] = creds.access_token if getattr(creds, 'refresh_token', None): refresh_token = 
creds.refresh_token self.refresh_token = refresh_token self.api_key = api_key quota = None try: quota = self.get_quota() except SSLError: # pragma: no cover # Seeing some intermittent SSL failures that resolve on retry log.warning('Retrying intermittent SSLError') quota = self.get_quota() self.connection_id = quota['login'] except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() return self.client @staticmethod def _get_reason_from_http_error(e): # gets a default something (actually the message, not the reason) using their secret interface reason = e._get_reason() # pylint: disable=protected-access # parses the JSON of the content to get the reason from where it really lives in the content try: # this code was copied from googleapiclient/http.py:_should_retry_response() data = json.loads(e.content.decode('utf-8')) if isinstance(data, dict): reason = data['error']['errors'][0]['reason'] else: reason = data[0]['error']['errors']['reason'] except (UnicodeDecodeError, ValueError, KeyError): log.warning('Invalid JSON content from response: %s', e.content) return reason @staticmethod def __escape(filename: str): ret = filename ret = ret.replace("\\", "\\\\") ret = ret.replace("'", "\\'") return ret def _api(self, resource, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") with self.mutex: try: if resource == 'media': res = args[0] args = args[1:] else: res = getattr(self.client, resource)() meth = getattr(res, method)(*args, **kwargs) if resource == 'media' or (resource == 'files' and method == 'get_media'): ret = meth else: ret = meth.execute() log.debug("api: %s (%s) -> %s", method, debug_args(args, kwargs), ret) return ret except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() except SSLError as e: if "WRONG_VERSION" in str(e): # httplib2 used by google's api gives this weird error for no discernable reason raise CloudTemporaryError(str(e)) raise except HttpError as e: if str(e.resp.status) == '416': raise GDriveFileDoneError() if str(e.resp.status) == '413': raise CloudOutOfSpaceError('Payload too large') if str(e.resp.status) == '409': raise CloudFileExistsError('Another user is modifying') if str(e.resp.status) == '404': raise CloudFileNotFoundError('File not found when executing %s.%s(%s)' % ( resource, method, kwargs )) reason = self._get_reason_from_http_error(e) if str(e.resp.status) == '403' and str(reason) == 'storageQuotaExceeded': raise CloudOutOfSpaceError("Storage storageQuotaExceeded") if str(e.resp.status) == '403' and str(reason) == 'parentNotAFolder': raise CloudFileExistsError("Parent Not A Folder") if (str(e.resp.status) == '403' and reason in ('userRateLimitExceeded', 'rateLimitExceeded', 'dailyLimitExceeded')) \ or str(e.resp.status) == '429': raise CloudTemporaryError("rate limit hit") # At this point, _should_retry_response() returns true for error codes >=500, 429, and 403 with # the reason 'userRateLimitExceeded' or 'rateLimitExceeded'. 403 without content, or any other # response is not retried. We have already taken care of some of those cases above, but we call this # below to catch the rest, and in case they improve their library with more conditions. If we called # meth.execute() above with a num_retries argument, all this retrying would happen in the google api # library, and we wouldn't have to think about retries. 
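# --- Editor's note (assumption: standard googleapiclient behavior) ---
# An alternative to classifying retryable errors by hand is to let the client library
# retry transient failures itself: HttpRequest.execute() accepts num_retries and, as
# the comment above says, backs off on 5xx, 429 and rate-limited 403 responses, e.g.
#
#     ret = meth.execute(num_retries=3)   # hypothetical change, not what this code does
#
# The code here keeps retries out of the library so that CloudTemporaryError can feed
# the SyncManager's own backoff loop instead.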
should_retry = _should_retry_response(e.resp.status, e.content) if should_retry: raise CloudTemporaryError("unknown error %s" % e) log.error("Unhandled %s error %s", e.resp.status, reason) raise except (TimeoutError, HttpLib2Error): self.disconnect() raise CloudDisconnectedError("disconnected on timeout") @property def root_id(self): if not self.__root_id: res = self._api('files', 'get', fileId='root', fields='id', ) self.__root_id = res['id'] self._ids['/'] = self.__root_id return self.__root_id def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('changes', 'getStartPageToken') if res: return res.get('startPageToken') else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor @current_cursor.setter def current_cursor(self, val): if val is None: val = self.latest_cursor if not isinstance(val, str) and val is not None: raise CloudCursorError(val) self.__cursor = val def events(self) -> Generator[Event, None, None]: # pylint: disable=too-many-locals, too-many-branches page_token = self.current_cursor while page_token is not None: # log.debug("looking for events, timeout: %s", timeout) response = self._api('changes', 'list', pageToken=page_token, spaces='drive', includeRemoved=True, includeItemsFromAllDrives=True, supportsAllDrives=True) new_cursor = response.get('newStartPageToken', None) for change in response.get('changes'): log.debug("got event %s", change) # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T16:57:06.779Z', # 'removed': False, 'fileId': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'file': {'kind': 'drive#file', # 'id': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'name': 'dest', 'mimeType': 'application/octet-stream'}} # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T20:02:14.156Z', # 'removed': True, 'fileId': '1lhRe0nDplA6I5JS18642rg0KIbYN66lR'} ts = arrow.get(change.get('time')).float_timestamp oid = change.get('fileId') exists = not change.get('removed') fil = change.get('file') if fil: if fil.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE else: otype = NOTKNOWN ohash = None path = self._path_oid(oid, use_cache=False) event = Event(otype, oid, path, ohash, exists, ts, new_cursor=new_cursor) remove = [] for cpath, coid in self._ids.items(): if coid == oid: if cpath != path: remove.append(cpath) if path and otype == DIRECTORY and self.is_subpath(path, cpath): remove.append(cpath) for r in remove: self._ids.pop(r, None) if path: self._ids[path] = oid log.debug("converted event %s as %s", change, event) yield event if new_cursor and page_token and new_cursor != page_token: self.__cursor = new_cursor page_token = response.get('nextPageToken') def _walk(self, path, oid): for ent in self.listdir(oid): current_path = self.join(path, ent.name) event = Event(otype=ent.otype, oid=ent.oid, path=current_path, hash=ent.hash, exists=True, mtime=time.time()) log.debug("walk %s", event) yield event if ent.otype == DIRECTORY: if self.exists_oid(ent.oid): yield from self._walk(current_path, ent.oid) def walk(self, path, since=None): info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) yield from self._walk(path, info.oid) def __prep_upload(self, path, metadata): # modification time mtime = metadata.get("modifiedTime", time.time()) mtime = arrow.get(mtime).isoformat() gdrive_info = { 'modifiedTime': mtime } # mime type, if provided mime_type = metadata.get("mimeType", 
None) if mime_type: gdrive_info['mimeType'] = mime_type # path, if provided if path: _, name = self.split(path) gdrive_info['name'] = name # misc properties, if provided app_props = metadata.get("appProperties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['appProperties'] = app_props # misc properties, if provided app_props = metadata.get("properties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['properties'] = app_props log.debug("info %s", gdrive_info) return gdrive_info def upload(self, oid, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(None, metadata) file_like.seek(0, io.SEEK_END) file_size = file_like.tell() file_like.seek(0, io.SEEK_SET) chunksize = 4 * 1024 * 1024 resumable = file_size > chunksize ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=chunksize, resumable=resumable) fields = 'id, md5Checksum' res = self._api('files', 'update', body=gdrive_info, fileId=oid, media_body=ul, fields=fields) log.debug("response from upload %s", res) if not res: raise CloudTemporaryError("unknown response from drive on upload") md5 = res.get('md5Checksum', None) # can be none if the user tries to upload to a folder if md5 is None: possible_conflict = self._info_oid(oid) if possible_conflict and possible_conflict.otype == DIRECTORY: raise CloudFileExistsError("Can only upload to a file: %s" % possible_conflict.path) return OInfo(otype=FILE, oid=res['id'], hash=md5, path=None) def create(self, path, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(path, metadata) if self.exists_path(path): raise CloudFileExistsError() ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' parent_oid = self.get_parent_id(path) gdrive_info['parents'] = [parent_oid] res = self._api('files', 'create', body=gdrive_info, media_body=ul, fields=fields) log.debug("response from create %s : %s", path, res) if not res: raise CloudTemporaryError("unknown response from drive on upload") self._ids[path] = res['id'] log.debug("path cache %s", self._ids) return OInfo(otype=FILE, oid=res['id'], hash=res['md5Checksum'], path=path) def download(self, oid, file_like): req = self._api('files', 'get_media', fileId=oid) dl = MediaIoBaseDownload(file_like, req, chunksize=4 * 1024 * 1024) done = False while not done: try: _, done = self._api('media', 'next_chunk', dl) except GDriveFileDoneError: done = True def rename(self, oid, path): # pylint: disable=too-many-locals, too-many-branches pid = self.get_parent_id(path) add_pids = [pid] if pid == 'root': add_pids = [self.root_id] info = self._info_oid(oid) if info is None: log.debug("can't rename, oid doesn't exist %s", oid) raise CloudFileNotFoundError(oid) remove_pids = info.pids old_path = info.path _, name = self.split(path) body = {'name': name} if self.exists_path(path): possible_conflict = self.info_path(path) if FILE in (info.otype, possible_conflict.otype): if possible_conflict.oid != oid: # it's OK to rename a file over itself, frex, to change case raise CloudFileExistsError(path) else: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: # Folder is empty, rename over it no problem if possible_conflict.oid != oid: # delete the target if we're not just changing case self.delete(possible_conflict.oid) if not 
old_path: for cpath, coid in list(self._ids.items()): if coid == oid: old_path = cpath if add_pids == remove_pids: add_pids_str = "" remove_pids_str = "" else: add_pids_str = ",".join(add_pids) remove_pids_str = ",".join(remove_pids) self._api('files', 'update', body=body, fileId=oid, addParents=add_pids_str, removeParents=remove_pids_str, fields='id') if old_path: for cpath, coid in list(self._ids.items()): relative = self.is_subpath(old_path, cpath) if relative: new_cpath = self.join(path, relative) self._ids.pop(cpath) self._ids[new_cpath] = coid log.debug("renamed %s -> %s", oid, body) return oid def listdir(self, oid) -> Generator[GDriveInfo, None, None]: query = f"'{oid}' in parents" try: res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, name, mimeType, trashed, shared, capabilities)', pageToken=None) except CloudFileNotFoundError: if self._info_oid(oid): return log.debug("listdir oid gone %s", oid) raise if not res or not res['files']: if self.exists_oid(oid): return raise CloudFileNotFoundError(oid) log.debug("listdir got res %s", res) for ent in res['files']: fid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent['name'] shared = ent['shared'] readonly = not ent['capabilities']['canEdit'] trashed = ent.get('trashed', False) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE if not trashed: yield GDriveInfo(otype, fid, fhash, None, shared=shared, readonly=readonly, pids=pids, name=name) def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError(path) log.debug("Skipped creating already existing folder: %s", path) return info.oid pid = self.get_parent_id(path) _, name = self.split(path) file_metadata = { 'name': name, 'parents': [pid], 'mimeType': self._folder_mime_type, } if metadata: file_metadata.update(metadata) res = self._api('files', 'create', body=file_metadata, fields='id') fileid = res.get('id') self._ids[path] = fileid return fileid def delete(self, oid): info = self.info_oid(oid) if not info: log.debug("deleted non-existing oid %s", oid) return # file doesn't exist already... 
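# --- Editor's illustrative sketch (not part of the original source) ---
# listdir() above issues a single files.list call; the Drive API returns results in
# pages, so very large folders would need the nextPageToken loop sketched here
# (same _api wrapper assumed; the helper name is hypothetical):
def _listdir_all_pages(self, oid):
    page_token = None
    while True:
        res = self._api('files', 'list',
                        q=f"'{oid}' in parents",
                        spaces='drive',
                        fields='nextPageToken, files(id, md5Checksum, parents, name, '
                               'mimeType, trashed, shared, capabilities)',
                        pageToken=page_token)
        for ent in res.get('files', []):
            yield ent
        page_token = res.get('nextPageToken')
        if not page_token:
            break
# --- end sketch ---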
if info.otype == DIRECTORY: try: next(self.listdir(oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files', 'delete', fileId=oid) except CloudFileNotFoundError: log.debug("deleted non-existing oid %s", oid) for currpath, curroid in list(self._ids.items()): if curroid == oid: self._trashed_ids[currpath] = self._ids[currpath] del self._ids[currpath] def exists_oid(self, oid): return self._info_oid(oid) is not None def info_path(self, path: str) -> Optional[OInfo]: # pylint: disable=too-many-locals if path == "/": return self.info_oid(self.root_id) try: parent_id = self.get_parent_id(path) _, name = self.split(path) escaped_name = self.__escape(name) query = f"'{parent_id}' in parents and name='{escaped_name}'" res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, mimeType, trashed, name, shared, capabilities)', pageToken=None) except CloudFileNotFoundError: return None if not res['files']: return None ent = res['files'][0] if ent.get('trashed'): # TODO: # need to write a tests that moves files to the trash, as if a user moved the file to the trash # then assert it shows up "file not found" in all queries return None log.debug("res is %s", res) oid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent.get('name') # query is insensitive to certain features of the name # ....cache correct basename path = self.join(self.dirname(path), name) shared = ent['shared'] readonly = not ent['capabilities']['canEdit'] if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE self._ids[path] = oid return GDriveInfo(otype, oid, fhash, path, shared=shared, readonly=readonly, pids=pids) def exists_path(self, path) -> bool: if path in self._ids: return True return self.info_path(path) is not None def get_parent_id(self, path): if not path: return None parent, _ = self.split(path) if parent == path: return self._ids.get(parent) # get the latest version of the parent path # it may have changed, or case may be different, etc. info = self.info_path(parent) if not info: raise CloudFileNotFoundError("parent %s must exist" % parent) # cache the latest version return self._ids[info.path] def _path_oid(self, oid, info=None, use_cache=True) -> Optional[str]: """convert oid to path""" if use_cache: for p, pid in self._ids.items(): if pid == oid: return p for p, pid in self._trashed_ids.items(): if pid == oid: return p # todo, better cache, keep up to date, etc. 
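# --- Editor's note ---
# The info_path() query above interpolates the basename into a Drive "q" string, which
# is why __escape() doubles backslashes and escapes single quotes first; for example a
# file named  it's ok  is queried as  name='it\'s ok'  rather than breaking the query.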
if not info: info = self._info_oid(oid) if info and info.pids and info.name: ppath = self._path_oid(info.pids[0]) if ppath: path = self.join(ppath, info.name) self._ids[path] = oid return path return None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: info = self._info_oid(oid) if info is None: return None # expensive path = self._path_oid(oid, info, use_cache=use_cache) ret = OInfo(info.otype, info.oid, info.hash, path) log.debug("info oid ret: %s", ret) return ret @staticmethod def hash_data(file_like) -> str: # get a hash from a filelike that's the same as the hash i natively use md5 = hashlib.md5() for c in iter(lambda: file_like.read(32768), b''): md5.update(c) return md5.hexdigest() def _info_oid(self, oid) -> Optional[GDriveInfo]: try: res = self._api('files', 'get', fileId=oid, fields='name, md5Checksum, parents, mimeType, trashed, shared, capabilities', ) except CloudFileNotFoundError: return None log.debug("info oid %s", res) if res.get('trashed'): return None pids = res.get('parents') fhash = res.get('md5Checksum') name = res.get('name') shared = res['shared'] readonly = not res['capabilities']['canEdit'] if res.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE return GDriveInfo(otype, oid, fhash, None, shared=shared, readonly=readonly, pids=pids, name=name) PKL9O/cloudsync/sync/__init__.py__all__ = ['SyncManager', 'SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY', 'UNKNOWN'] from .manager import * from .state import * PKL9Ot6!cloudsync/sync/manager.py#pylint: disable=too-many-lines import os import logging import tempfile import shutil import time from typing import Tuple, Optional, Callable, TYPE_CHECKING, List, Dict, Any __all__ = ['SyncManager'] from pystrict import strict from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError, CloudTemporaryError, CloudDisconnectedError, CloudOutOfSpaceError, CloudException from cloudsync.types import DIRECTORY, FILE, IgnoreReason from cloudsync.runnable import Runnable from cloudsync.log import TRACE from cloudsync.utils import debug_sig from .state import SyncState, SyncEntry, SideState, TRASHED, EXISTS, LOCAL, REMOTE, UNKNOWN if TYPE_CHECKING: from cloudsync.provider import Provider log = logging.getLogger(__name__) # useful for converting oids and pointer nubmers into digestible nonces FINISHED = 1 REQUEUE = 0 def other_side(index): return 1-index @strict class ResolveFile(): def __init__(self, info: SideState, provider: 'Provider'): self.info = info self.provider = provider self.path = info.path self.side = info.side self.otype = info.otype self.__temp_file = info.temp_file if self.otype == FILE: assert info.temp_file self.__fh = None def download(self): if not os.path.exists(self.__temp_file): try: with open(self.__temp_file + ".tmp", "wb") as f: self.provider.download(self.info.oid, f) os.rename(self.__temp_file + ".tmp", self.__temp_file) except Exception as e: log.debug("error downloading %s", e) try: os.unlink(self.__temp_file) except FileNotFoundError: pass raise return self.__temp_file @property def fh(self): if not self.__fh: self.download() # NOOP if it was already downloaded self.__fh = open(self.__temp_file, "rb") log.debug("ResolveFile opening temp file %s for real file %s", self.__temp_file, self.path) return self.__fh def read(self, *a): return self.fh.read(*a) def write(self, buf): # we don't want this raise NotImplementedError() def close(self): # don't use self.fh, use self.__fh. 
No need to download and open it, just to close it if self.__fh: log.debug("ResolveFile closing temp file %s for real file %s", self.__temp_file, self.path) self.__fh.close() def seek(self, *a): return self.fh.seek(*a) def tell(self): return self.fh.tell() @strict # pylint: disable=too-many-public-methods, too-many-instance-attributes class SyncManager(Runnable): def __init__(self, state: SyncState, providers: Tuple['Provider', 'Provider'], translate: Callable, resolve_conflict: Callable, sleep: Optional[Tuple[float, float]] = None): self.state: SyncState = state self.providers: Tuple['Provider', 'Provider'] = providers self.translate = translate self._resolve_conflict = resolve_conflict self.tempdir = tempfile.mkdtemp(suffix=".cloudsync") if not sleep: # these are the event sleeps, but really we need more info than this sleep = (self.providers[LOCAL].default_sleep, self.providers[REMOTE].default_sleep) self.sleep = sleep # TODO: we need sync_aging, backoff_min, backoff_max, backoff_mult documented with an interface and tests! #### self.min_backoff = 0.0 self.max_backoff = 0.0 self.backoff = 0.0 max_sleep = max(sleep) # on sync fail, use the worst time for backoff self.aging = max_sleep / 5 # how long before even trying to sync self.min_backoff = max_sleep / 10.0 # event sleep of 15 seconds == 1.5 second backoff on failures self.max_backoff = max_sleep * 4.0 # escalating up to a 1 minute wait time self.backoff = self.min_backoff self.mult_backoff = 2 assert len(self.providers) == 2 def set_resolver(self, resolver): self._resolve_conflict = resolver def in_backoff(self): return self.backoff > self.min_backoff def do(self): with self.state.lock: sync: SyncEntry = self.state.change(self.aging) if sync: try: self.sync(sync) self.state.storage_update(sync) self.backoff = self.min_backoff except (CloudTemporaryError, CloudDisconnectedError, CloudOutOfSpaceError) as e: log.warning( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted) time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) except Exception as e: log.exception( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted, stack_info=True) sync.punt() time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) def done(self): log.info("cleanup %s", self.tempdir) try: shutil.rmtree(self.tempdir) except FileNotFoundError: pass def change_count(self, side: Optional[int] = None, unverified: bool = False): count = 0 sides: Tuple[int, ...] 
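# --- Editor's note (worked example from the defaults set above) ---
# With two providers whose default_sleep is 15s, max_sleep = 15, giving:
#   aging       = 15 / 5   = 3.0s  (how long before even trying to sync an entry)
#   min_backoff = 15 / 10  = 1.5s  (sleep after the first temporary failure)
#   max_backoff = 15 * 4   = 60s   (cap; reached after six doublings at mult_backoff=2)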
if side is None: sides = (LOCAL, REMOTE) else: sides = (side, ) if unverified: for i in sides: count += self.state.changeset_len else: for e in self.state.changes: for i in sides: if e[i].path and e[i].changed: translated_path = self.translate(other_side(i), e[i].path) if translated_path: count += 1 break return count def path_conflict(self, ent): # both are synced have_paths = ent[0].path and ent[1].path if not have_paths: return False are_synced = (ent[0].sync_hash and ent[1].sync_hash) or (ent[0].otype == DIRECTORY and ent[1].otype == DIRECTORY) if not are_synced: return False both_exist = ent[0].exists == EXISTS and ent[0].exists == EXISTS if not both_exist: return False return not self.providers[0].paths_match(ent[0].path, ent[0].sync_path) and \ not self.providers[1].paths_match(ent[1].path, ent[1].sync_path) def check_revivify(self, sync: SyncEntry): if sync.is_trashed: for i in (LOCAL, REMOTE): changed = i synced = other_side(i) se = sync[changed] if not se.changed or se.sync_path or not se.oid or se.exists == TRASHED or sync.is_conflicted: continue looked_up_sync = self.state.lookup_oid(changed, sync[changed].oid) if looked_up_sync and looked_up_sync != sync: continue provider_info = self.providers[changed].info_oid(sync[changed].oid) if not provider_info: continue provider_path = provider_info.path if not provider_path: continue translated_path = self.translate(synced, provider_path) if sync.is_irrelevant and translated_path and not sync[changed].sync_path: # was irrelevant, but now is relevant log.debug(">>>about to embrace %s", sync) log.debug(">>>Suddenly a cloud path %s, creating", provider_path) sync.ignored = IgnoreReason.NONE sync[changed].sync_path = None def sync(self, sync: SyncEntry): self.check_revivify(sync) if sync.is_trashed: self.finished(LOCAL, sync) self.finished(REMOTE, sync) return sync.get_latest() if sync.hash_conflict(): log.debug("handle hash conflict") self.handle_hash_conflict(sync) return if self.path_conflict(sync) and not sync.is_temp_rename: log.debug("handle path conflict") self.handle_path_conflict(sync) return log.log(TRACE, "table\r\n%s", self.state.pretty_print()) for i in (LOCAL, REMOTE): if sync[i].changed: if sync[i].hash is None and sync[i].otype == FILE and sync[i].exists == EXISTS: log.debug("ignore:%s, side:%s", sync, i) # no hash for file, ignore it self.finished(i, sync) break if sync[i].oid is None and sync[i].exists != TRASHED: log.debug("ignore:%s, side:%s", sync, i) self.finished(i, sync) continue # if the other side changed hash, handle it first if sync[i].hash == sync[i].sync_hash: other = other_side(i) if sync[other].changed and sync[other].hash != sync[other].sync_hash: continue response = self.embrace_change(sync, i, other_side(i)) if response == FINISHED: self.finished(i, sync) break def temp_file(self, temp_for=None): if not os.path.exists(self.tempdir): # in case user deletes it... recreate os.mkdir(self.tempdir) # prefer big random name over NamedTemp which can infinite loop ret = os.path.join(self.tempdir, os.urandom(16).hex()) log.debug("tempdir %s -> %s", self.tempdir, ret) if temp_for: log.debug("%s is temped by %s", temp_for, ret) return ret def finished(self, side, sync): sync[side].changed = 0 # todo: changing the state above should signal this call below self.state.finished(sync) # todo: likewise... 
clearing the changebit is enough to know to clear temps self.clean_temps(sync) @staticmethod def clean_temps(sync): # todo: move this to the sync obj for side in (LOCAL, REMOTE): if sync[side].temp_file: try: os.unlink(sync[side].temp_file) except FileNotFoundError: pass except OSError as e: log.debug("exception unlinking %s", e) except Exception as e: # any exceptions here are pointless log.warning("exception unlinking %s", e) sync[side].temp_file = None def download_changed(self, changed, sync): sync[changed].temp_file = sync[changed].temp_file or self.temp_file(temp_for=sync[changed].path) assert sync[changed].oid if os.path.exists(sync[changed].temp_file): return True try: partial_temp = sync[changed].temp_file + ".tmp" log.debug("%s download %s to %s", self.providers[changed], sync[changed].oid, partial_temp) with open(partial_temp, "wb") as f: self.providers[changed].download(sync[changed].oid, f) os.rename(partial_temp, sync[changed].temp_file) return True except PermissionError as e: raise CloudTemporaryError("download or rename exception %s" % e) except CloudFileNotFoundError: log.debug("download from %s failed fnf, switch to not exists", self.providers[changed].name) sync[changed].exists = TRASHED return False def get_folder_file_conflict(self, sync: SyncEntry, translated_path: str, synced: int) -> SyncEntry: # if a non-dir file exists with the same name on the sync side syents: List[SyncEntry] = list(self.state.lookup_path(synced, translated_path)) conflicts = [ent for ent in syents if ent[synced].exists != TRASHED and ent != sync and ent[synced].otype != DIRECTORY] nc: List[SyncEntry] = [] for ent in conflicts: info = self.providers[synced].info_oid(ent[synced].oid) if not info: ent[synced].exists = TRASHED else: nc.append(ent) return nc[0] if nc else None def mkdir_synced(self, changed, sync, translated_path): synced = other_side(changed) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] if ents: for ent in ents: if ent[changed].otype == DIRECTORY: # these we can toss, they are other folders # keep the current one, since it exists for sure log.debug("discard %s", ent) ent.ignore(IgnoreReason.TRASHED) ents = [ent for ent in ents if TRASHED not in ( ent[changed].exists, ent[synced].exists)] if ents: if not sync.punted: log.debug("punt mkdir") sync.punt() return REQUEUE log.debug("rename to fix conflict %s", translated_path) self.rename_to_fix_conflict(sync, synced, translated_path) try: log.debug("translated %s as path %s", sync[changed].path, translated_path) # could have made a dir that already existed on my side or other side chents = list(self.state.lookup_path(changed, sync[changed].path)) notme_chents = [ent for ent in chents if ent != sync] for ent in notme_chents: # dup dirs on remote side can be ignored if ent[synced].otype == DIRECTORY: log.debug("discard duplicate dir entry %s", ent) ent.ignore(IgnoreReason.TRASHED) chent: SyncEntry = self.get_folder_file_conflict(sync, translated_path, synced) if chent: log.debug("resolve %s conflict with %s", translated_path, chent) # pylint bugs here... no idea why self.resolve_conflict((sync[changed], chent[synced])) # pylint: disable=unsubscriptable-object return FINISHED # make the dir oid = self.providers[synced].mkdirs(translated_path) log.debug("mkdir %s as path %s oid %s", self.providers[synced].name, translated_path, debug_sig(oid)) # did i already have that oid? 
if so, chuck it already_dir = self.state.lookup_oid(synced, oid) if already_dir and already_dir != sync and already_dir[synced].otype == DIRECTORY: log.debug("discard %s", already_dir) already_dir.ignore(IgnoreReason.TRASHED) sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=oid, path=translated_path) return FINISHED except CloudFileNotFoundError: if not sync.punted: sync.punt() return REQUEUE log.debug("mkdir %s : %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].name, translated_path) raise NotImplementedError("TODO mkdir, and make state etc") def upload_synced(self, changed, sync): assert sync[changed].temp_file synced = other_side(changed) try: info = self.providers[synced].upload( sync[synced].oid, open(sync[changed].temp_file, "rb")) log.debug("upload to %s as path %s", self.providers[synced].name, sync[synced].sync_path) sync[synced].hash = info.hash sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = sync[synced].path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path) return True except FileNotFoundError: log.info("FNF during upload %s:%s", sync[synced].sync_path, sync[changed].temp_file) return False except CloudFileNotFoundError: info = self.providers[synced].info_oid(sync[synced].oid) if not info: log.debug("convert to unsynced") sync[synced].exists = TRASHED else: # you got an FNF during upload, but when you queried... it existed # basically this is just a "retry" or something log.warning("Upload to %s failed fnf, info: %s", self.providers[synced].name, info) return False except CloudFileExistsError: # this happens if the remote oid is a folder log.debug("split bc upload to folder") defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) return self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def _create_synced(self, changed, sync, translated_path): synced = other_side(changed) log.debug("create on %s as path %s", self.providers[synced].name, translated_path) try: with open(sync[changed].temp_file, "rb") as f: info = self.providers[synced].create(translated_path, f) log.debug("created %s", info) except CloudFileExistsError: log.debug("exists error %s", translated_path) info = self.providers[synced].info_path(translated_path) if not info: raise with open(sync[changed].temp_file, "rb") as f: existing_hash = self.providers[synced].hash_data(f) if existing_hash != info.hash: raise log.debug("use existing %s", info) except Exception as e: log.debug("failed to create %s, %s", translated_path, e) raise assert info.hash assert sync[changed].hash sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = translated_path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path, hash=info.hash) def update_entry(self, ent, side, oid, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin # updates entry without marking as changed unless explicit # used internally self.state.update_entry(ent, side, oid, path=path, hash=hash, exists=exists, changed=changed, otype=otype) def change_state(self, side, otype, oid, *, path=None, 
hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin # looks up oid and changes state, marking changed as if it's an event # used only for testing self.state.update(side, otype, oid, path=path, hash=hash, exists=exists, prior_oid=prior_oid) def create_synced(self, changed, sync, translated_path): synced = other_side(changed) try: self._create_synced(changed, sync, translated_path) return FINISHED except CloudFileNotFoundError: # parent presumably exists parent = self.providers[changed].dirname(sync[changed].path) log.debug("make %s first before %s", parent, sync[changed].path) ents = self.state.lookup_path(changed, parent) if not ents: info = self.providers[changed].info_path(parent) if info: self.state.update(changed, DIRECTORY, info.oid, path=parent) else: log.info("no info and no dir, ignoring?") else: if not ents[0][changed].changed: # Clear the sync_path so we will recognize that this dir needs to be created ents[0][changed].sync_path = None self.update_entry(ents[0], changed, ents[0][changed].oid, changed=True) log.debug("updated entry %s", parent) sync.punt() return REQUEUE except CloudFileExistsError: # there's a file or folder in the way, let that resolve if possible log.debug("can't create %s, try punting", translated_path) if sync.punted > 0: info = self.providers[synced].info_path(translated_path) if not info: log.debug("got a file exists, and then it didn't exist %s", sync) sync.punt() return REQUEUE sync[synced].oid = info.oid sync[synced].hash = info.hash sync[synced].path = translated_path self.update_entry(sync, synced, info.oid, path=translated_path) # maybe it's a hash conflict sync.punt() return REQUEUE else: # maybe it's a name conflict sync.punt() return REQUEUE def __resolve_file_likes(self, side_states): class Guard: def __init__(guard, side_states): # pylint: disable=no-self-argument guard.side_states = side_states guard.fhs: List[ResolveFile] = [] def __enter__(guard): # pylint: disable=no-self-argument for ss in guard.side_states: assert type(ss) is SideState if not ss.temp_file and ss.otype == FILE: ss.temp_file = self.temp_file(temp_for=ss.path) guard.fhs.append(ResolveFile(ss, self.providers[ss.side])) assert ss.oid return guard.fhs def __exit__(guard, *args): # pylint: disable=no-self-argument for fh in guard.fhs: fh.close() return Guard(side_states) def __safe_call_resolver(self, fhs): if DIRECTORY in (fhs[0].otype, fhs[1].otype): if fhs[0].otype != fhs[1].otype: if fhs[0].otype == DIRECTORY: fh = fhs[0] else: fh = fhs[1] # for simplicity: directory conflicted with file, always favors directory return (fh, True) is_file_like = lambda f: hasattr(f, "read") and hasattr(f, "close") ret = None try: ret = self._resolve_conflict(*fhs) if ret: if not isinstance(ret, tuple): log.error("resolve conflict should return a tuple of 2 values, got %s(%s)", ret, type(ret)) ret = None elif len(ret) != 2: log.error("bad return value for resolve conflict %s", ret) ret = None elif not is_file_like(ret[0]): log.error("bad return value for resolve conflict %s", ret) ret = None except Exception as e: log.exception("exception during conflict resolution %s", e) if ret is None: # we defer to the remote... 
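# --- Editor's illustrative sketch (not part of the original source) ---
# The resolver passed to SyncManager (via set_resolver / _resolve_conflict) receives two
# ResolveFile-like objects and, per the checks above, must return a 2-tuple of
# (winning file-like, keep_both: bool); returning None or raising falls back to the
# default behavior. A toy resolver that keeps whichever side has more bytes:
def keep_larger(f1, f2):
    sizes = []
    for f in (f1, f2):
        f.seek(0, 2)           # seek to end to measure length
        sizes.append(f.tell())
        f.seek(0)
    winner = f1 if sizes[0] >= sizes[1] else f2
    return (winner, False)     # False: replace the loser's contents rather than keeping both
# --- end sketch ---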
since this can prevent loops if fhs[0].side == REMOTE: ret = (fhs[0], True) else: ret = (fhs[1], True) return ret def __resolver_merge_upload(self, side_states, fh, keep): # we modified to both sides, need to merge the entries ent1, ent2 = side_states defer_ent = self.state.lookup_oid(ent1.side, ent1.oid) replace_ent = self.state.lookup_oid(ent2.side, ent2.oid) if keep: # both sides are being kept, so we have to upload since there are no entries fh.seek(0) info1 = self.providers[ent1.side].create(ent1.path, fh) fh.seek(0) info2 = self.providers[ent2.side].create(ent2.path, fh) ent1.oid = info1.oid ent2.oid = info2.oid self.update_entry(defer_ent, ent1.side, ent1.oid, path=ent1.path, hash=ent1.hash) ent1 = defer_ent[ent1.side] ent2 = defer_ent[ent2.side] assert info2.hash assert info1.hash ent2.sync_hash = info2.hash ent2.sync_path = info2.path ent1.sync_hash = info1.hash ent1.sync_path = info1.path else: info1 = self.providers[ent1.side].info_oid(ent1.oid) info2 = self.providers[ent2.side].info_oid(ent2.oid) ent2.sync_path = ent2.path ent1.sync_path = self.translate(ent1.side, ent2.path) # in case oids have changed self.update_entry(defer_ent, ent2.side, ent2.oid, path=ent2.path, hash=ent2.hash) defer_ent[ent2.side].sync_hash = ent2.sync_hash defer_ent[ent2.side].sync_path = ent2.sync_path replace_ent.ignore(IgnoreReason.TRASHED) def resolve_conflict(self, side_states): # pylint: disable=too-many-statements, too-many-branches with self.__resolve_file_likes(side_states) as fhs: fh: ResolveFile keep: bool fh, keep = self.__safe_call_resolver(fhs) log.debug("keeping ret side %s", getattr(fh, "side", None)) log.debug("fhs[0].side=%s", fhs[0].side) defer = None # search the fhs for any fh that is going away # could be one, the other, or both (if the fh returned by the conflict resolver is a new one, not in fhs) for i, rfh in enumerate(fhs): if fh is not rfh: # this rfh is getting replaced (this will be true for at least one rfh) loser = side_states[i] winner = side_states[1 - i] log.debug("i=%s, fhs=%s", i, fhs) log.debug("loser.side=%s, winner.side=%s", loser.side, winner.side) # user didn't opt to keep my rfh log.debug("replacing side %s", loser.side) if not keep: log.debug("not keeping side %s, simply uploading to replace with new contents", loser.side) fh.seek(0) info2 = self.providers[loser.side].upload(loser.oid, fh) loser.hash = info2.hash loser.path = info2.path assert info2.hash loser.sync_hash = loser.hash if not loser.sync_path: loser.sync_path = loser.path else: log.debug("rename side %s to conflicted", loser.side) try: self._resolve_rename(loser) except CloudFileNotFoundError: log.debug("there is no conflict, because the file doesn't exist? 
%s", loser) if defer is None: # the first time we see an rfh to replace, defer gets set to the winner side defer = winner.side else: # if we replace both rfh, then defer gets set back to None defer = None if defer is not None: # we are replacing one side, not both sorted_states = sorted(side_states, key=lambda e: e.side) replace_side = other_side(defer) replace_ent = self.state.lookup_oid(replace_side, sorted_states[replace_side].oid) if keep: # toss the other side that was replaced if replace_ent: replace_ent.ignore(IgnoreReason.CONFLICT) else: log.debug("defer not none, and not keeping, so merge sides") defer_ent = self.state.lookup_oid(defer, sorted_states[defer].oid) replace_ent[defer] = defer_ent[defer] defer_ent.ignore(IgnoreReason.TRASHED) replace_ent[replace_side].sync_path = replace_ent[replace_side].path replace_ent[replace_side].sync_hash = replace_ent[replace_side].hash replace_ent[defer].sync_path = self.translate(defer, replace_ent[replace_side].path) replace_ent[defer].sync_hash = replace_ent[defer].hash else: # both sides were modified, because the fh returned was some third thing that should replace both log.debug("resolver merge upload to both sides: %s", keep) self.__resolver_merge_upload(side_states, fh, keep) log.debug("RESOLVED CONFLICT: %s side: %s", side_states, defer) log.debug("table\r\n%s", self.state.pretty_print()) def delete_synced(self, sync, changed, synced): log.debug("try sync deleted %s", sync[changed].path) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] for ent in ents: if ent.is_creation(synced): log.debug("discard delete, pending create %s:%s", synced, ent) sync.ignore(IgnoreReason.TRASHED) return FINISHED # deltions don't always have paths if sync[changed].path: translated_path = self.translate(synced, sync[changed].path) if translated_path: # find conflicting entries that will be renamed away ents = list(self.state.lookup_path(synced, translated_path)) ents = [ent for ent in ents if ent != sync] for ent in ents: if ent.is_rename(synced): log.debug("discard delete, pending rename %s:%s", synced, ent) sync.ignore(IgnoreReason.TRASHED) return FINISHED if sync[synced].oid: try: self.providers[synced].delete(sync[synced].oid) except CloudFileNotFoundError: pass except CloudFileExistsError: return self._handle_dir_delete_not_empty(sync, changed) else: log.debug("was never synced, ignoring deletion") sync[synced].exists = TRASHED sync.ignore(IgnoreReason.TRASHED, previous_reasons=IgnoreReason.IRRELEVANT) return FINISHED def _handle_dir_delete_not_empty(self, sync, changed): # punt once to allow children to be processed, if already done just forget about it if sync.punted > 0: all_synced = True for kid, _ in self.state.get_kids(sync[changed].path, changed): if kid.needs_sync(): all_synced = False break if all_synced: log.info("dropping dir removal because children fully synced %s", sync[changed].path) return FINISHED else: log.debug("all children not fully synced, punt %s", sync[changed].path) sync.punt() return REQUEUE # Mark children changed so we will check if already deleted log.debug("kids exist, mark changed and punt %s", sync[changed].path) for kid, _ in self.state.get_kids(sync[changed].path, changed): kid[changed].changed = time.time() sync.punt() return REQUEUE def _get_unstrashed_peers(self, sync, changed, synced, translated_path): # check for creation of a new file with another in the table if sync[changed].otype != 
FILE: return None ents = list(self.state.lookup_path(synced, translated_path)) # filter for exists other_ents = [ent for ent in ents if ent != sync] if not other_ents: return None log.debug("found matching %s, other ents: %s", translated_path, other_ents) # ignoring trashed entries with different oids on the same path if all(TRASHED in (ent[synced].exists, ent[changed].exists) for ent in other_ents): for ent in other_ents: if ent[synced].exists == TRASHED: # old trashed entries can be safely ignored ent.ignore(IgnoreReason.TRASHED) return None other_untrashed_ents = [ent for ent in other_ents if TRASHED not in ( ent[synced].exists, ent[changed].exists)] return other_untrashed_ents def check_disjoint_create(self, sync, changed, synced, translated_path): other_untrashed_ents = self._get_unstrashed_peers(sync, changed, synced, translated_path) if not other_untrashed_ents: return False if sync.punted == 0: # delaying sometimes helps, because future events can resolve conflicts # it's generally better to wait before conflicting something log.debug("punting, maybe it will fix itself") sync.punt() return True log.debug("split conflict found : %s:%s", len(other_untrashed_ents), other_untrashed_ents) found = None info = self.providers[synced].info_path(translated_path) if not info: return False if info: for e in other_untrashed_ents: if e[synced].oid == info.oid: if e[synced].sync_hash != e[synced].hash: found = e else: if not e[synced].changed: log.debug("merge split entries") sync[synced] = e[synced] else: found = e if not found: log.debug("disjoint conflict with something I don't understand") sync.punt() return True return self.handle_split_conflict( found, synced, sync, changed) def handle_path_change_or_creation(self, sync, changed, synced): # pylint: disable=too-many-branches, too-many-return-statements if not sync[changed].path: self.update_sync_path(sync, changed) log.debug("NEW SYNC %s", sync) if sync[changed].exists == TRASHED or sync.is_trashed: log.debug("requeue trashed event %s", sync) return REQUEUE translated_path = self.translate(synced, sync[changed].path) if translated_path is None: # ignore these return FINISHED if not sync[changed].path: log.debug("can't sync, no path %s", sync) if sync[changed].sync_path and sync[synced].exists == TRASHED: # see test: test_sync_folder_conflicts_del if not sync.punted: sync.punt() return REQUEUE sync[synced].clear() log.debug("cleared trashed info, converting to create %s", sync) if sync.is_creation(changed): # never synced this before, maybe there's another local path with # the same name already? 
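# --- Editor's note ---
# The handlers in this class follow a small contract visible above: they return
# FINISHED (1) when the entry is fully handled, or REQUEUE (0) to leave it marked
# changed for a later pass; sync.punt() bumps a per-entry counter so a handler can
# distinguish a first attempt from a retry and escalate (for example renaming to a
# .conflicted name) instead of punting forever.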
if self.check_disjoint_create(sync, changed, synced, translated_path): log.debug("disjoint, requeue") return REQUEUE if sync.is_creation(changed): assert not sync[changed].sync_hash # looks like a new file if sync[changed].otype == DIRECTORY: return self.mkdir_synced(changed, sync, translated_path) if not self.download_changed(changed, sync): return REQUEUE if sync[synced].oid and sync[synced].exists != TRASHED: if self.upload_synced(changed, sync): return FINISHED return REQUEUE return self.create_synced(changed, sync, translated_path) return self.handle_rename(sync, changed, synced, translated_path) def handle_rename(self, sync, changed, synced, translated_path): # handle rename # use == to allow rename for case reasons # todo: need a paths_match flag instead, so slashes don't break this line if sync[synced].sync_path == translated_path: return FINISHED assert sync[synced].sync_hash or sync[synced].otype == DIRECTORY log.debug("rename %s %s", sync[synced].sync_path, translated_path) try: new_oid = self.providers[synced].rename(sync[synced].oid, translated_path) except CloudFileNotFoundError as e: log.debug("ERROR: can't rename for now %s: %s", sync, e) if sync.punted > 5: log.exception("punted too many times, giving up") return FINISHED else: log.debug("fnf, punt") sync.punt() return REQUEUE except CloudFileExistsError: log.debug("can't rename, file exists") if sync.punted: ents = self.state.lookup_path(synced, translated_path) if ents: conflict = ents[0] if not conflict[changed].changed and not conflict[synced].changed: # file is up to date, we're replacing a known synced copy self.providers[synced].delete(conflict[synced].oid) log.debug("deleting %s out of the way", translated_path) sync.punt() return REQUEUE log.debug("rename to fix conflict %s because %s not synced", translated_path, conflict) self.rename_to_fix_conflict(sync, synced, translated_path, temp_rename=True) sync.punt() return REQUEUE sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, path=translated_path, oid=new_oid) return FINISHED def _resolve_rename(self, replace): _old_oid, new_oid, new_name = self.conflict_rename(replace.side, replace.path) if new_name is None: return False replace.oid = new_oid replace.changed = time.time() return True def rename_to_fix_conflict(self, sync, side, path, temp_rename=False): old_oid, new_oid, new_name = self.conflict_rename(side, path) if new_name is None: return False # file can get renamed back, if there's a cycle if old_oid == sync[side].oid: self.update_entry(sync, side=side, oid=new_oid) if temp_rename: sync.ignore(IgnoreReason.TEMP_RENAME) else: ent = self.state.lookup_oid(side, old_oid) if ent: self.update_entry(ent, side=side, oid=new_oid) if temp_rename: ent.ignore(IgnoreReason.TEMP_RENAME) return True def conflict_rename(self, side, path): folder, base = self.providers[side].split(path) if base == "": raise ValueError("bad path %s" % path) index = base.find(".") if index >= 0: ext = base[index:] base = base[:index] else: # base = base ext = "" oinfo = self.providers[side].info_path(path) if not oinfo: return None, None, None i = 1 new_oid = None conflict_name = base + ".conflicted" + ext while new_oid is None: try: conflict_path = self.providers[side].join(folder, conflict_name) new_oid = self.providers[side].rename(oinfo.oid, conflict_path) except CloudFileExistsError: log.debug("already exists %s", conflict_name) i = i + 1 conflict_name = base + ".conflicted" + str(i) + ext log.debug("conflict renamed: %s 
-> %s", path, conflict_path) return oinfo.oid, new_oid, conflict_path def embrace_change(self, sync, changed, synced): # pylint: disable=too-many-return-statements, too-many-branches if sync[changed].path: translated_path = self.translate(synced, sync[changed].path) if not translated_path: if sync[changed].sync_path: # This entry was relevent, but now it is irrelevant log.debug(">>>Removing remnants of file moved out of cloud root") sync[changed].exists = TRASHED # This will discard the ent later else: # we don't have a new or old translated path... just irrelevant so discard log.log(TRACE, ">>>Not a cloud path %s, ignoring", sync[changed].path) sync.ignore(IgnoreReason.IRRELEVANT) if sync.is_trashed: log.log(TRACE, "Ignoring entry because %s:%s", sync.ignored.value, sync) return FINISHED log.debug("embrace %s, side:%s", sync, changed) log.log(TRACE, "table\r\n%s", self.state.pretty_print()) if sync.is_conflicted: log.debug("Conflicted file %s is changing", sync[changed].path) if "conflicted" in sync[changed].path: return FINISHED else: sync.unignore(IgnoreReason.CONFLICT) if sync[changed].path and sync[changed].exists == EXISTS: # parent_conflict code # todo: make this walk up parents to the translate == None "root" conflict = self._get_parent_conflict(sync, changed) if conflict: log.debug("parent modify %s should happen first %s", sync[changed].path, conflict) conflict[changed].set_aged() sync.punt() return REQUEUE if sync[changed].exists == TRASHED: log.debug("delete") return self.delete_synced(sync, changed, synced) if sync.is_path_change(changed) or sync.is_creation(changed): ret = self.handle_path_change_or_creation(sync, changed, synced) if ret == REQUEUE: log.debug("requeue, not handled") return ret if sync[changed].hash != sync[changed].sync_hash: return self.handle_hash_diff(sync, changed, synced) log.debug("nothing changed %s", sync) return FINISHED def handle_hash_diff(self, sync, changed, synced): if sync[changed].path is None: return FINISHED if sync[changed].sync_hash is None: sync[changed].sync_path = None # creation must have failed log.warning("needs create: %s index: %s bc %s != %s", sync, synced, sync[changed].hash, sync[changed].sync_hash) return REQUEUE # not a new file, which means we must have last sync info if sync[synced].exists == TRASHED or sync[synced].oid is None: log.debug("dont upload to trashed, zero out trashed side") # not an upload sync[synced].exists = UNKNOWN sync[synced].hash = None sync[synced].changed = 0 sync[synced].path = None sync[synced].oid = None sync[synced].sync_path = None sync[synced].sync_hash = None sync[changed].sync_path = None sync[changed].sync_hash = None return REQUEUE log.debug("needs upload: %s index: %s bc %s != %s", sync, synced, sync[changed].hash, sync[changed].sync_hash) assert sync[synced].oid if not self.download_changed(changed, sync): return REQUEUE if not self.upload_synced(changed, sync): return REQUEUE return FINISHED def update_sync_path(self, sync, changed): assert sync[changed].oid info = self.providers[changed].info_oid(sync[changed].oid) if not info: sync[changed].exists = TRASHED return if not info.path: log.warning("impossible sync, no path. " "Probably a file that was shared, but not placed into a folder. discarding. 
%s", sync[changed]) sync.ignore(IgnoreReason.TRASHED) return log.debug("UPDATE PATH %s->%s", sync, info.path) self.update_entry( sync, changed, sync[changed].oid, path=info.path, exists=True) def handle_hash_conflict(self, sync): log.debug("splitting hash conflict %s", sync) try: save: Tuple[Dict[str, Any], Dict[str, Any]] = ({}, {}) for side in (LOCAL, REMOTE): for field in ("sync_hash", "sync_path", "oid", "hash", "path", "exists"): save[side][field] = getattr(sync[side], field) # split the sync in two defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) return self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) except CloudException as e: log.info("exception during hash conflict split: %s", e) for side in (LOCAL, REMOTE): for field in ("sync_hash", "sync_path", "oid", "hash", "path", "exists"): setattr(defer_ent[side], field, save[side][field]) replace_ent.ignore(IgnoreReason.TRASHED) raise def handle_split_conflict(self, defer_ent, defer_side, replace_ent, replace_side): if defer_ent[defer_side].otype == FILE: if not self.download_changed(defer_side, defer_ent): return False try: with open(defer_ent[defer_side].temp_file, "rb") as f: dhash = self.providers[replace_side].hash_data(f) if dhash == replace_ent[replace_side].hash: log.debug("same hash as remote, discard entry") replace_ent.ignore(IgnoreReason.TRASHED) return True except FileNotFoundError: return False log.debug(">>> about to resolve_conflict") self.resolve_conflict((defer_ent[defer_side], replace_ent[replace_side])) return True def handle_path_conflict(self, sync): # consistent handling log.debug("handle path conflict %s", sync) path1 = sync[0].path path2 = sync[1].path if path1 > path2: pick = 0 else: pick = 1 picked = sync[pick] other = sync[other_side(pick)] other_path = self.translate(other.side, picked.path) if other_path is None: return other_info = self.providers[other.side].info_oid(other.oid) log.debug("renaming to handle path conflict: %s -> %s", other.oid, other_path) def _update_syncs(new_oid): self.update_entry(sync, other.side, new_oid, path=other_path) if sync[other.side].sync_path: sync[other.side].sync_path = sync[other.side].path if sync[picked.side].sync_path: sync[picked.side].sync_path = sync[picked.side].path try: if other_info.path == other_path: # don't sync this entry log.info("supposed rename conflict, but the names are the same") if not sync[other.side].sync_hash and sync[other.side].otype == FILE: log.warning("sync_hashes missing even though the sync_path is set...") sync[other.side].sync_path = None if not sync[picked.side].sync_hash and sync[picked.side].otype == FILE: log.warning("sync_hashes missing even though the sync_path is set...") sync[picked.side].sync_path = None raise CloudFileExistsError() new_oid = self.providers[other.side].rename(other.oid, other_path) _update_syncs(new_oid) except CloudFileExistsError: # other side already agrees _update_syncs(other.oid) except CloudFileNotFoundError: # other side doesnt exist, or maybe parent doesn't exist log.info("punting path conflict %s", sync) sync.punt() return def _get_parent_conflict(self, sync: SyncEntry, changed) -> SyncEntry: provider = self.providers[changed] path = sync[changed].path parent = provider.dirname(path) ret = None while path != parent: ents = list(self.state.lookup_path(changed, parent)) for ent in ents: if ent[changed].changed and ent[changed].exists == EXISTS: ret = ent path = parent parent = provider.dirname(path) return ret PKL9O~* * 
cloudsync/sync/sqlite_storage.py
from typing import Dict, Any, Optional
import logging
import sqlite3

from cloudsync import Storage
from cloudsync.log import TRACE

log = logging.getLogger(__name__)


class SqliteStorage(Storage):
    def __init__(self, filename: str):
        self._filename = filename
        self._ensure_table_exists()

    def _ensure_table_exists(self):
        self.db = sqlite3.connect(self._filename,
                                  uri=self._filename.startswith('file:'),
                                  check_same_thread=self._filename == ":memory:",
                                  timeout=5,
                                  isolation_level=None,
                                  )
        self.db.execute("PRAGMA journal_mode=WAL;")
        self.db.execute("PRAGMA busy_timeout=5000;")
        # Not using AUTOINCREMENT: http://www.sqlitetutorial.net/sqlite-autoincrement/
        self.db.execute('CREATE TABLE IF NOT EXISTS cloud (id INTEGER PRIMARY KEY, '
                        'tag TEXT NOT NULL, serialization BLOB)')

    def create(self, tag: str, serialization: bytes) -> Any:
        db_cursor = self.db.execute('INSERT INTO cloud (tag, serialization) VALUES (?, ?)',
                                    [tag, serialization])
        eid = db_cursor.lastrowid
        return eid

    def update(self, tag: str, serialization: bytes, eid: Any) -> int:
        log.log(TRACE, "updating eid%s", eid)
        db_cursor = self.db.execute('UPDATE cloud SET serialization = ? WHERE id = ? AND tag = ?',
                                    [serialization, eid, tag])
        ret = db_cursor.rowcount
        if ret == 0:
            raise ValueError("id %s doesn't exist" % eid)
        return ret

    def delete(self, tag: str, eid: Any):
        log.log(TRACE, "deleting eid%s", eid)
        db_cursor = self.db.execute('DELETE FROM cloud WHERE id = ? AND tag = ?', [eid, tag])
        if db_cursor.rowcount == 0:
            log.debug("ignoring delete: id %s doesn't exist", eid)
            return

    def read_all(self, tag: str) -> Dict[Any, bytes]:
        ret = {}
        db_cursor = self.db.execute('SELECT id, serialization FROM cloud WHERE tag = ?', [tag])
        for row in db_cursor.fetchall():
            eid, serialization = row
            ret[eid] = serialization
        return ret

    def read(self, tag: str, eid: Any) -> Optional[bytes]:
        db_cursor = self.db.execute('SELECT serialization FROM cloud WHERE id = ? and tag = ?', [eid, tag])
        for row in db_cursor.fetchall():
            return row
        return None

cloudsync/sync/state.py
# pylint: disable=attribute-defined-outside-init, protected-access
"""
SyncEntry[SideState, SideState] is a pair of entries, indexed by oid.

The SideState class makes extensive use of __getattr__ logic to keep indexes up to date.

There may be no need to keep them in "pairs". This is artificial. States should probably be
altered to independent, and not paired at all.
""" import copy import json import logging import time from threading import RLock from abc import ABC, abstractmethod from enum import Enum from typing import Optional, Tuple, Any, List, Dict, Set, cast, TYPE_CHECKING, Callable, Generator from typing import Union, Sequence from pystrict import strict from cloudsync.types import DIRECTORY, FILE, NOTKNOWN, IgnoreReason from cloudsync.types import OType from cloudsync.scramble import scramble from cloudsync.log import TRACE from cloudsync.utils import debug_sig if TYPE_CHECKING: from cloudsync import Provider log = logging.getLogger(__name__) __all__ = ['SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY', 'UNKNOWN'] # safe ternary, don't allow traditional comparisons class Exists(Enum): UNKNOWN = None EXISTS = True TRASHED = False def __bool__(self): raise ValueError("never bool enums") UNKNOWN = Exists.UNKNOWN EXISTS = Exists.EXISTS TRASHED = Exists.TRASHED # state of a single object @strict # pylint: disable=too-many-instance-attributes class SideState(): def __init__(self, parent: 'SyncEntry', side: int, otype: Optional[OType]): self._parent = parent self._side: int = side # just for assertions self._otype: Optional[OType] = otype self._hash: Optional[bytes] = None # hash at provider # time of last change (we maintain this) self._changed: Optional[float] = None self._last_gotten: float = 0.0 # set to == changed when getting self._sync_hash: Optional[bytes] = None # hash at last sync self._sync_path: Optional[str] = None # path at last sync self._path: Optional[str] = None # path at provider self._oid: Optional[str] = None # oid at provider self._exists: Exists = UNKNOWN # exists at provider self._temp_file: Optional[str] = None self.hash: Optional[bytes] self.sync_hash: Optional[bytes] def __getattr__(self, k): if k[0] != "_": return getattr(self, "_" + k) raise AttributeError("%s not in SideState" % k) def __setattr__(self, k, v): if k[0] == "_": object.__setattr__(self, k, v) return self._parent.updated(self._side, k, v) if k == "exists": self._set_exists(v) else: object.__setattr__(self, "_" + k, v) def _set_exists(self, val: Union[bool, Exists]): if val is False: xval = TRASHED elif val is True: xval = EXISTS elif val is None: xval = UNKNOWN else: xval = cast(Exists, val) if type(xval) != Exists: raise ValueError("use enum for exists") self._exists = xval self._parent.updated(self._side, "exists", xval) def set_aged(self): # setting to an old mtime marks this as fully aged self.changed = 1 def clear(self): self.parent[1-self.side].sync_path = None self.parent[1-self.side].sync_hash = None self.exists = UNKNOWN self.changed = None self.hash = None self.sync_hash = None self.sync_path = None self.path = None self.oid = None def __repr__(self): d = self.__dict__.copy() d.pop("_parent", None) return self.__class__.__name__ + ":" + debug_sig(id(self)) + str(d) # these are not really local or remote # but it's easier to reason about using these labels LOCAL = 0 REMOTE = 1 def other_side(index): return 1-index class Storage(ABC): @abstractmethod def create(self, tag: str, serialization: bytes) -> Any: """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def update(self, tag: str, serialization: bytes, eid: Any) -> int: """ take a serialization str, update it in sqlite, return the count of rows updated """ ... 
@abstractmethod def delete(self, tag: str, eid: Any): """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def read_all(self, tag: str) -> Dict[Any, bytes]: """yield all the serialized strings in a generator""" ... @abstractmethod def read(self, tag: str, eid: Any) -> Optional[bytes]: """return one serialized string or None""" ... # single entry in the syncs state collection @strict # pylint: disable=too-many-instance-attributes class SyncEntry: def __init__(self, parent: 'SyncState', otype: Optional[OType], storage_init: Optional[Tuple[Any, bytes]] = None, ignore_reason: IgnoreReason = IgnoreReason.NONE): super().__init__() assert otype is not None or storage_init self.__states: List[SideState] = [SideState(self, 0, otype), SideState(self, 1, otype)] self._ignored = ignore_reason self._storage_id: Any = None self._dirty: bool = True self._punted: int = 0 self._parent = parent if storage_init is not None: self._storage_id = storage_init[0] self.deserialize(storage_init) self._dirty = False log.debug("new syncent %s", debug_sig(id(self))) def __getattr__(self, k): if k[0] != "_": return getattr(self, "_" + k) raise AttributeError("%s not in SyncEntry" % k) def __setattr__(self, k, v): if k[0] == "_": object.__setattr__(self, k, v) return if getattr(self, "_" + k) != v: self.updated(None, k, v) object.__setattr__(self, "_" + k, v) def updated(self, side, key, val): self._dirty = True self._parent.updated(self, side, key, val) def serialize(self) -> bytes: """converts SyncEntry into a json str""" def side_state_to_dict(side_state: SideState) -> dict: ret = dict() ret['otype'] = side_state.otype.value ret['side'] = side_state.side ret['hash'] = side_state.hash.hex() if isinstance(side_state.hash, bytes) else None ret['changed'] = side_state.changed ret['sync_hash'] = side_state.sync_hash.hex() if isinstance(side_state.sync_hash, bytes) else None ret['path'] = side_state.path ret['sync_path'] = side_state.sync_path ret['oid'] = side_state.oid ret['exists'] = side_state.exists.value ret['temp_file'] = side_state.temp_file # storage_id does not get serialized, it always comes WITH a serialization when deserializing return ret ser = dict() ser['side0'] = side_state_to_dict(self.__states[0]) ser['side1'] = side_state_to_dict(self.__states[1]) ser['ignored'] = self._ignored.value return json.dumps(ser).encode('utf-8') def deserialize(self, storage_init: Tuple[Any, bytes]): """loads the values in the serialization dict into self""" def dict_to_side_state(side, side_dict: dict) -> SideState: otype = OType(side_dict['otype']) side_state = SideState(self, side, otype) side_state.side = side_dict['side'] side_state.hash = bytes.fromhex(side_dict['hash']) if side_dict['hash'] else None side_state.changed = side_dict['changed'] side_state.sync_hash = bytes.fromhex(side_dict['sync_hash']) if side_dict['sync_hash'] else None side_state.sync_path = side_dict['sync_path'] side_state.path = side_dict['path'] side_state.oid = side_dict['oid'] side_state.exists = side_dict['exists'] side_state.temp_file = side_dict['temp_file'] return side_state self.storage_id = storage_init[0] ser: dict = json.loads(storage_init[1].decode('utf-8')) self.__states = [dict_to_side_state(0, ser['side0']), dict_to_side_state(1, ser['side1'])] reason_string = ser.get('ignored', "") log.debug("here") if reason_string: try: reason = IgnoreReason(reason_string) self._ignored = reason except ValueError: # reason was specified, but had an unrecognized value? 
log.warning("deserializing state, but ignored had bad value %s", reason_string) reason = IgnoreReason.TRASHED # is this the best thing to use here? elif ser.get('discarded', ""): self._ignored = IgnoreReason.TRASHED elif ser.get('conflicted', ""): self._ignored = IgnoreReason.CONFLICT def __getitem__(self, i): return self.__states[i] def __setitem__(self, side, val): # can't really move items.. just copy stuff new_path = val._path new_oid = val._oid # old value is no longer in charge of anything val.path = None val.oid = None # new value rulez val = copy.copy(val) val._path = new_path val._oid = new_oid val._parent = self assert type(val) is SideState assert val.side == side # we need to ensure a valid oid is present when changing the path if val._oid is None: self.updated(side, "path", val._path) self.updated(side, "oid", val._oid) else: self.updated(side, "oid", val._oid) self.updated(side, "path", val._path) self.updated(side, "changed", val._changed) self.__states[side] = copy.copy(val) def hash_conflict(self): if self[0].hash and self[1].hash: return self[0].hash != self[0].sync_hash and self[1].hash != self[1].sync_hash return False def is_path_change(self, changed): return self[changed].path != self[changed].sync_path def is_creation(self, changed): return not self[changed].sync_path and self[changed].path def is_rename(self, changed): return (self[changed].sync_path and self[changed].path and self[changed].sync_path != self[changed].path) def needs_sync(self): for i in (LOCAL, REMOTE): if not self[i].changed: continue if self[i].path != self[i].sync_path: return True if self[i].hash != self[i].sync_hash: return True if self[LOCAL].exists != self[REMOTE].exists: return True return False # @property # def ignored(self): # return self._ignored != IgnoreReason.NONE # # @ignored.setter # def ignored(self, val: Optional[IgnoreReason]): # if val is None: # val = IgnoreReason.NONE # self._ignored = val @property def is_trashed(self): return self.ignored in (IgnoreReason.TRASHED, IgnoreReason.IRRELEVANT) @property def is_irrelevant(self): return self.ignored == IgnoreReason.IRRELEVANT @property def is_conflicted(self): return self.ignored == IgnoreReason.CONFLICT @property def is_temp_rename(self): return self.ignored == IgnoreReason.TEMP_RENAME # @property # def is_temp_conflicted(self): # return self.ignored == IgnoreReason.TEMP_CONFLICT # # def discard(self): # raise NotImplementedError( ) # self.discarded = ''.join(traceback.format_stack()) def ignore(self, reason: IgnoreReason, previous_reasons: Union[Sequence[IgnoreReason], IgnoreReason] = (IgnoreReason.NONE,)): if isinstance(previous_reasons, IgnoreReason): previous_reasons = (previous_reasons,) # always ok to set the target reason if the current reason is none or is already the target reason if self._ignored not in (IgnoreReason.NONE, reason, *previous_reasons): log.warning("Ignoring entry for reason '%s' that should have been '%s' already, but was actually '%s':%s", reason.value, [x.value for x in previous_reasons], self._ignored, self) if reason == IgnoreReason.NONE: log.warning("don't call ignore(IgnoreReason.NONE), call unignore() with the reason to stop ignoring") self.ignored = reason def unignore(self, reason: IgnoreReason): assert self.ignored in (reason, IgnoreReason.NONE) self.ignored = IgnoreReason.NONE @staticmethod def prettyheaders(): ret = "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s %s" % ( "EID", # _sig(id(self)), "SID", # _sig(self.storage_id), "Typ", # otype, "Change", # secs(self[LOCAL].changed), 
"Path", # self[LOCAL].path, "OID", # _sig(self[LOCAL].oid), "Last Sync Path E H", # str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, "Change", # secs(self[REMOTE].changed), "Path", # self[REMOTE].path, "OID", # _sig(self[REMOTE].oid), "Last Sync Path ", # str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, "Punt", # self.punted or "" "Ignored", # self.ignored or "" ) return ret def pretty(self, fixed=True, use_sigs=True): ret = "" if self.ignored != IgnoreReason.NONE: # reason is printed at the end ret = "# " def secs(t): if t: if t >= self.parent._pretty_time: return str(int(1000*round(t-self.parent._pretty_time, 3))) return -t else: return 0 def abbrev_bool(b, tup=('T', 'F', '?')): idx = 1-int(bool(b)) if b is None: idx = 2 return tup[idx] lexv = abbrev_bool(self[LOCAL].exists.value, ("E", "X", "?")) rexv = abbrev_bool(self[REMOTE].exists.value, ("E", "X", "?")) lhma = abbrev_bool(self[LOCAL].hash and self[LOCAL].sync_hash != self[LOCAL].hash, ("H", "=", "?")) rhma = abbrev_bool(self[REMOTE].hash and self[REMOTE].sync_hash != self[REMOTE].hash, ("H", "=", "?")) _sig: Callable[[Any], Any] if use_sigs: _sig = debug_sig else: _sig = lambda a: a local_otype = self[LOCAL].otype.value if self[LOCAL].otype else '?' remote_otype = self[REMOTE].otype.value if self[REMOTE].otype else '?' if local_otype != remote_otype: otype = local_otype[0] + "-" + remote_otype[0] else: otype = local_otype if not fixed: return str((_sig(id(self)), otype, (secs(self[LOCAL].changed), self[LOCAL].path, _sig( self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma), (secs(self[REMOTE].changed), self[REMOTE].path, _sig( self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma), self._punted, self.ignored.value )) ret += "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s %s" % ( _sig(id(self)), _sig(self.storage_id), otype[:3], secs(self[LOCAL].changed), self[LOCAL].path, _sig(self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, secs(self[REMOTE].changed), self[REMOTE].path, _sig(self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, self._punted or "", self.ignored.value if self.ignored != IgnoreReason.NONE else "", ) return ret def __str__(self): return self.pretty(fixed=False) def __repr__(self): d = self.__dict__.copy() d.pop("_parent", None) return self.__class__.__name__ + ":" + debug_sig(id(self)) + str(d) def store(self, tag: str, storage: Storage): if not self.storage_id: self.storage_id = storage.create(tag, self.serialize()) else: storage.update(tag, self.serialize(), self.storage_id) def punt(self): # do this one later # TODO provide help for making sure that we don't punt too many times self.punted += 1 # pylint: disable=no-member if self[LOCAL].changed: self[LOCAL].changed += self._parent._punt_secs[LOCAL] if self[REMOTE].changed: self[REMOTE].changed = self._parent._punt_secs[REMOTE] def get_latest(self): max_changed = max(self[LOCAL].changed or 0, self[REMOTE].changed or 0) for side in (LOCAL, REMOTE): if max_changed > self[side]._last_gotten: self._parent.unconditionally_get_latest(self, side) self[side]._last_gotten = max_changed def is_latest(self) -> bool: for side in (LOCAL, REMOTE): if self[side]._changed and self[side]._changed > self[side]._last_gotten: return False return True @strict class SyncState: # pylint: disable=too-many-instance-attributes def __init__(self, providers: Tuple['Provider', 'Provider'], storage: Optional[Storage] = None, tag: Optional[str] = None, shuffle: bool = False): self._oids: 
Tuple[Dict[Any, SyncEntry], Dict[Any, SyncEntry]] = ({}, {}) self._paths: Tuple[Dict[str, Dict[Any, SyncEntry]], Dict[str, Dict[Any, SyncEntry]]] = ({}, {}) self._changeset: Set[SyncEntry] = set() self._storage: Optional[Storage] = storage self._tag = tag self.providers = providers self._punt_secs = (providers[0].default_sleep/10.0, providers[1].default_sleep/10.0) self._pretty_time = time.time() assert len(providers) == 2 self.lock = RLock() self.data_id: Dict[str, Any] = dict() self.shuffle = shuffle self._loading = False if self._storage: assert self._tag self._loading = True storage_dict = self._storage.read_all(cast(str, tag)) for eid, ent_ser in storage_dict.items(): ent = SyncEntry(self, None, (eid, ent_ser)) for side in [LOCAL, REMOTE]: path, oid = ent[side].path, ent[side].oid if path not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][oid] = ent self._oids[side][oid] = ent if ent[side].changed: self._changeset.add(ent) self._loading = False def updated(self, ent, side, key, val): if self._loading: return assert key if key == "path": self._change_path(side, ent, val, self.providers[side]) elif key == "oid": self._change_oid(side, ent, val) elif key == "ignored": if val == IgnoreReason.TRASHED: ent[LOCAL]._changed = False ent[REMOTE]._changed = False self._changeset.discard(ent) ent._ignored = val # TODO: remove this when storage_update is refactored self.storage_update(ent) elif key == "changed": if val or ent[other_side(side)].changed: self._changeset.add(ent) else: self._changeset.discard(ent) @property def changes(self): return tuple(self._changeset) @property def changeset_len(self): return len(self._changeset) def _change_path(self, side, ent, path, provider): assert type(ent) is SyncEntry if path: assert ent[side].oid prior_ent = ent prior_path = ent[side].path if prior_path == path: return if prior_path and prior_path in self._paths[side]: prior_ent = self._paths[side][prior_path].pop(ent[side].oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] prior_ent = None if path: if path not in self._paths[side]: self._paths[side][path] = {} path_ents = self._paths[side][path] if ent[side].oid in path_ents: prior_ent = path_ents[ent[side].oid] assert prior_ent is not ent # ousted this ent prior_ent[side]._path = None self._paths[side][path][ent[side].oid] = ent ent[side]._path = path self._update_kids(ent, side, prior_path, path, provider) def _update_kids(self, ent, side, prior_path, path, provider): if ent[side].otype == DIRECTORY and prior_path != path and not prior_path is None: # changing directory also changes child paths for sub, relative in self.get_kids(prior_path, side): new_path = provider.join(path, relative) sub[side].path = new_path if provider.oid_is_path: # TODO: state should not do online hits esp from event manager # either # a) have event manager *not* trigger this, maybe by passing none as the provider, etc # this may have knock on effects where the sync engine needs to process parent folders first # b) have a special oid_from_path function that is guaranteed not to be "online" # assert not _api() called, etc. 
new_info = provider.info_path(new_path) if new_info: sub[side].oid = new_info.oid def _change_oid(self, side, ent, oid): assert type(ent) is SyncEntry for remove_oid in set([ent[side].oid, oid]): prior_ent = self._oids[side].pop(remove_oid, None) if prior_ent: if prior_ent[side].path: prior_path = prior_ent[side].path if prior_path in self._paths[side]: self._paths[side][prior_path].pop(remove_oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] if prior_ent is not ent: # no longer indexed by oid, also clear change bit prior_ent[side].oid = None if oid: ent[side]._oid = oid self._oids[side][oid] = ent assert self.lookup_oid(side, oid) is ent if oid and ent[side].path: if ent[side].path not in self._paths[side]: self._paths[side][ent[side].path] = {} self._paths[side][ent[side].path][oid] = ent if oid: assert self.lookup_oid(side, oid) is ent def get_kids(self, parent_path: str, side: int) -> Generator[Tuple[SyncEntry, str], None, None]: provider = self.providers[side] for sub in self.get_all(): if not sub[side].path: continue relpath = provider.is_subpath(parent_path, sub[side].path, strict=True) if relpath: yield sub, relpath def lookup_oid(self, side, oid) -> SyncEntry: try: ret = self._oids[side][oid] return ret except KeyError: return None def lookup_path(self, side, path, stale=False) -> List[SyncEntry]: try: ret: Sequence[SyncEntry] = list(self._paths[side][path].values()) if ret: return [e for e in ret if stale or (not e.is_trashed and not e.is_conflicted)] return [] except KeyError: return [] def rename_dir(self, side, from_dir, to_dir, is_subpath, replace_path): """ when a directory changes, utility to rename all kids """ remove = [] # TODO: refactor this so that a list of affected items is gathered, then the alterations happen to the final # list, which will avoid having to remove after adding, which feels mildly risky # TODO: is this function called anywhere? ATM, it looks like no... 
It should be called or removed for path, oid_dict in self._paths[side].items(): if is_subpath(from_dir, path): new_path = replace_path(path, from_dir, to_dir) remove.append(path) self._paths[side][new_path] = oid_dict for ent in oid_dict.values(): ent[side].path = new_path for path in remove: self._paths[side].pop(path) def update_entry(self, ent, side, oid, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin, too-many-arguments assert ent if oid is not None: if ent.is_trashed: if self.providers[side].oid_is_path: if path: if otype: log.log(TRACE, "dropping old entry %s, and making new", ent) ent = SyncEntry(self, otype) ent[side].oid = oid if otype is not None and otype != ent[side].otype: ent[side].otype = otype assert otype is not NOTKNOWN or not exists if path is not None and path != ent[side].path: ent[side].path = path if hash is not None and hash != ent[side].hash: ent[side].hash = hash if exists is not None and exists is not ent[side].exists: ent[side].exists = exists if changed: assert ent[side].path or ent[side].oid log.log(TRACE, "add %s to changeset", ent) self._mark_changed(side, ent) log.log(TRACE, "updated %s", ent) def _mark_changed(self, side, ent): ent[side].changed = time.time() assert ent in self._changeset def storage_get_data(self, data_tag): if data_tag is None: return None retval = None if self._storage is not None: if data_tag in self.data_id: retval = self._storage.read(data_tag, self.data_id[data_tag]) # retval can be 0, but None is reserved if retval is None: datas = self._storage.read_all(data_tag) for eid, data in datas.items(): self.data_id[data_tag] = eid retval = data if len(datas) > 1: log.warning("Multiple datas found for %s", data_tag) assert False log.debug("storage_get_data id=%s data=%s", data_tag, str(retval)) return retval def storage_update_data(self, data_tag, data): if data_tag is None: return # None is reserved, cannot be stored assert data is not None updated = 0 if self._storage is not None: # stuff cache self.storage_get_data(data_tag) # data_id's cannot be None, but 0 is valid if data_tag in self.data_id and self.data_id[data_tag] is not None: updated = self._storage.update(data_tag, data, self.data_id[data_tag]) log.log(TRACE, "storage_update_data data %s %s -> %s", data_tag, data, updated) if not updated: self.data_id[data_tag] = self._storage.create(data_tag, data) log.log(TRACE, "storage_update_data data CREATE %s %s", data_tag, data) def storage_update(self, ent: SyncEntry): if self._tag is None: return log.log(TRACE, "storage_update eid%s", ent.storage_id) if self._storage is not None: if ent.storage_id is not None: self._storage.update(cast(str, self._tag), ent.serialize(), ent.storage_id) else: new_id = self._storage.create(cast(str, self._tag), ent.serialize()) ent.storage_id = new_id log.debug("storage_update creating eid%s", ent.storage_id) ent.dirty = False def __len__(self): return len(self.get_all()) def update(self, side, otype, oid, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin, too-many-arguments log.log(TRACE, "lookup %s", debug_sig(oid)) ent: SyncEntry = self.lookup_oid(side, oid) prior_ent = None if prior_oid and prior_oid != oid: # this is an oid_is_path provider prior_ent = self.lookup_oid(side, prior_oid) if prior_ent and not prior_ent.is_trashed: if not ent or not ent.is_conflicted: ent = prior_ent elif not ent: path_ents = self.lookup_path(side, path, stale=True) for path_ent in path_ents: ent = path_ent 
ent.unignore(IgnoreReason.TRASHED) log.debug("matched existing entry %s:%s", debug_sig(oid), path) if not ent: log.debug("creating new entry because %s not found in %s", debug_sig(oid), side) ent = SyncEntry(self, otype) self.update_entry(ent, side, oid, path=path, hash=hash, exists=exists, changed=True, otype=otype) self.storage_update(ent) def change(self, age): if not self._changeset: return None if self.shuffle: changes = scramble(self._changeset, 10) else: changes = sorted(self._changeset, key=lambda a: max(a[LOCAL].changed or 0, a[REMOTE].changed or 0)) earlier_than = time.time() - age for puntlevel in range(3): for e in changes: if not e.is_trashed and e.punted == puntlevel: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): return e ret = None for e in self._changeset: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): ret = e return ret def finished(self, ent): if ent[1].changed or ent[0].changed: log.info("not marking finished: %s", ent) return self._changeset.discard(ent) for e in self._changeset: e.punted = 0 def pretty_print(self, use_sigs=True): ret = SyncEntry.prettyheaders() + "\n" e: SyncEntry for e in self.get_all(discarded=True): # allow conflicted to be printed ret += e.pretty(fixed=True, use_sigs=use_sigs) + "\n" return ret def assert_index_is_correct(self): for ent in self._changeset: if not ent.is_trashed and not ent.is_conflicted: assert ent in self.get_all(), ("%s in changeset, not in index" % ent) for ent in self.get_all(): assert ent if ent[LOCAL].path: assert ent in self.lookup_path(LOCAL, ent[LOCAL].path), ("%s local path not indexed" % ent) if ent[REMOTE].path: assert ent in self.lookup_path(REMOTE, ent[REMOTE].path), ("%s remote path not indexed" % ent) if ent[LOCAL].oid: assert ent is self.lookup_oid(LOCAL, ent[LOCAL].oid), ("%s local oid not indexed" % ent) if ent[REMOTE].oid: assert ent is self.lookup_oid(REMOTE, ent[REMOTE].oid), ("%s local oid not indexed" % ent) if ent[LOCAL].changed or ent[REMOTE].changed: if ent not in self._changeset: assert False, ("changeset missing %s" % ent) def get_all(self, discarded=False) -> Set['SyncEntry']: ents = set() for ent in self._oids[LOCAL].values(): assert ent if (ent.is_trashed or ent.is_conflicted) and not discarded: continue ents.add(ent) for ent in self._oids[REMOTE].values(): assert ent if (ent.is_trashed or ent.is_conflicted) and not discarded: continue ents.add(ent) return ents def entry_count(self): return len(self.get_all()) def split(self, ent): log.debug("splitting %s", ent) defer = REMOTE replace = LOCAL defer_ent = ent replace_ent = SyncEntry(self, ent[replace].otype) assert ent[replace].oid replace_ent[replace] = ent[replace] assert replace_ent[replace].oid if ent[replace].oid: assert replace_ent in self.get_all() if defer_ent[replace].path: assert self.lookup_path(replace, defer_ent[replace].path) defer_ent[replace].clear() assert replace_ent[replace].oid assert replace_ent in self.get_all() self._mark_changed(replace, replace_ent) self._mark_changed(defer, defer_ent) assert replace_ent[replace].oid # we aren't synced replace_ent[replace].sync_path = None replace_ent[replace].sync_hash = None # never synced defer_ent[defer].sync_path = None defer_ent[defer].sync_hash = None log.debug("split: %s", defer_ent) log.debug("split: %s", replace_ent) log.info("SPLIT\n%s", self.pretty_print()) assert replace_ent[replace].oid self.storage_update(defer_ent) 
self.storage_update(replace_ent) return defer_ent, defer, replace_ent, replace def unconditionally_get_latest(self, ent, i): if not ent[i].oid: if ent[i].exists != TRASHED: ent[i].exists = UNKNOWN return info = self.providers[i].info_oid(ent[i].oid, use_cache=False) if not info: ent[i].exists = TRASHED return ent[i].exists = EXISTS ent[i].hash = info.hash ent[i].otype = info.otype if ent[i].otype == FILE: if ent[i].hash is None: ent[i].hash = self.providers[i].hash_oid(ent[i].oid) if ent[i].exists == EXISTS: if ent[i].hash is None: log.warning("Cannot sync %s, since hash is None", ent[i]) if ent[i].path != info.path: ent[i].path = info.path self.update_entry(ent, oid=ent[i].oid, side=i, path=info.path) PKL9O555cloudsync/tests/__init__.pyfrom .fixtures import * from .test_provider import * PKL9OA%{{cloudsync/tests/conftest.pyimport cloudsync from .fixtures import * # pylint: disable=unused-import, unused-wildcard-import, wildcard-import cloudsync.logger.setLevel("TRACE") def pytest_configure(config): config.addinivalue_line("markers", "manual") def pytest_runtest_setup(item): if 'manual' in item.keywords and not item.config.getoption("--manual"): pytest.skip("need --manual option to run this test") def pytest_addoption(parser): parser.addoption("--provider", action="append", default=[], help="provider(s) to run tests for") parser.addoption("--manual", action="store_true", default=False, help="run the manual tests") PKL9Ov&bcloudsync/tests/pytest.ini[pytest] log_format = %(asctime)s p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s: %(message)s log_date_format = %Y-%m-%d %H:%M:%S log_level = 5 log_cli = true # dont change this to debug, it will log passing tests as debug log_cli_level = warning PKL9OA%''cloudsync/tests/test_cs.pyfrom io import BytesIO import logging import pytest from typing import List, Dict, Any from unittest.mock import patch from .fixtures import MockProvider, MockStorage, mock_provider_instance from cloudsync.sync.sqlite_storage import SqliteStorage from cloudsync import Storage, CloudSync, SyncState, SyncEntry, LOCAL, REMOTE, FILE, DIRECTORY, CloudFileExistsError, CloudTemporaryError from .fixtures import WaitFor, RunUntilHelper log = logging.getLogger(__name__) @pytest.fixture(name="cs_storage") def fixture_cs_storage(mock_provider_generator, mock_provider_creator): storage_dict: Dict[Any, Any] = dict() storage = MockStorage(storage_dict) for cs in _fixture_cs(mock_provider_generator, mock_provider_creator, storage): yield cs, storage @pytest.fixture(name="cs") def fixture_cs(mock_provider_generator, mock_provider_creator): yield from _fixture_cs(mock_provider_generator, mock_provider_creator) def _fixture_cs(mock_provider_generator, mock_provider_creator, storage=None): roots = ("/local", "/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass cs = CloudSyncMixin((mock_provider_generator(), mock_provider_creator(oid_is_path=False, case_sensitive=True)), roots, storage=storage, sleep=None) yield cs cs.done() def make_cs(mock_provider_creator, left, right, storage=None): roots = ("/local", "/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass return CloudSyncMixin((mock_provider_creator(*left), mock_provider_creator(*right)), roots, storage=storage, sleep=None) # multi local test has two local providers, each syncing up to the same folder on one remote provider. 
# this simulates two separate machines syncing up to a shared folder @pytest.fixture(name="multi_local_cs") def fixture_multi_local_cs(mock_provider_generator): storage_dict: Dict[Any, Any] = dict() storage = MockStorage(storage_dict) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = mock_provider_generator() # local p2 = mock_provider_generator() # local p3 = mock_provider_instance(oid_is_path=False, case_sensitive=True) # remote (cloud) storage log.debug(f"p1 oip={p1.oid_is_path} cs={p1.case_sensitive}") log.debug(f"p2 oip={p2.oid_is_path} cs={p2.case_sensitive}") log.debug(f"p3 oip={p3.oid_is_path} cs={p3.case_sensitive}") roots = ("/local", "/remote") cs1 = CloudSyncMixin((p1, p3), roots, storage, sleep=None) cs2 = CloudSyncMixin((p2, p3), roots, storage, sleep=None) yield cs1, cs2 cs1.done() cs2.done() # multi remote test has one local provider with two folders, and each of those folders # syncs up with a folder on one of two remote providers. @pytest.fixture(name="multi_remote_cs") def fixture_multi_remote_cs(mock_provider_generator): storage_dict: Dict[Any, Any] = dict() storage = MockStorage(storage_dict) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = mock_provider_generator() p2 = mock_provider_generator() p3 = mock_provider_generator() roots1 = ("/local1", "/remote") roots2 = ("/local2", "/remote") cs1 = CloudSyncMixin((p1, p2), roots1, storage, sleep=None) cs2 = CloudSyncMixin((p1, p3), roots2, storage, sleep=None) yield cs1, cs2 cs1.done() cs2.done() def test_sync_rename_away(multi_remote_cs): cs1, cs2 = multi_remote_cs remote_parent = "/remote" remote_path = "/remote/stuff1" local_parent1 = "/local1" local_parent2 = "/local2" local_path11 = "/local1/stuff1" local_path21 = "/local2/stuff1" cs1.providers[LOCAL].mkdir(local_parent1) cs1.providers[REMOTE].mkdir(remote_parent) cs2.providers[LOCAL].mkdir(local_parent2) cs2.providers[REMOTE].mkdir(remote_parent) linfo1 = cs1.providers[LOCAL].create(local_path11, BytesIO(b"hello1"), None) assert linfo1 cs1.run_until_found( (LOCAL, local_path11), (REMOTE, remote_path), timeout=2) cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print(use_sigs=False)) log.info("TABLE 2\n%s", cs2.state.pretty_print(use_sigs=False)) assert len(cs1.state) == 2 # 1 dirs, 1 files # This is the meat of the test. 
renaming out of one cloud bed into another # Which will potentially forget to sync up the delete to remote1, leaving # the file there and also in remote2 log.debug("here") linfo2 = cs1.providers[LOCAL].rename(linfo1.oid, local_path21) log.debug("here") cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print(use_sigs=False)) log.debug("here") cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE 2\n%s", cs2.state.pretty_print(use_sigs=False)) log.debug("here") try: cs1.run_until_found( WaitFor(LOCAL, local_path11, exists=False), timeout=2) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs2.run_until_found( (LOCAL, local_path21), timeout=2) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs2.run_until_found( (REMOTE, remote_path), timeout=2) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) # If renaming out of local1 didn't properly sync, the next line will time out cs1.run_until_found( WaitFor(REMOTE, remote_path, exists=False), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise # let cleanups/discards/dedups happen if needed cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 1 # 1 dir assert len(cs2.state) == 2 # 1 file and 1 dir def test_sync_multi_local_rename_conflict(multi_local_cs): cs1, cs2 = multi_local_cs local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" cs1_parent_oid = cs1.providers[LOCAL].mkdir(local_parent) cs2_parent_oid = cs2.providers[LOCAL].mkdir(local_parent) cs1.providers[REMOTE].mkdir(remote_parent) # also creates on cs2[REMOTE] linfo1 = cs1.providers[LOCAL].create(local_path1, BytesIO(b"hello1"), None) linfo2 = cs2.providers[LOCAL].create(local_path2, BytesIO(b"hello2"), None) # Allow file1 to copy up to the cloud try: log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) # file1 up log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) # file2 up and file1 down log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) # file2 down log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs1.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs2.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: raise finally: log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) # test file rename conflict other_linfo1 = cs2.providers[LOCAL].info_path(local_path1) 
cs1.providers[LOCAL].rename(linfo1.oid, local_path1 + "a") # rename 'stuff1' to 'stuff1a' cs2.providers[LOCAL].rename(other_linfo1.oid, local_path1 + "b") # rename 'stuff1' to 'stuff1b' cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) # let rename 'stuff1a' sync to cloud log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) # try to sync 'stuff1b' rename. Conflict? log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) # if cloud changed, let the change come down log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) # let cleanups/discards/dedups happen if needed cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) a1 = cs1.providers[LOCAL].exists_path(local_path1 + "a") a2 = cs2.providers[LOCAL].exists_path(local_path1 + "a") b1 = cs1.providers[LOCAL].exists_path(local_path1 + "b") b2 = cs2.providers[LOCAL].exists_path(local_path1 + "b") dir1 = [x.name for x in cs1.providers[LOCAL].listdir(cs1_parent_oid)] dir2 = [x.name for x in cs2.providers[LOCAL].listdir(cs2_parent_oid)] log.debug("cs1=%s", dir1) log.debug("cs2=%s", dir2) assert a1 == a2 # either stuff1a exists on both providers, or neither assert b1 == b2 # either stuff1b exists on both providers, or neither assert all("conflicted" not in x for x in dir1) assert all("conflicted" not in x for x in dir2) def test_sync_multi_local(multi_local_cs): cs1, cs2 = multi_local_cs local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" cs1.providers[LOCAL].mkdir(local_parent) cs2.providers[LOCAL].mkdir(local_parent) cs1.providers[REMOTE].mkdir(remote_parent) # also creates on cs2[REMOTE] linfo1 = cs1.providers[LOCAL].create(local_path1, BytesIO(b"hello1"), None) linfo2 = cs2.providers[LOCAL].create(local_path2, BytesIO(b"hello2"), None) # rinfo1 = cs1.providers[REMOTE].create(remote_path2, BytesIO(b"hello3"), None) # rinfo2 = cs2.providers[REMOTE].create(remote_path2, BytesIO(b"hello4"), None) assert linfo1 and linfo2 # and rinfo1 and rinfo2 # Allow file1 to copy up to the cloud try: cs1.run_until_found( (LOCAL, local_path1), (REMOTE, remote_path1), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 2, cs1.state.pretty_print() # 1 dirs, 1 files (haven't gotten the second file yet) # Allow file2 to copy up to the cloud, and to sync file1 down from the cloud to local2 try: cs2.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise linfo1 = cs1.providers[LOCAL].info_path(local_path1) rinfo1 = cs1.providers[REMOTE].info_path(remote_path1) linfo2 = cs2.providers[LOCAL].info_path(local_path2) rinfo2 = 
cs2.providers[REMOTE].info_path(remote_path2) assert linfo1.oid assert linfo2.oid assert rinfo1.oid assert rinfo2.oid assert linfo1.hash == rinfo1.hash assert linfo2.hash == rinfo2.hash assert len(cs1.state) == 2, cs1.state.pretty_print() # still the same as before assert len(cs2.state) == 3, cs2.state.pretty_print() # cs2 now has file1 and file2, plus the dir # Allow file2 to sync down to local1 try: cs1.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise # let cleanups/discards/dedups happen if needed cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE\n%s", cs2.state.pretty_print()) def test_sync_multi_remote(multi_remote_cs): cs1, cs2 = multi_remote_cs local_parent1 = "/local1" local_parent2 = "/local2" remote_parent1 = "/remote" remote_parent2 = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path11 = "/local1/stuff1" local_path21 = "/local2/stuff1" local_path12 = "/local1/stuff2" local_path22 = "/local2/stuff2" cs1.providers[LOCAL].mkdir(local_parent1) cs1.providers[REMOTE].mkdir(remote_parent1) cs2.providers[LOCAL].mkdir(local_parent2) cs2.providers[REMOTE].mkdir(remote_parent2) linfo1 = cs1.providers[LOCAL].create(local_path11, BytesIO(b"hello1"), None) linfo2 = cs2.providers[LOCAL].create(local_path21, BytesIO(b"hello2"), None) rinfo1 = cs1.providers[REMOTE].create(remote_path2, BytesIO(b"hello3"), None) rinfo2 = cs2.providers[REMOTE].create(remote_path2, BytesIO(b"hello4"), None) assert linfo1 and linfo2 and rinfo1 and rinfo2 cs1.run_until_found( (LOCAL, local_path11), (LOCAL, local_path21), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) cs1.run(until=lambda: not cs1.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 3 # 1 dirs, 2 files, 1 never synced (local2 file) try: cs2.run_until_found( (LOCAL, local_path12), (LOCAL, local_path22), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise linfo12 = cs1.providers[LOCAL].info_path(local_path12) rinfo11 = cs1.providers[REMOTE].info_path(remote_path1) linfo22 = cs2.providers[LOCAL].info_path(local_path22) rinfo21 = cs2.providers[REMOTE].info_path(remote_path1) assert linfo12.oid assert linfo22.oid assert rinfo11.oid assert rinfo21.oid assert linfo12.hash == rinfo1.hash assert linfo22.hash == rinfo2.hash # let cleanups/discards/dedups happen if needed cs2.run(until=lambda: not cs2.state.changeset_len, timeout=1) log.info("TABLE\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 3 assert len(cs2.state) == 3 def test_cs_basic(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/stuff2" local_path2 = "/local/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"hello2"), None) cs.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), 
timeout=2) log.info("TABLE 1\n%s", cs.state.pretty_print()) linfo2 = cs.providers[LOCAL].info_path(local_path2) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo2.oid assert rinfo1.oid bio = BytesIO() cs.providers[REMOTE].download(rinfo1.oid, bio) assert bio.getvalue() == b'hello' bio = BytesIO() cs.providers[LOCAL].download(linfo2.oid, bio) assert bio.getvalue() == b'hello2' assert linfo2.hash == rinfo2.hash assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") # let cleanups/discards/dedups happen if needed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert len(cs.state) == 3 assert not cs.state.changeset_len def setup_remote_local(cs, *names): remote_parent = "/remote" local_parent = "/local" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) found = [] for name in names: remote_path1 = "/remote/" + name local_path1 = "/local/" + name linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) found.append((REMOTE, remote_path1)) cs.run_until_found(*found) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) @pytest.mark.repeat(4) def test_cs_create_delete_same_name(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) cs.run(until=lambda: not cs.state.changeset_len, timeout=2) rinfo = cs.providers[REMOTE].info_path(remote_path1) cs.emgrs[LOCAL].do() cs.providers[LOCAL].delete(linfo1.oid) cs.emgrs[LOCAL].do() linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) # run local event manager only... not sync cs.emgrs[LOCAL].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") rinfo = cs.providers[REMOTE].info_path(remote_path1) bio = BytesIO() cs.providers[REMOTE].download(rinfo.oid, bio) assert bio.getvalue() == b'goodbye' # this test used to fail about 2 out of 10 times because of embracing a change that *wasn't properly indexed* # it covers cases where events arrive in unexpected orders... 
could be possible to get the same coverage # with a more deterministic version @pytest.mark.repeat(10) def test_cs_create_delete_same_name_heavy(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) import time, threading def creator(): i = 0 while i < 10: try: linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file" + bytes(str(i),"utf8"))) i += 1 except CloudFileExistsError: linfo1 = cs.providers[LOCAL].info_path(local_path1) if linfo1: cs.providers[LOCAL].delete(linfo1.oid) time.sleep(0.01) def done(): bio = BytesIO() rinfo = cs.providers[REMOTE].info_path(remote_path1) if rinfo: cs.providers[REMOTE].download(rinfo.oid, bio) return bio.getvalue() == b'file' + bytes(str(9),"utf8") return False thread = threading.Thread(target=creator, daemon=True) thread.start() cs.run(until=done, timeout=3) thread.join() assert done() log.info("TABLE 2\n%s", cs.state.pretty_print()) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") def test_cs_rename_heavy(cs): remote_parent = "/remote" local_parent = "/local" remote_sub = "/remote/sub" local_sub = "/local/sub" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/sub/stuff1" local_path2 = "/local/sub/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].mkdir(local_sub) cs.providers[REMOTE].mkdir(remote_sub) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file")) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs.state.pretty_print()) import time, threading oid = linfo1.oid done = False ok = True def mover(): nonlocal done nonlocal oid nonlocal ok for _ in range(10): try: oid = cs.providers[LOCAL].rename(oid, local_path2) oid = cs.providers[LOCAL].rename(oid, local_path1) time.sleep(0.001) except Exception as e: log.exception(e) ok = False done = True thread = threading.Thread(target=mover, daemon=True) thread.start() cs.run(until=lambda: done, timeout=3) thread.join() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1 ) log.info("TABLE 3\n%s", cs.state.pretty_print()) assert ok assert cs.providers[REMOTE].info_path(remote_path1) def test_cs_two_conflicts(cs): remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"world")) # run event managers only... 
not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # the local delete/create doesn't add entries assert(len(cs.state) == 3) else: assert(len(cs.state) == 4) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) # conflicted files are discarded, not in table log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 2) assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") or cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") b1 = BytesIO() b2 = BytesIO() cs.providers[LOCAL].download_path(local_path1, b1) cs.providers[LOCAL].download_path(local_path1 + ".conflicted", b2) assert b1.getvalue() in (b'goodbye', b'world') assert b2.getvalue() in (b'goodbye', b'world') assert b1.getvalue() != b2.getvalue() @pytest.mark.repeat(10) def test_cs_subdir_rename(cs): local_dir = "/local/a" local_base = "/local/a/stuff" local_dir2 = "/local/b" remote_base = "/remote/a/stuff" remote_base2 = "/remote/b/stuff" kid_count = 4 cs.providers[LOCAL].mkdir("/local") lpoid = cs.providers[LOCAL].mkdir(local_dir) lpaths = [] rpaths = [] rpaths2 = [] for i in range(kid_count): lpath = local_base + str(i) rpath = remote_base + str(i) rpath2 = remote_base2 + str(i) cs.providers[LOCAL].create(lpath, BytesIO(b'hello')) lpaths.append(lpath) rpaths.append((REMOTE, rpath)) rpaths2.append((REMOTE, rpath2)) cs.run_until_found(*rpaths, timeout=2) log.info("TABLE 1\n%s", cs.state.pretty_print()) cs.providers[LOCAL].rename(lpoid, local_dir2) for _ in range(10): cs.do() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.run_until_found(*rpaths2, timeout=2) log.info("TABLE 2\n%s", cs.state.pretty_print()) # this test is sensitive to the order in which things are processed # so run it a few times def test_cs_rename_over(cs): remote_parent = "/remote" local_parent = "/local" fn1 = "hello1" fn2 = "hello2" local_path1 = cs.providers[LOCAL].join(local_parent, fn1) # "/local/hello1" local_path2 = cs.providers[LOCAL].join(local_parent, fn2) # "/local/hello2" remote_path1 = cs.providers[REMOTE].join(remote_parent, fn1) # "/remote/hello1" remote_path2 = cs.providers[REMOTE].join(remote_parent, fn2) # "/remote/hello2" lpoid = cs.providers[LOCAL].mkdir(local_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(local_path1.encode('utf-8'))) linfo2 = cs.providers[LOCAL].create(local_path2, BytesIO(local_path1.encode('utf-8'))) cs.run_until_found((REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) log.info("TABLE 1\n" + cs.state.pretty_print(use_sigs=False)) # rename a file over another file by deleting the target and doing the rename in quick succession cs.providers[LOCAL].delete(linfo2.oid) cs.providers[LOCAL].rename(linfo1.oid, local_path2) log.info("TABLE 2\n" + cs.state.pretty_print(use_sigs=False)) cs.run_until_found(WaitFor(side=REMOTE, path=remote_path1, exists=False), timeout=2) log.info("TABLE 3\n%s", cs.state.pretty_print(use_sigs=False)) # check the contents to make sure that path2 has path1's content new_oid = cs.providers[REMOTE].info_path(remote_path2).oid contents = BytesIO() cs.providers[REMOTE].download(new_oid, contents) assert contents.getvalue() == local_path1.encode('utf-8') # check that the folders each have only one file, and that it's path2 rpoid = cs.providers[REMOTE].info_path(remote_parent).oid ldir = list(cs.providers[LOCAL].listdir(lpoid)) rdir = list(cs.providers[REMOTE].listdir(rpoid)) log.debug("ldir = %s", ldir) 
log.debug("rdir = %s", rdir) assert len(ldir) == 1 assert ldir[0].path == local_path2 assert len(rdir) == 1 assert rdir[0].path == remote_path2 @pytest.mark.repeat(10) def test_cs_folder_conflicts_file(cs): remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff1/under" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) cs.providers[REMOTE].mkdir(remote_path1) cs.providers[REMOTE].create(remote_path2, BytesIO(b"world")) # run event managers only... not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # there won't be 2 rows for /local/stuff1 is oid_is_path assert(len(cs.state) == 4) locs = cs.state.lookup_path(LOCAL, local_path1) assert locs and len(locs) == 1 # loc = locs[0] # assert loc[LOCAL].otype == FILE # assert loc[REMOTE].otype == DIRECTORY else: # deleted /local/stuff, remote/stuff, remote/stuff/under, lcoal/stuff, /local assert(len(cs.state) == 5) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 4 or len(cs.state) == 3) local_conf = cs.providers[LOCAL].info_path(local_path1 + ".conflicted") remote_conf = cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") assert local_conf and not remote_conf # file was moved out of the way for the folder assert local_conf.otype == FILE # folder won local_conf = cs.providers[LOCAL].info_path(local_path1) assert local_conf.otype == DIRECTORY @pytest.fixture(params=[(MockStorage, dict()), (SqliteStorage, 'file::memory:?cache=shared')], ids=["mock_storage", "sqlite_storage"], name="storage" ) def storage_fixture(request): return request.param def test_storage(storage): roots = ("/local", "/remote") storage_class = storage[0] storage_mechanism = storage[1] class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = MockProvider(oid_is_path=False, case_sensitive=True) p2 = MockProvider(oid_is_path=False, case_sensitive=True) storage1: Storage = storage_class(storage_mechanism) cs1: CloudSync = CloudSyncMixin((p1, p2), roots, storage1, sleep=None) old_cursor = cs1.emgrs[0].state.storage_get_data(cs1.emgrs[0]._cursor_tag) assert old_cursor is not None log.debug("cursor=%s", old_cursor) test_cs_basic(cs1) # do some syncing, to get some entries into the state table storage2 = storage_class(storage_mechanism) cs2: CloudSync = CloudSyncMixin((p1, p2), roots, storage2, sleep=None) log.debug(f"state1 = {cs1.state.entry_count()}\n{cs1.state.pretty_print()}") log.debug(f"state2 = {cs2.state.entry_count()}\n{cs2.state.pretty_print()}") def not_dirty(s: SyncState): se: SyncEntry for se in s.get_all(): assert not se.dirty def compare_states(s1: SyncState, s2: SyncState) -> List[SyncEntry]: ret = [] found = False e1: SyncEntry for e1 in s1.get_all(): e2: SyncEntry for e2 in s2.get_all(): if e1.serialize() == e2.serialize(): found = True if not found: ret.append(e1) return ret not_dirty(cs1.state) missing1 = compare_states(cs1.state, cs2.state) missing2 = compare_states(cs2.state, cs1.state) for e in missing1: log.debug(f"entry in 1 not found in 2\n{e.pretty()}") for e in missing2: log.debug(f"entry in 2 not found in 
1\n{e.pretty()}") if missing1 or missing2: log.debug("TABLE 1\n%s", cs1.state.pretty_print()) log.debug("TABLE 2\n%s", cs2.state.pretty_print()) assert not missing1 assert not missing2 new_cursor = cs1.emgrs[0].state.storage_get_data(cs1.emgrs[0]._cursor_tag) log.debug("cursor=%s %s", old_cursor, new_cursor) assert new_cursor is not None assert old_cursor != new_cursor @pytest.mark.parametrize("drain", [None, LOCAL, REMOTE]) def test_cs_already_there(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"hello"), None) if drain is not None: # one of the event managers is not reporting events cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") @pytest.mark.parametrize("drain", [LOCAL, REMOTE]) def test_cs_already_there_conflict(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) cs.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye"), None) if drain is not None: # one of the event managers is not reporting events cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") or cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") def test_conflict_recover(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/new_path" local_path2 = "/local/new_path" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) cs.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye"), None) # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash local_conflicted_path = local_path1 + ".conflicted" remote_conflicted_path = remote_path1 + ".conflicted" local_conflicted = cs.providers[LOCAL].info_path(local_conflicted_path) remote_conflicted = cs.providers[REMOTE].info_path(remote_conflicted_path) log.info("CONFLICTED TABLE\n%s", cs.state.pretty_print()) # Check that exactly one of the files is present assert bool(local_conflicted) or bool(remote_conflicted) assert bool(local_conflicted) != bool(remote_conflicted) log.info("BEFORE RENAME AWAY\n%s", cs.state.pretty_print()) # Rename the conflicted file to something new if local_conflicted: 
cs.providers[LOCAL].rename(local_conflicted.oid, local_path2) else: cs.providers[REMOTE].rename(remote_conflicted.oid, remote_path2) # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("AFTER RENAME AWAY\n%s", cs.state.pretty_print()) local_conflicted = cs.providers[LOCAL].info_path(local_conflicted_path) remote_conflicted = cs.providers[REMOTE].info_path(remote_conflicted_path) assert not bool(local_conflicted) and not bool(remote_conflicted) local_new = cs.providers[LOCAL].info_path(local_path2) remote_new = cs.providers[REMOTE].info_path(remote_path2) assert bool(local_new) and bool(remote_new) def test_conflict_recover_modify(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/new_path" local_path2 = "/local/new_path" # Create a clear-cut conflict cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) cs.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye"), None) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash local_conflicted_path = local_path1 + ".conflicted" remote_conflicted_path = remote_path1 + ".conflicted" local_conflicted = cs.providers[LOCAL].info_path(local_conflicted_path) remote_conflicted = cs.providers[REMOTE].info_path(remote_conflicted_path) log.info("CONFLICTED TABLE\n%s", cs.state.pretty_print()) # Check that exactly one of the files is present assert bool(local_conflicted) or bool(remote_conflicted) assert bool(local_conflicted) != bool(remote_conflicted) if local_conflicted: assert local_conflicted.hash != linfo1.hash old_hash = local_conflicted.hash else: assert remote_conflicted.hash != rinfo1.hash old_hash = remote_conflicted.hash # Write some new content log.info("BEFORE MODIFY\n%s", cs.state.pretty_print()) new_content = BytesIO(b"new content pls ignore") if local_conflicted: new_hash = cs.providers[LOCAL].upload(local_conflicted.oid, new_content).hash else: new_hash = cs.providers[REMOTE].upload(remote_conflicted.oid, new_content).hash assert new_hash != old_hash cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("AFTER MODIFY\n%s", cs.state.pretty_print()) local_conflicted = cs.providers[LOCAL].info_path(local_conflicted_path) remote_conflicted = cs.providers[REMOTE].info_path(remote_conflicted_path) assert bool(local_conflicted) or bool(remote_conflicted) assert bool(local_conflicted) != bool(remote_conflicted) # Rename the conflicted file to something new log.info("BEFORE RENAME AWAY\n%s", cs.state.pretty_print()) if local_conflicted: cs.providers[LOCAL].rename(local_conflicted.oid, local_path2) else: cs.providers[REMOTE].rename(remote_conflicted.oid, remote_path2) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("AFTER RENAME AWAY\n%s", cs.state.pretty_print()) local_conflicted = cs.providers[LOCAL].info_path(local_conflicted_path) remote_conflicted = cs.providers[REMOTE].info_path(remote_conflicted_path) assert not bool(local_conflicted) and not bool(remote_conflicted) # Check that the new path was uploaded local_new = cs.providers[LOCAL].info_path(local_path2) remote_new = cs.providers[REMOTE].info_path(remote_path2) assert bool(local_new) and bool(remote_new) assert local_new.hash == 
remote_new.hash # And now make sure that we're correctly processing new changes on the file newer_content = BytesIO(b"ok this is the last time fr") cs.providers[LOCAL].upload(local_new.oid, newer_content) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) local_newer = cs.providers[LOCAL].info_path(local_path2) remote_newer = cs.providers[REMOTE].info_path(remote_path2) assert bool(local_newer) and bool(remote_newer) assert local_newer.hash == remote_newer.hash @pytest.mark.parametrize('right', (True, False), ids=["right_cs", "right_in"]) @pytest.mark.parametrize('left', (True, False), ids=["left_cs", "left_in"]) def test_cs_rename_folder_case(mock_provider_creator, left, right): cs = make_cs(mock_provider_creator, (True, left), (False, right)) local_parent = "/local" remote_parent = "/remote" local_path1 = "/local/a" local_path2 = "/local/a/b" local_path3 = "/local/A" remote_path1 = "/remote/A" remote_path2 = "/remote/A/b" cs.providers[LOCAL].mkdir(local_parent) ldir = cs.providers[LOCAL].mkdir(local_path1) linfo = cs.providers[LOCAL].create(local_path2, BytesIO(b"hello"), None) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.debug("--- cs: %s ---", [e.case_sensitive for e in cs.providers]) cs.providers[LOCAL].log_debug_state() cs.providers[REMOTE].log_debug_state() cs.providers[LOCAL].rename(ldir, local_path3) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) rinfo2 = cs.providers[REMOTE].info_path(remote_path2) assert rinfo1 and rinfo2 assert rinfo1.path == remote_path1 assert rinfo2.path == remote_path2 # TODO: important tests: # 1. events coming in for path updates for conflicted files.... we should note conflict oids, and not insert them # 2. for oid_as_path... 
events coming in for old creations, long since deleted or otherwise overwritten (renamed away, etc) def test_cs_disconnect(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) cs.providers[REMOTE].disconnect() assert not cs.providers[REMOTE].connected cs.run(until=lambda: cs.providers[REMOTE].connected, timeout=1) assert not cs.providers[REMOTE].info_path(remote_path1) cs.run_until_found((REMOTE, remote_path1)) def test_cs_rename_tmp(cs): remote_parent = "/remote" local_parent = "/local" remote_sub = "/remote/sub" local_sub = "/local/sub" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/sub/stuff1" local_path2 = "/local/sub/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].mkdir(local_sub) cs.providers[REMOTE].mkdir(remote_sub) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 1\n%s", cs.state.pretty_print()) import time, threading done = False ok = True def mover(): nonlocal done nonlocal ok for _ in range(10): try: linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file")) linfo2 = cs.providers[LOCAL].info_path(local_path2) if linfo2: without_event = isinstance(cs.providers[LOCAL], MockProvider) if without_event: cs.providers[LOCAL]._delete(linfo2.oid, without_event=True) else: cs.providers[LOCAL].delete(linfo2.oid) cs.providers[LOCAL].rename(linfo1.oid, local_path2) time.sleep(0.001) except Exception as e: log.exception(e) ok = False done = True thread = threading.Thread(target=mover, daemon=True) thread.start() cs.run(until=lambda: done, timeout=3) thread.join() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1 ) log.info("TABLE 3\n%s", cs.state.pretty_print()) assert ok assert cs.providers[REMOTE].info_path(remote_path2) assert not cs.providers[REMOTE].info_path(remote_path2 + ".conflicted") assert not cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") def test_cursor(cs_storage): cs = cs_storage[0] storage = cs_storage[1] local_parent = "/local" remote_parent = "/remote" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello1"), None) cs.run_until_found( (LOCAL, local_path1), (REMOTE, remote_path1), timeout=2) # let cleanups/discards/dedups happen if needed cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE\n%s", cs.state.pretty_print()) assert len(cs.state) == 2 assert not cs.state.changeset_len linfo1 = cs.providers[LOCAL].create(local_path2, BytesIO(b"hello2"), None) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = cs.providers[LOCAL] p2 = cs.providers[REMOTE] p1.current_cursor = None p2.current_cursor = None roots = cs.roots cs2 = CloudSyncMixin((p1, p2), roots, storage=storage, sleep=None) cs2.run_until_found( (LOCAL, local_path2), timeout=2) cs2.run_until_found( (REMOTE, remote_path2), timeout=2) cs2.done() @pytest.mark.repeat(3) def test_cs_rename_up(cs): remote_parent = "/remote" local_parent = "/local" remote_sub = "/remote/sub" local_sub = 
"/local/sub" remote_path1 = "/remote/sub/stuff1" local_path1 = "/local/sub/stuff1" remote_path2 = "/remote/stuff1" local_path2 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].mkdir(local_sub) cs.providers[REMOTE].mkdir(remote_sub) cs.do() cs.run(until=lambda: not cs.state.changeset_len, timeout=1) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file")) linfo2 = cs.providers[LOCAL].create(local_path2, BytesIO(b"file")) cs.run(until=lambda: not cs.state.changeset_len, timeout=1 ) log.info("TABLE 1\n%s", cs.state.pretty_print()) without_event = isinstance(cs.providers[LOCAL], MockProvider) if without_event: cs.providers[LOCAL]._delete(linfo2.oid, without_event=True) else: cs.providers[LOCAL].delete(linfo2.oid) cs.providers[LOCAL].rename(linfo1.oid, local_path2) cs.run_until_found( WaitFor(REMOTE, remote_path1, exists=False), WaitFor(REMOTE, remote_path2, exists=True), timeout=2) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path2 + ".conflicted") assert not cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert cs.providers[REMOTE].info_path(remote_path2) def test_many_small_files_mkdir_perf(cs): local_root = "/local" remote_root = "/remote" cs.providers[LOCAL].mkdir(local_root) cs.providers[REMOTE].mkdir(remote_root) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) def make_files(dir_suffix: str, clear_before: bool): local_base = f"{local_root}/{dir_suffix}" local_file_base = f"{local_base}/file" + dir_suffix remote_base = f"{remote_root}/{dir_suffix}" remote_file_base = f"{remote_base}/file" + dir_suffix # Let's make some subdirs cs.providers[LOCAL].mkdir(local_base) # Optionally, give ourselves a clean slate before starting to process # all the file uploads. Since all the child file events rely on this # happening first, it's easy for performance issues to sneak in. if clear_before: cs.run(until=lambda: not cs.state.changeset_len, timeout=1) # Upload 20 x 3 KiB files. The size and number shouldn't actually # matter. 
content = BytesIO(b"\0" * (3 * 1024)) for i in range(20): local_file_name = local_file_base + str(i) linfo = cs.providers[LOCAL].create(local_file_name, content, None) assert linfo is not None cs.run(until=lambda: not cs.state.changeset_len, timeout=10) for i in range(20): rinfo = cs.providers[REMOTE].info_path(remote_file_base + str(i)) assert rinfo is not None local_old_api = cs.providers[LOCAL]._api remote_old_api = cs.providers[REMOTE]._api # Count the number of API hits without the clean slate with patch.object(cs.providers[LOCAL], "_api", side_effect=local_old_api) as local_no_clear, \ patch.object(cs.providers[REMOTE], "_api", side_effect=remote_old_api) as remote_no_clear: make_files("_no_clear", clear_before=False) # Count the number of API hits with the clean slate with patch.object(cs.providers[LOCAL], "_api", side_effect=local_old_api) as local_clear, \ patch.object(cs.providers[REMOTE], "_api", side_effect=remote_old_api) as remote_clear: make_files("_clear", clear_before=True) # Check that the two are approximately the same assert abs(local_no_clear.call_count - local_clear.call_count) < 3 assert abs(remote_no_clear.call_count - remote_clear.call_count) < 3 def test_cs_folder_conflicts_del(cs): local_path1 = "/local/stuff1" local_path1_u = "/local/stuff1/under" remote_path1 = "/remote/stuff1" remote_path1_u = "/remote/stuff1/under" local_path2 = "/local/stuff2" local_path2_u = "/local/stuff2/under" remote_path2 = "/remote/stuff2" remote_path2_u = "/remote/stuff2/under" remote_path3 = "/remote/stuff3" remote_path3_u = "/remote/stuff3/under" cs.providers[LOCAL].mkdir("/local") linfo1_oid = cs.providers[LOCAL].mkdir(local_path1) cs.providers[LOCAL].create(local_path1_u, BytesIO(b'fff')) cs.run_until_found( (REMOTE, remote_path1), (REMOTE, remote_path1_u), timeout=2) log.info("TABLE 0\n%s", cs.state.pretty_print()) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) # rename both at same time cs.providers[LOCAL].rename(linfo1_oid, local_path2) rinfo3_oid = cs.providers[REMOTE].rename(rinfo1.oid, remote_path3) # then delete remote rinfo3_u = cs.providers[REMOTE].info_path(remote_path3_u) cs.providers[REMOTE].delete(rinfo3_u.oid) cs.providers[REMOTE].delete(rinfo3_oid) cs.run(until=lambda: cs.state.changeset_len == 0, timeout=1) log.info("TABLE 1\n%s", cs.state.pretty_print()) assert cs.state.changeset_len == 0 # either a deletion happend or a rename... 
whatever if cs.providers[REMOTE].info_path(remote_path2_u): assert cs.providers[LOCAL].info_path(local_path2_u) assert cs.providers[REMOTE].info_path(remote_path2) else: assert not cs.providers[LOCAL].info_path(local_path2_u) assert not cs.providers[LOCAL].info_path(local_path2) def test_api_hit_perf(cs): local_root = "/local" remote_root = "/remote" local_file_base = f"{local_root}/file" remote_file_base = f"{remote_root}/file" cs.providers[LOCAL].mkdir(local_root) cs.providers[REMOTE].mkdir(remote_root) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) remote_old_api = cs.providers[REMOTE]._api import traceback remote_counter = 0 def remote_tb_api(*a, **kw): nonlocal remote_counter remote_counter += 1 log.debug("API HIT %s", traceback.format_stack(limit=7)) remote_old_api(*a, **kw) # upload 10 files content = BytesIO(b"hello") prev_oid = None for i in range(10): local_file_name = local_file_base + str(i) info = cs.providers[LOCAL].create(local_file_name, content, None) if prev_oid: cs.providers[LOCAL].delete(prev_oid) prev_oid = cs.providers[LOCAL].rename(info.oid, local_file_base) cs.emgrs[0].do() log.debug("RUNNING") with patch.object(cs.providers[REMOTE], "_api", side_effect=remote_tb_api): cs.run(until=lambda: not cs.state.changeset_len, timeout=2) assert abs(remote_counter) < 20 for i in range(10): remote_file_name = remote_file_base + str(i) rinfo = cs.providers[REMOTE].info_path(remote_file_name) assert rinfo is None rinfo = cs.providers[REMOTE].info_path(remote_file_base) assert rinfo def test_dir_delete_give_up(cs): # Local: dir with file inside, delete file then dir # Remote: at same time, add file to dir # Sometimes we'll process the remote file add before our dir delete, so we cannot # dir delete # In that case, we should not retry the dir delete forever local_parent = "/local" remote_parent = "/remote" local_dir = "/local/dir" remote_dir = "/remote/dir" local_f1 = "/local/dir/f1" remote_f1 = "/remote/dir/f1" local_f2 = "/local/dir/f2" remote_f2 = "/remote/dir/f2" # Setup, create initial dir and file lpoid = cs.providers[LOCAL].mkdir(local_parent) rpoid = cs.providers[REMOTE].mkdir(remote_parent) ldoid = cs.providers[LOCAL].mkdir(local_dir) lf1obj = cs.providers[LOCAL].create(local_f1, BytesIO(b"hello"), None) cs.run_until_found( (LOCAL, local_dir), (LOCAL, local_f1), (REMOTE, remote_dir), (REMOTE, remote_f1), timeout=2) # Delete local dir while adding file remotely cs.providers[LOCAL].delete(lf1obj.oid) cs.providers[LOCAL].delete(ldoid) cs.providers[REMOTE].create(remote_f2, BytesIO(b"goodbye"), None) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) assert cs.state.changeset_len == 0 ldir = list(cs.providers[LOCAL].listdir(lpoid)) rdir = list(cs.providers[REMOTE].listdir(rpoid)) # dirs should still exist on both sides assert len(rdir) == 1 assert rdir[0].path == remote_dir assert len(ldir) == 1 assert ldir[0].path == local_dir def test_replace_dir(cs): local = cs.providers[LOCAL] remote = cs.providers[REMOTE] local.mkdir("/local") remote.mkdir("/remote") def get_oid(prov, path): return prov.info_path(path).oid # Make first set local.mkdir("/local/Test") local.create("/local/Test/Excel.xlsx", BytesIO(b"aaa")) # Make "copy" of files local.mkdir("/local/Test2") linfo = local.create("/local/Test2/Excel.xlsx", BytesIO(b"aab")) cs.run(until=lambda: not cs.state.changeset_len, timeout=1) # Now simulate a dir replace (e.g. 
shutil rmtree/rename) local.delete(get_oid(local, "/local/Test/Excel.xlsx")) local.delete(get_oid(local, "/local/Test")) local.rename(get_oid(local, "/local/Test2"), "/local/Test") cs.run(until=lambda: not cs.state.changeset_len, timeout=1) log.info("END TABLE\n%s", cs.state.pretty_print()) # Check that the file got synced rinfo = remote.info_path("/remote/Test/Excel.xlsx") assert rinfo # Check that the *correct* file got synced assert linfo.hash == rinfo.hash # That was the important one, what follows are just checks for consistent # internal state # Check that there aren't any weird duplicate entries on the remote bad_rinfo_dir = remote.info_path("/remote/Test2") assert bad_rinfo_dir is None bad_rinfo_file = remote.info_path("/remote/Test2/Excel.xlsx") assert bad_rinfo_file is None # And do the same for local bad_linfo_dir = local.info_path("/local/Test2") assert bad_linfo_dir is None bad_linfo_file = local.info_path("/local/Test2/Excel.xlsx") assert bad_linfo_file is None def test_out_of_space(cs): local = cs.providers[LOCAL] remote = cs.providers[REMOTE] remote.set_quota(1024) local.mkdir("/local") remote.mkdir("/remote") local.create("/local/foo", BytesIO(b'0' * 1025)) with pytest.raises(TimeoutError): cs.run(until=lambda: not cs.state.changeset_len, timeout=0.25) log.info("END TABLE\n%s", cs.state.pretty_print()) assert cs.smgr.in_backoff() assert cs.state.changeset_len def test_cs_prioritize(cs): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" remote_path1 = "/remote/stuff1" local_path2 = "/local/stuff2" remote_path2 = "/remote/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) lp1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rp2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"hello2")) # aging 3 seconds... nothing should get processed cs.aging = 4 cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() ent1 = cs.state.lookup_oid(LOCAL, lp1.oid) ent2 = cs.state.lookup_oid(REMOTE, rp2.oid) assert ent1[LOCAL].changed > 0 assert ent2[REMOTE].changed > 0 # ensure ent2 is later... 
regardless of clock granularity ent2[REMOTE].changed = ent1[LOCAL].changed + 0.01 prev_len = cs.state.changeset_len cs.do() # nothing is happening because aging is too long assert cs.state.changeset_len == prev_len # this should also prioritize the remote, even though the local doesn't exist cs.prioritize(LOCAL, local_path2) log.info("BEFORE TABLE\n%s", cs.state.pretty_print()) cs.run_until_found( (LOCAL, local_path2) ) log.info("AFTER TABLE\n%s", cs.state.pretty_print()) assert cs.providers[LOCAL].info_path(local_path2) assert not cs.providers[REMOTE].info_path(remote_path1) MERGE = 2 @pytest.mark.parametrize("side_locked", [ (LOCAL, []), (LOCAL, [LOCAL]), (REMOTE, []), (REMOTE, [REMOTE]), (MERGE, []), (MERGE, [LOCAL, REMOTE]), # (MERGE, REMOTE), # (MERGE, LOCAL), ]) def test_hash_mess(cs, side_locked): (side, locks) = side_locked local = cs.providers[LOCAL] remote = cs.providers[REMOTE] assert not remote.oid_is_path local.mkdir("/local") remote.mkdir("/remote") def get_oid(prov, path): return prov.info_path(path).oid # Make first set local.mkdir("/local/") linfo = local.create("/local/foo", BytesIO(b"aaa")) cs.run_until_found((REMOTE, "/remote/foo")) rinfo = remote.info_path("/remote/foo") renamed_path = ("/local/foo-l", "/remote/foo-r") local_oid = local.rename(linfo.oid, "/local/foo-l") remote_oid = remote.rename(rinfo.oid, "/remote/foo-r") local.upload(local_oid, BytesIO(b"zzz1")) remote.upload(remote_oid, BytesIO(b"zzz2")) f3 = BytesIO(b'merged') if side == LOCAL: cs.smgr._resolve_conflict = lambda f1, f2: (f1, False) elif side == REMOTE: cs.smgr._resolve_conflict = lambda f1, f2: (f2, False) elif side == MERGE: cs.smgr._resolve_conflict = lambda f1, f2: (f3, False) log.info("START TABLE\n%s", cs.state.pretty_print()) if locks: def _called(msg): _called.count += 1 # type: ignore raise CloudTemporaryError("CALLED %s" % msg) _called.count = 0 # type: ignore with patch("cloudsync.tests.fixtures.mock_provider.CloudTemporaryError", new=_called): for locked in locks: cs.providers[locked].locked_for_test.add(renamed_path[locked]) cs.run(until=lambda: _called.count > 0, timeout=1) # type: ignore for locked in locks: cs.providers[locked].locked_for_test.discard(renamed_path[locked]) cs.run(until=lambda: not cs.state.changeset_len, timeout=0.25) log.info("END TABLE\n%s", cs.state.pretty_print()) l_r = local.info_path("/local/foo-r") l_l = local.info_path("/local/foo-l") r_l = remote.info_path("/remote/foo-l") r_r = remote.info_path("/remote/foo-r") assert l_r is not None or l_l is not None assert r_r is not None or r_l is not None assert bool(l_r) == bool(r_r) assert bool(r_l) == bool(l_l) PKL9O^28--cloudsync/tests/test_events.pyimport time from io import BytesIO import threading import pytest from cloudsync import EventManager, SyncState, LOCAL @pytest.fixture(name="manager") def fixture_manager(mock_provider_generator): # TODO extend this to take any provider provider = mock_provider_generator() state = SyncState((provider, provider), shuffle=True) yield EventManager(provider, state, LOCAL) def test_event_basic(manager): provider = manager.provider state = manager.state info = provider.create("/dest", BytesIO(b'hello')) assert not state.lookup_path(LOCAL, "/dest") oid = None # this is normally a blocking function that runs forever def done(): nonlocal oid states = state.get_all() if states: oid = list(states)[0][LOCAL].oid return state.lookup_oid(LOCAL, oid) return False # loop the sync until the file is found manager.run(timeout=1, until=done) assert oid info = provider.info_oid(oid) assert 
info.path == "/dest" def test_events_shutdown_event_shouldnt_process(manager): handle = threading.Thread(target=manager.run, kwargs={'sleep': .3}, daemon=True) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) try: manager.events.__next__() except StopIteration: assert False try: manager.events.__next__() assert False except StopIteration: pass finally: manager.stop() def test_events_shutdown_force_process_event(manager): handle = threading.Thread(target=manager.run, kwargs={'sleep': .3}, daemon=True) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) manager.do() try: manager.events.__next__() assert False except StopIteration: pass finally: manager.stop() PKL9O`ggcloudsync/tests/test_mux.pyimport threading from cloudsync.muxer import Muxer from typing import Callable, List def test_simple_mux(): def gen(): yield from range(4) m1 = Muxer(gen) m2 = Muxer(gen) assert len(list(m1)) == 4 assert len(list(m2)) == 4 def test_thready_mux(): threads = 10 count = 1000 def gen(): yield from range(count) def counter(m): def inner(): inner.count = 0 # type: ignore for _ in m: inner.count += 1 # type: ignore return inner m: List[Muxer] = [None] * threads c: List[Callable] = [None] * threads t: List[threading.Thread] = [None] * threads for i in range(threads): m[i] = Muxer(gen) c[i] = counter(m[i]) t[i] = threading.Thread(target=c[i], daemon=True) for i in range(threads): t[i].start() for i in range(threads): t[i].join() assert c[i].count == count # type: ignore def test_later_mux(): def gen(): yield from range(4) m1 = Muxer(gen) assert next(m1) == 0 m2 = Muxer(gen) assert len(m1.listeners) == 2 assert len(list(m1)) == 3 assert len(list(m2)) == 3 def test_restart_mux(): def gen(): yield from range(4) m1 = Muxer(gen, restart=True) m2 = Muxer(gen, restart=True) assert len(m1.listeners) == 2 assert len(list(m1)) == 4 assert len(list(m2)) == 8 assert len(list(m1)) == 8 assert len(list(m2)) == 8 def test_del(): def gen(): yield from range(4) m1 = Muxer(gen) _ = next(m1) m2 = Muxer(gen) lm2 = list(m2) assert len(m2.listeners) == 2 assert len(lm2) == 3 m2.__del__() assert len(m1.listeners) == 1 _ = list(m1) assert gen in Muxer.already m1.__del__() assert len(m1.listeners) == 0 assert gen not in Muxer.already PKL9O$*cloudsync/tests/test_oauth_redir_server.pyimport logging import time import threading import requests from unittest.mock import Mock, patch from cloudsync.oauth import OAuthRedirServer from cloudsync.oauth.apiserver import ApiServer log = logging.getLogger(__name__) def resp_gen(success: bool, error_msg: str) -> str: time.sleep(2) # We are making a biiiiiig response that takes way too long if success: return f'Success' else: return f'Failure: {error_msg}' shutdown_signal = threading.Event() class EventApiServer(ApiServer): def __init__(self, *args, **kwargs): # This signal ensures that the test function only starts shutting down the oauth server after the request is # received by the OAuth server super().__init__(*args, **kwargs) def __call__(self, *args, **kwargs): shutdown_signal.set() return super().__call__(*args, **kwargs) @patch('cloudsync.oauth.redir_server.ApiServer', EventApiServer) def test_oauth_redir_server(): srv = OAuthRedirServer(html_generator=resp_gen) on_success = Mock() on_failure = Mock() srv.run(on_success=on_success, on_failure=on_failure, use_predefined_ports=False) port = srv.port() def send_req(): res = 
requests.get(url=f'http://127.0.0.1:{port}/auth/', params={ 'state': ['-T_wMR7edzQAc8i3UiH3Fg=='], 'error_description': ['Long error descrption'], 'error': ['badman'] }) assert res.status_code == 200 t = threading.Thread(target=send_req, daemon=True) t.start() shutdown_signal.wait() srv.shutdown() t.join(4) assert not t.is_alive() on_success.assert_not_called() on_failure.assert_called_once() PKL9Oߵ cloudsync/tests/test_provider.pyimport os import time import logging from io import BytesIO from unittest.mock import patch from typing import Union, NamedTuple, Optional, Generator, TYPE_CHECKING, List, cast import pytest import cloudsync from cloudsync import Event, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError, CloudOutOfSpaceError, FILE from cloudsync.tests.fixtures import Provider, mock_provider_instance from cloudsync.runnable import time_helper from cloudsync.types import OInfo # from cloudsync.providers import GDriveProvider, DropboxProvider log = logging.getLogger(__name__) # this is, apparently, the only way to deal with mixins, see: https://github.com/python/mypy/issues/5837 if TYPE_CHECKING: # we know that the providerhelper will always be mixed in with a provider ProviderBase = Provider else: # but we can't actually derive from it or stuff will break ProviderBase = object class ProviderHelper(ProviderBase): def __init__(self, prov): self.api_retry = True self.prov = prov self.test_root = getattr(self.prov, "test_root", None) self.event_timeout = getattr(self.prov, "event_timeout", 20) self.event_sleep = getattr(self.prov, "event_sleep", 1) self.creds = getattr(self.prov, "creds", {}) self.prov_api_func = self.prov._api self.prov._api = lambda *ar, **kw: self.__api_retry(self._api, *ar, **kw) self.prov.connect(self.creds) assert prov.connection_id if not self.test_root: # if the provider class doesn't specify a testing root # then just make one up self.test_root = "/" + os.urandom(16).hex() self.prov.mkdir(self.test_root) def _api(self, *ar, **kw): return self.prov_api_func(*ar, **kw) def __api_retry(self, func, *ar, **kw): # the cloud providers themselves should *not* have their own backoff logic # rather, they should punt rate limit and temp errors to the sync system # since we're not testing the sync system here, we need to make our own if not self.api_retry: return func(*ar, **kw) for _ in time_helper(timeout=self.event_timeout, sleep=self.event_sleep, multiply=2): try: return func(*ar, **kw) except CloudTemporaryError: log.info("api retry %s %s %s", func, ar, kw) # TEST-ROOT WRAPPER def __getattr__(self, k): return getattr(self.prov, k) def events(self) -> Generator[Event, None, None]: for e in self.prov.events(): if self.__filter_root(e): yield e def walk(self, path, since=None): path = self.__add_root(path) log.debug("WALK %s", path) for e in self.prov.walk(path): if self.__filter_root(e): yield e def download(self, *args, **kwargs): return self.__strip_root(self.prov.download(*args, **kwargs)) def create(self, path, file_like, metadata=None): path = self.__add_root(path) log.debug("CREATE %s", path) return self.__strip_root(self.prov.create(path, file_like, metadata)) def upload(self, *args, **kwargs): return self.__strip_root(self.prov.upload(*args, **kwargs)) def rename(self, oid, path): path = self.__add_root(path) return self.__strip_root(self.prov.rename(oid, path)) def mkdir(self, path): path = self.__add_root(path) return self.__strip_root(self.prov.mkdir(path)) def delete(self, *args, **kwargs): return self.__strip_root(self.prov.delete(*args, 
**kwargs)) def exists_oid(self, oid): return self.prov.exists_oid(oid) def exists_path(self, path): path = self.__add_root(path) return self.prov.exists_path(path) def info_path(self, path: str) -> Optional[OInfo]: path = self.__add_root(path) return self.__strip_root(self.prov.info_path(path)) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: return self.__strip_root(self.prov.info_oid(oid)) def listdir(self, oid): for e in self.prov.listdir(oid): if self.__filter_root(e): yield e def __add_root(self, path): return self.join(self.test_root, path) def __filter_root(self, obj): if hasattr(obj, "path"): raw_path = obj.path if not raw_path: info = self.prov.info_oid(obj.oid) if info: raw_path = info.path if not raw_path: # pathless objects always get passed through # so isolation is not perfect return True if not self.is_subpath(self.test_root, raw_path): return False self.__strip_root(obj) return True def __strip_root(self, obj): if hasattr(obj, "path"): path = obj.path if path: relative = self.is_subpath(self.test_root, path) assert relative path = relative if not path.startswith(self.sep): path = self.sep + path obj.path = path return obj # HELPERS def temp_name(self, name="tmp", *, folder=None): fname = self.join(folder or self.sep, os.urandom(16).hex() + "." + name) return fname def events_poll(self, timeout=None, until=None) -> Generator[Event, None, None]: if timeout is None: timeout = self.event_timeout if timeout == 0: yield from self.events() return for _ in time_helper(timeout, sleep=self.event_sleep, multiply=2): got = False for e in self.events(): yield e got = True if not until and got: break elif until and until(): break def __cleanup(self, oid): try: for info in self.prov.listdir(oid): if info.otype == FILE: log.debug("cleaning %s", info) self.delete(info.oid) else: self.__cleanup(info.oid) log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileNotFoundError: pass def test_cleanup(self, timeout=None, until=None): info = self.prov.info_path(self.test_root) self.__cleanup(info.oid) info = self.prov.info_path(self.test_root) if info: try: log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileExistsError: # deleting the root might now be supported pass def prime_events(self): try: for e in self.events_poll(timeout=1): log.debug("ignored event %s", e) except TimeoutError: pass @property def current_cursor(self): return self.prov.current_cursor @current_cursor.setter def current_cursor(self, val): self.prov.current_cursor = val def mixin_provider(prov): assert prov assert isinstance(prov, Provider) prov = ProviderHelper(prov) # type: ignore yield prov prov.test_cleanup() @pytest.fixture def provider_params(): return None class ProviderConfig: def __init__(self, name, param=(), param_id=None): if param_id is None: param_id = name self.name = name if name == "mock": assert param self.param = param self.param_id = param_id def __repr__(self): return "%s(%s)" % (type(self), self.__dict__) @pytest.fixture def config_provider(request, provider_config): # try: # request.raiseerror("foo") # except Exception as e: # FixtureLookupError = type(e) if provider_config.name == "external": # if there's a fixture available, use it return request.getfixturevalue("cloudsync_provider") # deferring imports to prevent needing deps we don't want to require for everyone elif provider_config.name == "mock": return mock_provider_instance(*provider_config.param) elif provider_config.name == "gdrive": from .providers.gdrive import gdrive_provider return gdrive_provider() 
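# dropbox follows the same lazy-import pattern as gdrive above, so its SDK is only needed when that provider is selected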
elif provider_config.name == "dropbox": from .providers.dropbox import dropbox_provider return dropbox_provider() else: assert False, "Must provide a valid --provider name or use the -p " known_providers = ('gdrive', 'external', 'dropbox', 'mock') def configs_from_name(name): provs: List[ProviderConfig] = [] if name == "mock": provs += [ProviderConfig("mock", (False, True), "mock_oid_cs")] provs += [ProviderConfig("mock", (True, True), "mock_path_cs")] else: provs += [ProviderConfig(name)] return provs def configs_from_keyword(kw): provs: List[ProviderConfig] = [] # crappy approximation of pytest evaluation routine, because false = {} for known_prov in known_providers: false[known_prov] = False ok: Union[bool, List[bool]] for known_prov in known_providers: if known_prov == kw or '[' + known_prov + ']' == kw: ok = True else: ids = false.copy() ids[known_prov] = True try: ok = eval(kw, {}, ids) except NameError: ok = False except Exception as e: log.error("%s %s", type(e), e) ok = False if type(ok) is list: ok = any(cast(List[bool], ok)) if ok: provs += configs_from_name(known_prov) return provs _registered = False def pytest_generate_tests(metafunc): global _registered if not _registered: for known_prov in known_providers: metafunc.config.addinivalue_line( "markers", known_prov ) _registered = True if "provider_config" in metafunc.fixturenames: provs: List[ProviderConfig] = [] for e in metafunc.config.getoption("provider", []): for n in e.split(","): provs += configs_from_name(n) if not provs: kw = metafunc.config.getoption("keyword", "") if kw: provs += configs_from_keyword(kw) if not provs: provs += configs_from_name("mock") ids = [p.param_id for p in provs] marks = [pytest.param(p, marks=[getattr(pytest.mark, p.name)]) for p in provs] metafunc.parametrize("provider_config", marks, ids=ids) @pytest.fixture(name="provider") def provider_fixture(config_provider): yield from mixin_provider(config_provider) def test_join(mock_provider): assert "/a/b/c" == mock_provider.join("a", "b", "c") assert "/a/c" == mock_provider.join("a", None, "c") assert "/a/b/c" == mock_provider.join("/a", "/b", "/c") assert "/a/c" == mock_provider.join("a", "/", "c") def test_connect(provider): assert provider.connected provider.disconnect() assert not provider.connected # todo: maybe assert provider.creds here... 
because creds should probably be a fcs of provider provider.reconnect() assert provider.connected def test_create_upload_download(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) info2 = provider.upload(info1.oid, data()) assert info1.hash assert info2.hash assert info1.oid == info2.oid assert info1.hash == info2.hash assert provider.exists_path(dest) dest = BytesIO() provider.download(info2.oid, dest) dest.seek(0) assert dest.getvalue() == dat def test_rename(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) dest2 = provider.temp_name("dest2") provider.rename(info1.oid, dest2) assert provider.exists_path(dest2) assert not provider.exists_path(dest) # test that renaming a folder renames the children folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() file_name = os.urandom(16).hex() file_path1 = provider.join(folder_name1, file_name) file_path2 = provider.join(folder_name2, file_name) sub_folder_name = os.urandom(16).hex() sub_folder_path1 = provider.join(folder_name1, sub_folder_name) sub_folder_path2 = provider.join(folder_name2, sub_folder_name) sub_file_name = os.urandom(16).hex() sub_file_path1 = provider.join(sub_folder_path1, sub_file_name) sub_file_path2 = provider.join(sub_folder_path2, sub_file_name) folder_oid = provider.mkdir(folder_name1) sub_folder_oid = provider.mkdir(sub_folder_path1) file_info = provider.create(file_path1, data()) sub_file_info = provider.create(sub_file_path1, data()) assert provider.exists_path(file_path1) assert not provider.exists_path(file_path2) assert provider.exists_path(sub_file_path1) assert not provider.exists_path(sub_file_path2) assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) new_oid = provider.rename(folder_oid, folder_name2) assert provider.exists_path(file_path2) assert not provider.exists_path(file_path1) assert provider.exists_path(sub_file_path2) assert not provider.exists_path(sub_file_path1) assert provider.exists_oid(new_oid) if not provider.oid_is_path: assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) assert provider.info_oid(file_info.oid).path == file_path2 assert provider.info_oid(sub_file_info.oid).path == sub_file_path2 else: assert not provider.exists_oid(file_info.oid) assert not provider.exists_oid(sub_file_info.oid) # move to sub dest = provider.temp_name("movy") sub_file_name = os.urandom(16).hex() sub_file_path3 = provider.join(sub_folder_path2, sub_file_name) info1 = provider.create(dest, data()) provider.rename(info1.oid, sub_file_path3) def test_mkdir(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") provider.mkdir(dest) info = provider.info_path(dest) assert info.otype == cloudsync.DIRECTORY sub_f = provider.temp_name("dest", folder=dest) log.debug("parent = %s, sub = %s", dest, sub_f) with pytest.raises(CloudFileExistsError): provider.create(dest, data(), None) assert provider.exists_path(dest) log.debug("folder %s exists", dest) provider.create(sub_f, data(), None) def test_walk(provider): temp = BytesIO(os.urandom(32)) folder = provider.temp_name("folder") provider.mkdir(folder) subfolder = provider.join(folder, provider.temp_name("subfolder")) provider.mkdir(subfolder) dest0 = provider.temp_name("dest0") dest1 = provider.join(folder, provider.temp_name("dest1")) dest2 = provider.join(subfolder, 
provider.temp_name("dest2")) oids = {} info = provider.create(dest0, temp, None) oids[dest0] = info.oid info = provider.create(dest1, temp, None) oids[dest1] = info.oid info = provider.create(dest2, temp, None) oids[dest2] = info.oid got_event = False found = {} for e in provider.walk("/"): if e.otype == cloudsync.DIRECTORY: continue log.debug("WALK %s", e) path = e.path if path is None: path = provider.info_oid(e.oid).path assert oids[path] == e.oid found[path] = True assert e.mtime assert e.exists got_event = True for x in [dest0, dest1, dest2]: assert found.get(x, False) is True log.debug("found %s", x) assert got_event def check_event_path(event: Event, provider, target_path): # confirms that the path in the event matches the target_path # if the provider doesn't provide the path in the event, look it up by the oid in the event # if we can't get the path, that's OK if the file doesn't exist event_path = event.path if event_path is None: try: event_path = provider.info_oid(event.oid).path assert event_path == target_path except CloudFileNotFoundError: if event.exists: raise ## event tests use "prime events" to discard unrelated events, and ensure that the cursor is "ready" def test_event_basic(provider): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") provider.prime_events() log.debug("create event") info1 = provider.create(dest, temp, None) assert info1 is not None # TODO: check info1 for more things received_event = None event_count = 0 done = False for e in provider.events_poll(until=lambda: done): log.debug("got event %s", e) # you might get events for the root folder here or other setup stuff if e.exists: if not e.path: info = provider.info_oid(e.oid) if info: e.path = info.path if e.path == dest: received_event = e event_count += 1 done = True for e in provider.events(): if not e.path: info = provider.info_oid(e.oid) if info: e.path = info.path if e.path == dest: event_count += 1 assert event_count == 1 assert received_event is not None assert received_event.oid path = received_event.path if path is None: path = provider.info_oid(received_event.oid).path assert path == dest assert received_event.mtime assert received_event.exists deleted_oid = received_event.oid log.debug("delete event") provider.delete(oid=deleted_oid) provider.delete(oid=deleted_oid) # Tests that deleting a non-existing file does not raise a FNFE received_event = None for e in provider.events_poll(until=lambda: received_event is not None): log.debug("event %s", e) if e.exists and e.path is None: info2 = provider.info_oid(e.oid) if info2: e.path = info2.path else: e.exists = False # assert not e.exists or e.path is not None # This is actually OK, google will do this legitimately assert e.otype is not None log.debug("event %s", e) if (not e.exists and e.oid == deleted_oid) or (e.path and path in e.path): received_event = e assert received_event is not None assert received_event.oid assert not received_event.exists if received_event.path is not None: assert received_event.path == dest assert received_event.oid == deleted_oid assert received_event.mtime def test_event_del_create(provider): temp = BytesIO(os.urandom(32)) temp2 = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") provider.prime_events() info1 = provider.create(dest, temp) provider.delete(info1.oid) info2 = provider.create(dest, temp2) last_event = None saw_first_delete = False saw_first_create = False disordered = False done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): log.debug("event 
%s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path # always possible to get events for other things if not (path == dest or e.oid == info1.oid): continue last_event = e if e.oid == info1.oid: if e.exists: saw_first_create = True if saw_first_delete and not provider.oid_is_path: log.debug("disordered!") disordered = True else: saw_first_delete = True if e.exists and e.oid == info2.oid: if provider.oid_is_path: if saw_first_delete and saw_first_create: done = True else: done = True # the important thing is that we always get a create after the delete event assert last_event, "Event loop timed out before getting any events" assert done, "Event loop timed out after the delete, but before the create, " \ "saw_first_delete=%s, saw_first_create=%s, disordered=%s" % (saw_first_delete, saw_first_create, disordered) assert last_event.exists is True # The provider may compress out the first create, or compress out the first create and delete, or deliver both # So, if we saw the first create, make sure we got the delete. If we didn't see the first create, # it doesn't matter if we saw the first delete. if saw_first_create: assert saw_first_delete assert not disordered def test_event_rename(provider): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") dest2 = provider.temp_name("dest") dest3 = provider.temp_name("dest") provider.prime_events() info1 = provider.create(dest, temp) oid2 = provider.rename(info1.oid, dest2) if provider.oid_is_path: info1.oid = provider.info_path(dest2).oid oid3 = provider.rename(info1.oid, dest3) if provider.oid_is_path: info1.oid = provider.info_path(dest3).oid seen = set() last_event = None second_to_last = None done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): if provider.oid_is_path: assert e.path log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path last_event = e seen.add(e.oid) if provider.oid_is_path: # 2 and 3 are in order if path == dest2: second_to_last = True if path == dest3 and (second_to_last or not provider.oid_is_path): done = True else: done = info1.oid in seen if provider.oid_is_path: # providers with path based oids need to send intermediate renames accurately and in order assert len(seen) > 2 assert last_event.path == dest3 assert last_event.prior_oid == oid2 else: # oid based providers just need to let us know something happend to that oid assert info1.oid in seen def test_api_failure(provider): # assert that the cloud # a) uses an api function # b) does not trap CloudTemporaryError's def side_effect(*a, **k): raise CloudTemporaryError("fake disconnect") with patch.object(provider, "_api", side_effect=side_effect): with patch.object(provider, "api_retry", False): with pytest.raises(CloudTemporaryError): provider.exists_path("/notexists") def test_file_not_found(provider): # Test that operations on nonexistent file system objects raise CloudFileNotFoundError # when appropriate, and don't when inappropriate dat = os.urandom(32) def data(): return BytesIO(dat) test_file_deleted_path = provider.temp_name("dest1") # Created, then deleted test_file_deleted_info = provider.create(test_file_deleted_path, data(), None) test_file_deleted_oid = test_file_deleted_info.oid provider.delete(test_file_deleted_oid) test_folder_deleted_path = provider.temp_name("dest1") # 
Created, then deleted test_folder_deleted_oid = provider.mkdir(test_folder_deleted_path) provider.delete(test_folder_deleted_oid) test_path_made_up = provider.temp_name("dest2") # Never created test_oid_made_up = "never created" # TODO: consider mocking info_path to always return None, and then call all the provider methods # to see if they are handling the None, and not raising exceptions other than FNF # Tests: # exists_path # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_path(test_file_deleted_path) is False assert provider.exists_path(test_folder_deleted_path) is False assert provider.exists_path(test_path_made_up) is False # exists_oid # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_oid(test_folder_deleted_oid) is False assert provider.exists_oid(test_oid_made_up) is False # info_path # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_path(test_file_deleted_path) is None assert provider.info_path(test_folder_deleted_path) is None assert provider.info_path(test_path_made_up) is None # info_oid # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_oid(test_file_deleted_oid) is None assert provider.info_oid(test_folder_deleted_oid) is None assert provider.info_oid(test_oid_made_up) is None # hash_oid # deleted file returns None # never existed file returns None # if getattr(provider, "hash_oid", False): # TODO implement hash_oid in gdrive, then don't have this be conditional # assert provider.hash_oid(test_file_deleted_oid) is None # assert provider.hash_oid(test_oid_made_up) is None # upload # to a deleted file raises FNF, or untrashes the file, either is OK # to a made up oid raises FNF # TODO: uploading to a deleted file might not raise an FNF, it might just untrash the file assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_path(test_file_deleted_path) is False try: info = provider.upload(test_file_deleted_oid, data(), None) # This succeeded so the file must exist now, at the same oid as before assert info.oid == test_file_deleted_oid assert provider.exists_path(test_file_deleted_path) is True assert provider.exists_oid(test_file_deleted_oid) is True re_delete = True except CloudFileNotFoundError: re_delete = False pass if re_delete: provider.delete(test_file_deleted_oid) with pytest.raises(CloudFileNotFoundError): provider.upload(test_oid_made_up, data(), None) # create # to a non-existent folder, raises FNF # to a previously deleted folder, raises FNF with pytest.raises(CloudFileNotFoundError): provider.create(test_path_made_up + "/junk", data(), None) with pytest.raises(CloudFileNotFoundError): provider.create(test_folder_deleted_path + "/junk", data(), None) # upload: to the OID of a deleted folder, raises FNFE with pytest.raises(CloudFileNotFoundError): provider.upload(test_folder_deleted_oid, data(), None) # download # on a deleted oid raises FNF # on a made up oid raises FNF with pytest.raises(CloudFileNotFoundError): provider.download(test_file_deleted_oid, data()) with pytest.raises(CloudFileNotFoundError): provider.download(test_oid_made_up, data()) # rename # from a deleted oid raises FNF # from a made up oid raises FNF # to 
a non-existent folder raises [something], conditionally # to a previously deleted folder raises # check the rename source to see if there are others with pytest.raises(CloudFileNotFoundError): provider.rename(test_file_deleted_oid, test_file_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_folder_deleted_oid, test_folder_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_oid_made_up, test_path_made_up) # mkdir # to a non-existent folder raises FNF # to a previously deleted folder as parent folder raises FNF # to a previously deleted file as parent folder raises FNF with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_path_made_up + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_folder_deleted_path + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_file_deleted_path + "/junk") # delete # on a deleted file oid does not raise # on a deleted folder oid does not raise # on a made up oid does not raise provider.delete(test_file_deleted_oid) provider.delete(test_folder_deleted_oid) provider.delete(test_oid_made_up) # delete: create a file, delete it, then create a new file at that path, then re-delete the deleted oid, raises FNFE temp_path = provider.temp_name() info1 = provider.create(temp_path, BytesIO(b"Hello")) provider.delete(info1.oid) info2 = provider.create(temp_path, BytesIO(b"world")) if provider.oid_is_path: assert provider.exists_oid(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) else: assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) provider.delete(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) # listdir # on a deleted file raises FNF # on a deleted folder raises FNF # on a made up path raises FNF with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_file_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_folder_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_oid_made_up)) # TODO: Google drive raises FNF when it can't find the root... can we test for that here? 
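# The FileNotFound checks above all follow the same pattern; a small helper like the
# sketch below could condense them. It is illustrative only (nothing in this suite
# calls it) and assumes only the pytest / CloudFileNotFoundError imports already
# present at the top of this module.
def _expect_fnf(call, *args, **kwargs):
    # hypothetical helper: the provider call must raise CloudFileNotFoundError
    with pytest.raises(CloudFileNotFoundError):
        call(*args, **kwargs)

# usage inside a test would look like: _expect_fnf(provider.download, made_up_oid, BytesIO())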
def test_file_exists(provider): dat = os.urandom(32) def data(da=dat): return BytesIO(da) def create_and_delete_file(): create_and_delete_file_name = provider.temp_name() file_info = provider.create(create_and_delete_file_name, data(), None) provider.delete(file_info.oid) return create_and_delete_file_name, file_info.oid def create_and_delete_folder(): create_and_delete_folder_name = provider.temp_name() create_and_delete_folder_oid = provider.mkdir(create_and_delete_folder_name) provider.delete(create_and_delete_folder_oid) return create_and_delete_folder_name, create_and_delete_folder_oid def create_and_rename_file(): file_name1 = provider.temp_name() file_name2 = provider.temp_name() assert file_name1 != file_name2 file_info1 = provider.create(file_name1, data(), None) provider.rename(file_info1.oid, file_name2) return file_name1, file_info1.oid def create_and_rename_folder(): folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() assert folder_name1 != folder_name2 folder_oid1 = provider.mkdir(folder_name1) provider.rename(folder_oid1, folder_name2) return folder_name1, folder_oid1 def create_file(create_file_name=None): if create_file_name is None: create_file_name = provider.temp_name() file_info = provider.create(create_file_name, data(), None) return create_file_name, file_info.oid def create_folder(create_folder_name=None): if create_folder_name is None: create_folder_name = provider.temp_name() create_folder_oid = provider.mkdir(create_folder_name) return create_folder_name, create_folder_oid # Test that operations on existent file system objects raise CloudExistsError # when appropriate, and don't when inappropriate # api methods to check for FileExists: # vulnerable to existing paths: # mkdir, create, rename # Possible issues to potentially check each of the vulnerable api methods: # target path has a component in the parent folder that already exists as a file # target path exists # target path exists, but the type of the existing object at that location is different from expected # target path exists, but the type of the existing object at that location is what was expected # target path existed, but was deleted, different type as source # target path existed, but was deleted, same type as source # target path existed, but was renamed, different type as source # target path existed, but was renamed, same type as source # # vulnerable to existing OIDs: # upload, delete # Possible issues to potentially check each of the vulnerable api methods: # target OID exists, but the type of the existing object at that location is different from expected # target OID existed, but was trashed, should un-trash the object # target OID is a non-empty folder, delete should raise FEx # # The enumerated tests: # mkdir: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name + "/junk") # mkdir: where target path exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name) # mkdir: where target path exists as a folder, does not raise name1, oid1 = create_folder() oid2 = provider.mkdir(name1) assert oid1 == oid2 # mkdir: creating a file, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: creating a folder, deleting it, then creating a folder at the same path, should not raise 
an FEx name1, oid1 = create_and_delete_folder() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as file, but was renamed name1, oid1 = create_and_rename_file() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as folder, but was renamed name1, oid1 = create_and_rename_folder() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # upload: where target OID is a folder, raises FEx _, oid = create_folder() with pytest.raises(CloudFileExistsError): provider.upload(oid, data(), None) # delete: a non-empty folder, raises FEx name1, oid1 = create_folder() create_file(name1 + "/junk") with pytest.raises(CloudFileExistsError): provider.delete(oid1) # create: where target path exists, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name) def get_contents(oid): temp_contents = BytesIO() provider.download(oid, temp_contents) temp_contents.seek(0) return temp_contents.getvalue() # create: creating a file, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() _, oid2 = create_file(name1) assert oid1 != oid2 or provider.oid_is_path if provider.oid_is_path: assert provider.exists_oid(oid1) else: assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) # piggyback test -- uploading to the deleted oid should not step on the file that replaced it at that path if not provider.oid_is_path: try: new_contents = b"yo" # bytes(os.urandom(16).hex(), 'utf-8') new_info = provider.upload(oid1, BytesIO(new_contents)) assert new_info.oid != oid1 assert new_info.oid != oid2 assert not provider.exists_oid(oid1) contents2 = get_contents(oid2) assert contents2 == new_contents contents1 = get_contents(oid1) assert contents1 == new_contents except CloudFileNotFoundError: pass # create: creating a folder, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() _, oid2 = create_file(name1) assert oid1 != oid or provider.oid_is_path # create: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name + "/junk") # create: target path existed as folder, but was renamed name, _ = create_and_rename_folder() create_file(name) # create: target path existed as file, but was renamed name, _ = create_and_rename_file() create_file(name) # rename: rename folder over empty folder succeeds name1, oid1 = create_folder() create_file(name1 + "/junk") name2, oid2 = create_folder() assert oid1 != oid2 contents1 = [x.name for x in provider.listdir(oid1)] provider.rename(oid1, name2) if provider.oid_is_path: log.debug("oid1 %s, oid2 %s", oid1, oid2) assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid2)] else: assert provider.exists_oid(oid1) assert not provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid1)] assert contents1 == contents2 # rename: rename folder over non-empty folder raises FEx _, oid1 = create_folder() name2, oid2 = create_folder() assert oid1 != oid2 create_file(name2 + "/junk") with pytest.raises(CloudFileExistsError): provider.rename(oid1, name2) # rename: target has a parent folder that already exists as a file, raises FEx folder_name, _ = create_file() # notice that I am creating a file, and calling it a folder with 
pytest.raises(CloudFileExistsError): create_file(folder_name + "/junk") # rename: renaming file over empty folder, raises FEx folder_name, folder_oid = create_folder() file_name, file_oid = create_file() other_file_name, other_file_oid = create_file() with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # rename: renaming file over non-empty folder, raises FEx create_file(folder_name + "/test") with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(folder_oid, file_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(file_oid, other_file_name) # reuse the same file and folder from the last test # rename: create a file, delete it, then rename a file to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_file() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename file to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_file() provider.rename(oid2, deleted_folder_name) # rename: create a file, delete it, then rename a folder to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_folder() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename folder to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_folder() provider.rename(oid2, deleted_folder_name) # rename: target folder path existed, but was renamed away, folder type as source name1, oid1 = create_and_rename_folder() name2, oid2 = create_folder() provider.rename(oid2, name1) # rename: target folder path existed, but was renamed away, file type as source name1, oid1 = create_folder() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, folder type as source name1, oid1 = create_file() name2, oid2 = create_folder() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, file type as source name1, oid1 = create_file() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) def test_cursor(provider): # get the ball rolling provider.create("/file1", BytesIO(b"hello")) for i in provider.events(): log.debug("event = %s", i) current_csr1 = provider.current_cursor # do something to create an event info = provider.create("/file2", BytesIO(b"there")) log.debug(f"current={provider.current_cursor} latest={provider.latest_cursor}") found = False for e in provider.events_poll(timeout=600, until=lambda: found): log.debug("event = %s", e) if e.oid == info.oid: found = True assert found current_csr2 = provider.current_cursor log.debug(f"current={provider.current_cursor} latest={provider.latest_cursor}") assert current_csr1 != current_csr2 # check that we can go backwards provider.current_cursor = current_csr1 log.debug(f"current={provider.current_cursor} 
latest={provider.latest_cursor}") found = False for i in provider.events_poll(timeout=10, until=lambda: found): log.debug("event = %s", i) if i.oid == info.oid: found = True assert found # TODO: test that renaming A over B replaces B's OID with A's OID, and B's OID is trashed def test_listdir(provider): outer = provider.temp_name() root = provider.dirname(outer) temp_name = provider.is_subpath(root, outer) outer_oid_rm = provider.mkdir(outer) assert [] == list(provider.listdir(outer_oid_rm)) provider.delete(outer_oid_rm) outer_oid = provider.mkdir(outer) assert provider.exists_path(outer) assert provider.exists_oid(outer_oid) inner = outer + temp_name inner_oid = provider.mkdir(inner) assert provider.exists_oid(inner_oid) provider.create(outer + "/file1", BytesIO(b"hello")) provider.create(outer + "/file2", BytesIO(b"there")) provider.create(inner + "/file3", BytesIO(b"world")) contents = [x.name for x in provider.listdir(outer_oid)] assert len(contents) == 3 expected = ["file1", "file2", temp_name[1:]] assert sorted(contents) == sorted(expected) def test_upload_to_a_path(provider): temp_name = provider.temp_name() info = provider.create(temp_name, BytesIO(b"test")) assert info.hash # test uploading to a path instead of an OID. should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): info = provider.upload(temp_name, BytesIO(b"test2")) def test_upload_zero_bytes(provider): temp_name = provider.temp_name() info = provider.create(temp_name, BytesIO(b"")) info2 = provider.upload(info.oid, BytesIO(b"")) dest = BytesIO() provider.download(info.oid, dest) assert info assert info2 assert info.hash == info2.hash def test_delete_doesnt_cross_oids(provider): temp_name = provider.temp_name() info1 = provider.create(temp_name, BytesIO(b"test1")) provider.delete(info1.oid) info2 = provider.create(temp_name, BytesIO(b"test2")) if not provider.oid_is_path: assert info1.oid != info2.oid assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) if not provider.oid_is_path: provider.delete(info1.oid) assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) # test uploading to a path instead of an OID. 
should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): provider.upload(temp_name, BytesIO(b"test2")) @pytest.mark.parametrize("otype", ["file", "folder"]) def test_rename_case_change(provider, otype): temp_namel = provider.temp_name().lower() temp_nameu = temp_namel.upper() if otype == "file": infol = provider.create(temp_namel, BytesIO(b"test")) else: l_oid = provider.mkdir(temp_namel) infol = provider.info_oid(l_oid) assert infol.path == temp_namel new_oid = provider.rename(infol.oid, temp_nameu) assert new_oid infou = provider.info_oid(new_oid) assert infou.path == temp_nameu infopu = provider.info_path(temp_nameu) infopl = provider.info_path(temp_namel) assert infopu assert infopu.path == temp_nameu if provider.case_sensitive: assert not infopl else: assert infopl assert infopl.path == temp_nameu def test_quota_limit(mock_provider): mock_provider.set_quota(1024) mock_provider.create("/foo", BytesIO(b'0' * 1024)) with pytest.raises(CloudOutOfSpaceError): mock_provider.create("/bar", BytesIO(b'0' * 2)) assert not mock_provider.info_path("/bar") def test_special_characters(provider): fname = "" for i in range(32, 127): char = str(chr(i)) if char in (provider.sep, provider.alt_sep): continue if char in """<>:"/\\|?*""": continue fname = fname + str(chr(i)) fname = "/fn-" + fname log.debug("fname = %s", fname) contents = b"hello world" info = provider.create(fname, BytesIO(contents)) log.debug("info = %s", info) info2 = provider.info_path(fname) assert info2.oid == info.oid catch = BytesIO() provider.download(info.oid, catch) assert catch.getvalue() == contents # also test rename, mkdir, info_path, exists_path fname2 = fname + ".2" log.debug("fname2 = %s", fname2) new_oid = provider.rename(info.oid, fname2) info3 = provider.info_oid(new_oid) assert provider.exists_path(fname2) assert provider.info_path(fname2).oid == new_oid dirname = fname + ".dir" diroid = provider.mkdir(dirname) dirinfo = provider.info_path(dirname) assert dirinfo.otype == cloudsync.DIRECTORY assert dirinfo.oid == diroid newfname2 = provider.join(dirname, fname2) new_oid2 = provider.rename(new_oid, newfname2) newfname2info = provider.info_path(newfname2) assert newfname2info.oid == new_oid2 PKL9Otcloudsync/tests/test_state.pyimport logging from io import BytesIO from cloudsync import SyncState, LOCAL, REMOTE, FILE, DIRECTORY log = logging.getLogger(__name__) def test_state_basic(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="123") state.assert_index_is_correct() def test_state_rename(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_state_rename2(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, "123") state.update(LOCAL, FILE, path="foo2", oid="456", prior_oid="123") assert state.lookup_path(LOCAL, path="foo2") assert state.lookup_oid(LOCAL, "456") assert not 
state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="456") assert not state.lookup_oid(LOCAL, oid="123") state.assert_index_is_correct() def test_state_rename3(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) ahash = "ah" bhash = "bh" state.update(LOCAL, FILE, path="a", oid="a", hash=ahash) state.update(LOCAL, FILE, path="b", oid="b", hash=bhash) infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") assert infoa[LOCAL].hash == ahash assert infob[LOCAL].hash == bhash # rename in a circle state.update(LOCAL, FILE, path="c", oid="c", prior_oid="a") log.debug("TABLE 0:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="a", oid="a", prior_oid="b") log.debug("TABLE 1:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="b", oid="b", prior_oid="c") log.debug("TABLE 2:\n%s", state.pretty_print(use_sigs=False)) assert state.lookup_path(LOCAL, "a") assert state.lookup_path(LOCAL, "b") infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") # hashes should be flipped assert infoa[LOCAL].hash == bhash assert infob[LOCAL].hash == ahash state.assert_index_is_correct() def test_state_multi(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_state_kids(mock_provider): # annoyingly, the state manager now interacts with the provider # this means that the state manager needs to know how to get an oid # TODO: make a layer that knows about providers and state, and ANOTHER layer that just knows about state # that way we can go back to have a pure state/storage manager providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, DIRECTORY, path="/dir", oid="123") assert state.lookup_path(LOCAL, path="/dir") state.update(LOCAL, FILE, path="/dir/foo", oid="124") assert state.lookup_path(LOCAL, path="/dir/foo") new_oid = mock_provider.mkdir("/dir2") mock_provider.create("/dir2/foo", BytesIO(b'hi')) state.update(LOCAL, DIRECTORY, path="/dir2", oid=new_oid, prior_oid="123") log.debug("TABLE:\n%s", state.pretty_print(use_sigs=False)) state.assert_index_is_correct() assert len(state) == 2 assert state.lookup_path(LOCAL, "/dir2/foo") def test_state_split(mock_provider): # annoyingly, the state manager now interacts with the provider # this means that the state manager needs to know how to get an oid # TODO: make a layer that knows about providers and state, and ANOTHER layer that just knows about state # that way we can go back to have a pure state/storage manager providers = (mock_provider, mock_provider) state = SyncState(providers, shuffle=True) state.update(LOCAL, DIRECTORY, path="/dir", oid="123") ent = state.lookup_oid(LOCAL, "123") # oid/path updated ent[REMOTE].oid = "999" ent[REMOTE].path = "/rem" assert state.lookup_oid(LOCAL, "123") assert state.lookup_path(LOCAL, "/dir") assert state.lookup_path(REMOTE, "/rem") assert state.lookup_oid(REMOTE, "999") (defer, _ds, repl, _rs) = state.split(ent) assert state.lookup_oid(LOCAL, "123") is repl assert state.lookup_path(LOCAL, "/dir") assert state.lookup_path(REMOTE, "/rem") assert state.lookup_oid(REMOTE, "999") is defer state.assert_index_is_correct() def test_state_alter_oid(mock_provider): providers = (mock_provider, mock_provider) state 
= SyncState(providers, shuffle=True) state.update(LOCAL, FILE, path="123", oid="123", hash=b"123") ent1 = state.lookup_oid(LOCAL, "123") assert ent1 in state.changes state.update(LOCAL, FILE, path="456", oid="456", hash=b"456") ent2 = state.lookup_oid(LOCAL, "456") assert ent2 in state.changes ent1[LOCAL].oid = "456" assert state.lookup_oid(LOCAL, "456") is ent1 assert state.lookup_oid(LOCAL, "123") is None assert ent2 not in state.get_all() ent1[LOCAL].oid = "456" ent2[LOCAL].oid = "123" assert state.lookup_oid(LOCAL, "123") is ent2 assert ent2 in state.changes PKL9Otemp, c->a, b->c, temp->b log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=templ, oid=tmp1oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp1, oid=lp1oid, hash=linfo3.hash, prior_oid=linfo3.oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp3, oid=lp3oid, hash=linfo2.hash, prior_oid=linfo2.oid) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp2, oid=lp2oid, hash=linfo1.hash, prior_oid=tmp1oid) log.debug("TABLE 4:\n%s", sync.state.pretty_print()) assert len(sync.state.get_all()) == 3 sync.providers[REMOTE].log_debug_state("MIDDLE") # type: ignore sync.run(until=lambda: not sync.state.changeset_len, timeout=1) sync.providers[REMOTE].log_debug_state("AFTER") # type: ignore i1 = sync.providers[REMOTE].info_path(rp1) i2 = sync.providers[REMOTE].info_path(rp2) i3 = sync.providers[REMOTE].info_path(rp3) ldir = sorted([x.name for x in sync.providers[LOCAL].listdir(l_parent_oid)]) rdir = sorted([x.name for x in sync.providers[REMOTE].listdir(r_parent_oid)]) log.debug("ldir=%s", ldir) log.debug("rdir=%s", rdir) log.debug("path %s has info %s", rp1, i1) log.debug("path %s has info %s", rp2, i2) log.debug("path %s has info %s", rp3, i3) assert i1 assert i2 assert i3 assert i1.hash == rinfo3.hash assert i2.hash == rinfo1.hash assert i3.hash == rinfo2.hash def test_sync_conflict_path_combine(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path3 = "/local/stuff" remote_path3 = "/remote/stuff" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo2 = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo1.oid, hash=linfo1.hash) sync.change_state(REMOTE, FILE, path=remote_path2, oid=rinfo2.oid, hash=rinfo2.hash) sync.run_until_found((REMOTE, remote_path1), (LOCAL, local_path2)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) new_oid1 = sync.providers[LOCAL].rename(linfo1.oid, local_path3) prior_oid = sync.providers[LOCAL].oid_is_path and linfo1.oid sync.change_state(LOCAL, FILE, path=local_path3, oid=new_oid1, prior_oid=prior_oid) new_oid2 = sync.providers[REMOTE].rename(rinfo2.oid, remote_path3) prior_oid = sync.providers[REMOTE].oid_is_path and rinfo2.oid sync.change_state(REMOTE, FILE, path=remote_path3, oid=new_oid2, prior_oid=prior_oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) ok = lambda: ( sync.providers[REMOTE].exists_path("/remote/stuff.conflicted") or sync.providers[LOCAL].exists_path("/local/stuff.conflicted") ) sync.run(until=ok, timeout=3) log.debug("TABLE 
2:\n%s", sync.state.pretty_print()) assert ok() def test_create_then_move(sync): # TODO: combine with the reverse test remote_parent = "/remote" local_parent = "/local" local_folder = "/local/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_file1, BytesIO(b"hello")) sync.change_state(LOCAL, FILE, path=local_file1, oid=linfo1.oid, hash=linfo1.hash) sync.run_until_found((REMOTE, remote_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[LOCAL].mkdir(local_folder) sync.change_state(LOCAL, DIRECTORY, path=local_folder, oid=folder_oid, hash=None) new_oid = sync.providers[LOCAL].rename(linfo1.oid, local_file2) sync.change_state(LOCAL, FILE, path=local_file2, oid=new_oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((REMOTE, remote_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def test_create_then_move_reverse(sync): # TODO: see if this can be combined with the reverse test remote_parent = "/remote" local_parent = "/local" remote_folder = "/remote/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" oid = sync.providers[LOCAL].mkdir(local_parent) oid = sync.providers[REMOTE].mkdir(remote_parent) rinfo1 = sync.providers[REMOTE].create(remote_file1, BytesIO(b"hello")) sync.change_state(REMOTE, FILE, path=remote_file1, oid=rinfo1.oid, hash=rinfo1.hash) sync.run_until_found((LOCAL, local_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[REMOTE].mkdir(remote_folder) sync.change_state(REMOTE, DIRECTORY, path=remote_folder, oid=folder_oid, hash=None) sync.providers[REMOTE].rename(rinfo1.oid, remote_file2) sync.change_state(REMOTE, FILE, path=remote_file2, oid=rinfo1.oid, hash=rinfo1.hash) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((LOCAL, local_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def _test_rename_folder_with_kids(sync, source, dest): parent = ["/local", "/remote"] folder1 = ["/local/folder1", "/remote/folder1"] file1 = ["/local/folder1/file", "/remote/folder1/file"] folder2 = ["/local/folder2", "/remote/folder2"] file2 = ["/local/folder2/file", "/remote/folder2/file"] for loc in (source, dest): sync.providers[loc].mkdir(parent[loc]) folder_oid = sync.providers[source].mkdir(folder1[source]) sync.change_state(source, DIRECTORY, path=folder1[source], oid=folder_oid, hash=None) file_info: OInfo = sync.providers[source].create(file1[source], BytesIO(b"hello")) sync.change_state(source, FILE, path=file1[source], oid=file_info.oid, hash=None) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.run_until_found((dest, folder1[dest])) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) new_oid = sync.providers[source].rename(folder_oid, folder2[source]) sync.change_state(source, DIRECTORY, path=folder2[source], oid=new_oid, hash=None, prior_oid=folder_oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.run_until_found( (source, file2[source]), (dest, file2[dest]) ) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) @pytest.mark.parametrize("ordering", [(LOCAL, REMOTE), (REMOTE, LOCAL)]) def test_rename_folder_with_kids(sync_sh, ordering): 
    _test_rename_folder_with_kids(sync_sh, *ordering)


def test_aging(sync):
    local_parent = "/local"
    local_file1 = "/local/file"
    remote_file1 = "/remote/file"
    local_file2 = "/local/file2"
    remote_file2 = "/remote/file2"

    sync.providers[LOCAL].mkdir(local_parent)
    linfo1 = sync.providers[LOCAL].create(local_file1, BytesIO(b"hello"))

    # aging slows things down
    sync.aging = 0.2
    sync.change_state(LOCAL, FILE, path=local_file1, oid=linfo1.oid, hash=linfo1.hash)
    sync.do()
    sync.do()
    sync.do()
    log.debug("TABLE 1:\n%s", sync.state.pretty_print())
    assert not sync.providers[REMOTE].info_path(local_file1)
    sync.run_until_found((REMOTE, remote_file1), timeout=2)
    assert sync.providers[REMOTE].info_path(remote_file1)

    # but without it, things are fast
    sync.aging = 0
    linfo2 = sync.providers[LOCAL].create(local_file2, BytesIO(b"hello"))
    sync.change_state(LOCAL, FILE, path=local_file2, oid=linfo2.oid, hash=linfo2.hash)
    sync.do()
    sync.do()
    sync.do()
    log.debug("TABLE 2:\n%s", sync.state.pretty_print())
    assert sync.providers[REMOTE].info_path(remote_file2)


def test_remove_folder_with_kids(sync_sh):
    sync = sync_sh
    parent = ["/local", "/remote"]
    folder1 = ["/local/folder1", "/remote/folder1"]
    file1 = ["/local/folder1/file", "/remote/folder1/file"]
    for loc in (LOCAL, REMOTE):
        sync.providers[loc].mkdir(parent[loc])
    folder_oid = sync.providers[LOCAL].mkdir(folder1[LOCAL])
    sync.change_state(LOCAL, DIRECTORY, path=folder1[LOCAL], oid=folder_oid, hash=None)
    file_info: OInfo = sync.providers[LOCAL].create(file1[LOCAL], BytesIO(b"hello"))
    sync.change_state(LOCAL, FILE, path=file1[LOCAL], oid=file_info.oid, hash=None)
    log.debug("TABLE 0:\n%s", sync.state.pretty_print())
    sync.run_until_found((REMOTE, file1[REMOTE]))
    log.debug("TABLE 1:\n%s", sync.state.pretty_print())
    sync.providers[LOCAL].delete(file_info.oid)
    sync.providers[LOCAL].delete(folder_oid)
    sync.change_state(LOCAL, DIRECTORY, oid=file_info.oid, exists=False)
    sync.change_state(LOCAL, DIRECTORY, oid=folder_oid, exists=False)
    log.debug("TABLE 2:\n%s", sync.state.pretty_print())
    sync.run_until_found(WaitFor(REMOTE, file1[REMOTE], exists=False),
                         WaitFor(REMOTE, folder1[REMOTE], exists=False))


def test_dir_rm(sync):
    remote_parent = "/remote"
    local_parent = "/local"
    local_dir = Provider.join(local_parent, "dir")
    remote_dir = Provider.join(remote_parent, "dir")
    local_file = Provider.join(local_dir, "file")
    remote_file = Provider.join(remote_dir, "file")

    lparent = sync.providers[LOCAL].mkdir(local_parent)
    rparent = sync.providers[REMOTE].mkdir(remote_parent)
    ldir = sync.providers[LOCAL].mkdir(local_dir)
    rdir = sync.providers[REMOTE].mkdir(remote_dir)
    lfile = sync.providers[LOCAL].create(local_file, BytesIO(b"hello"))

    sync.change_state(LOCAL, DIRECTORY, path=local_dir, oid=ldir)
    sync.change_state(LOCAL, FILE, path=local_file, oid=lfile.oid, hash=lfile.hash)
    sync.run_until_found((REMOTE, remote_file), (REMOTE, remote_dir))

    rfile = sync.providers[REMOTE].info_path(remote_file)
    sync.providers[REMOTE].delete(rfile.oid)
    sync.providers[REMOTE].delete(rdir)

    # Directory delete - should punt because of children
    sync.aging = 0
    sync.change_state(REMOTE, DIRECTORY, path=remote_dir, oid=rdir, exists=False)
    sync.do()
    assert len(list(sync.providers[LOCAL].listdir(ldir))) == 1

    # Next action should be on deleted child (detected in above)
    sync.do()
    assert len(list(sync.providers[LOCAL].listdir(ldir))) == 0

    # Now it should successfully rmdir
    sync.do()
    assert len(list(sync.providers[LOCAL].listdir(lparent))) == 0
    sync.state.assert_index_is_correct()

# TODO: test to confirm that a file that is both a rename and an update will be both renamed and updated
# TODO: test to confirm that a sync with an updated path name that is different but matches the old name will be ignored (eg: a/b -> a\b)

cloudsync/tests/test_utils.py

from cloudsync.utils import debug_args


def test_debug_args():
    res = debug_args([1,2,3])
    assert res == [1,2,3]
    res = debug_args([1,2,3], {1:2}, True)
    assert res == [[1,2,3], {1:2}, True]
    res = debug_args({"k":b'0'*100})
    assert res == {"k":b'0'*61 + b'...'}

cloudsync/tests/fixtures/__init__.py

from .util import *
from .mock_provider import *
from .mock_storage import *

cloudsync/tests/fixtures/mock_provider.py

import os
import time
import copy
import logging
from hashlib import md5
from typing import Dict, List, Any, Optional, Generator, Set

import pytest

from cloudsync.event import Event
from cloudsync.provider import Provider
from cloudsync.types import OInfo, OType, DirInfo
from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError, CloudTokenError, \
    CloudDisconnectedError, CloudCursorError, CloudOutOfSpaceError, CloudTemporaryError
from cloudsync.utils import debug_sig

log = logging.getLogger(__name__)


class MockFSObject:  # pylint: disable=too-few-public-methods
    FILE = 'mock file'
    DIR = 'mock dir'

    def __init__(self, path, object_type, oid_is_path, contents=None, mtime=None):
        # self.display_path = path  # TODO: used for case insensitive file systems
        if contents is None and object_type == MockFSObject.FILE:
            contents = b""
        self.path = path.rstrip("/")
        self.contents = contents
        self.oid = path if oid_is_path else str(id(self))
        self.exists = True
        self.type = object_type
        if self.type == self.FILE:
            # none is not valid for empty file
            if self.contents is None:
                self.contents = b''
        self.mtime = mtime or time.time()

    @property
    def otype(self):
        if self.type == self.FILE:
            return OType.FILE
        else:
            return OType.DIRECTORY

    def hash(self) -> Optional[bytes]:
        if self.type == self.DIR:
            return None
        return md5(self.contents).digest()

    def update(self):
        self.mtime = time.time()

    def copy(self):
        return copy.copy(self)

    def __repr__(self):
        return "MockFSObject: %s(%s) %s %s %s" % (self.path, len(self.contents or ""), self.oid, self.exists, self.type)


class MockEvent:  # pylint: disable=too-few-public-methods
    ACTION_CREATE = "provider create"
    ACTION_RENAME = "provider rename"
    ACTION_UPDATE = "provider modify"
    ACTION_DELETE = "provider delete"

    def __init__(self, action, target_object: MockFSObject, prior_oid=None):
        self._target_object = copy.copy(target_object)
        self._action = action
        self._prior_oid = prior_oid
        self._timestamp = time.time()

    def serialize(self):
        ret_val = {"action": self._action,
                   "id": self._target_object.oid,
                   "object type": self._target_object.type,
                   "path": self._target_object.path,
                   "mtime": self._target_object.mtime,
                   "prior_oid": self._prior_oid,
                   "trashed": not self._target_object.exists,
                   }
        return ret_val


class MockProvider(Provider):
    default_sleep = 0.01
    connected = True
    name = "Mock"

    # TODO: normalize names to get rid of trailing slashes, etc.
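    # MockProvider keeps its entire "cloud" in memory: objects are indexed both by
    # normalized path (_fs_by_path) and by oid (_fs_by_oid), every mutation appends a
    # MockEvent to an in-memory list, and the event cursor is simply an integer index
    # into that list.  With oid_is_path=True it behaves like a filesystem provider
    # (oids are paths and change on rename); with oid_is_path=False oids are stable ids.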
def __init__(self, oid_is_path: bool, case_sensitive: bool, quota: int = None): """Constructor for MockProvider :param oid_is_path: Act as a filesystem or other oid-is-path provider :param case_sensitive: Paths are case sensistive """ super().__init__() log.debug("mock mode: o:%s, c:%s", oid_is_path, case_sensitive) self.oid_is_path = oid_is_path self.case_sensitive = case_sensitive self._fs_by_path: Dict[str, MockFSObject] = {} self._fs_by_oid: Dict[str, MockFSObject] = {} self._events: List[MockEvent] = [] self._latest_cursor = -1 self._cursor = -1 self._quota = quota self.locked_for_test: Set[str] = set() self._total_size = 0 self._type_map = { MockFSObject.FILE: OType.FILE, MockFSObject.DIR: OType.DIRECTORY, } self.event_timeout = 1 self.event_sleep = 0.001 self.creds = {"key": "val"} self.connection_id = os.urandom(2).hex() def disconnect(self): self.connected = False def reconnect(self): if not self.creds: raise CloudTokenError() self.connected = True def _register_event(self, action, target_object, prior_oid=None): event = MockEvent(action, target_object, prior_oid) self._events.append(event) target_object.update() self._latest_cursor = len(self._events) - 1 def _get_by_oid(self, oid): self._api() return self._fs_by_oid.get(oid, None) def normalize(self, path): if self.case_sensitive: return path else: return path.lower() def _get_by_path(self, path): path = self.normalize(path) # TODO: normalize the path, support case insensitive lookups, etc self._api() return self._fs_by_path.get(path, None) def _store_object(self, fo: MockFSObject): # TODO: support case insensitive storage assert fo.path == fo.path.rstrip("/") if fo.path in self.locked_for_test: raise CloudTemporaryError("path %s is locked for test" % (fo.path)) if fo.oid in self._fs_by_oid and self._fs_by_oid[fo.oid].contents: self._total_size -= len(self._fs_by_oid[fo.oid].contents) if fo.contents and self._quota is not None and self._total_size + len(fo.contents) > self._quota: raise CloudOutOfSpaceError("out of space in mock") self._fs_by_path[self.normalize(fo.path)] = fo self._fs_by_oid[fo.oid] = fo if fo.contents: self._total_size += len(fo.contents) def set_quota(self, quota: int): self._quota = quota def _unstore_object(self, fo: MockFSObject): # TODO: do I need to check if the path and ID exist before del to avoid a key error, # or perhaps just catch and swallow that exception? 
del self._fs_by_path[self.normalize(fo.path)] del self._fs_by_oid[fo.oid] if fo.contents: self._total_size -= len(fo.contents) def _translate_event(self, pe: MockEvent, cursor) -> Event: event = pe.serialize() provider_type = event.get("object type", None) standard_type = self._type_map.get(provider_type, None) assert standard_type oid = event.get("id", None) mtime = event.get("mtime", None) trashed = event.get("trashed", None) prior_oid = event.get("prior_oid", None) path = None if self.oid_is_path: path = event.get("path") retval = Event(standard_type, oid, path, None, not trashed, mtime, prior_oid, new_cursor=cursor) return retval def _api(self, *args, **kwargs): if not self.connected: raise CloudDisconnectedError() pass @property def latest_cursor(self): return self._latest_cursor @property def current_cursor(self): return self._cursor @current_cursor.setter def current_cursor(self, val): if val is None: val = self.latest_cursor if not isinstance(val, int) and val is not None: raise CloudCursorError(val) self._cursor = val def events(self) -> Generator[Event, None, None]: self._api() # log.error("GETTING EVENTS : %s", self._cursor) while self._cursor < self._latest_cursor: self._cursor += 1 pe = self._events[self._cursor] yield self._translate_event(pe, self._cursor) def walk(self, path, since=None): # TODO: implement "since" parameter self._api() for obj in list(self._fs_by_oid.values()): if self.is_subpath(path, obj.path, strict=False): yield Event(obj.otype, obj.oid, obj.path, obj.hash(), obj.exists, obj.mtime) def upload(self, oid, file_like, metadata=None) -> OInfo: self._api() file = self._fs_by_oid.get(oid, None) if file is None or not file.exists: raise CloudFileNotFoundError(oid) if file.type != MockFSObject.FILE: raise CloudFileExistsError("Only files may be uploaded, and %s is not a file" % file.path) if file.path in self.locked_for_test: raise CloudTemporaryError("path %s is locked for test" % (file.path)) contents = file_like.read() file.contents = contents self._register_event(MockEvent.ACTION_UPDATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def listdir(self, oid) -> Generator[DirInfo, None, None]: folder_obj = self._get_by_oid(oid) if not (folder_obj and folder_obj.exists and folder_obj.type == MockFSObject.DIR): raise CloudFileNotFoundError(oid) path = folder_obj.path for obj in self._fs_by_oid.values(): if obj.exists: relative = self.is_subpath(path, obj.path, strict=True) if relative: relative = relative.lstrip("/") if "/" not in relative: yield DirInfo(otype=obj.otype, oid=obj.oid, hash=obj.hash(), path=obj.path, name=relative) def create(self, path, file_like, metadata=None) -> OInfo: # TODO: store the metadata file = self._get_by_path(path) if file is not None and file.exists: raise CloudFileExistsError("Cannot create, '%s' already exists" % file.path) self._verify_parent_folder_exists(path) if file is None or not file.exists: file = MockFSObject(path, MockFSObject.FILE, self.oid_is_path) file.contents = file_like.read() file.exists = True self._store_object(file) log.debug("created %s %s", debug_sig(file.oid), file.type) self._register_event(MockEvent.ACTION_CREATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def download(self, oid, file_like): self._api() file = self._fs_by_oid.get(oid, None) if file is None or file.exists is False: raise CloudFileNotFoundError(oid) if file.type == MockFSObject.DIR: raise CloudFileExistsError("is a directory") file_like.write(file.contents) def 
rename(self, oid, path) -> str: log.debug("renaming %s -> %s", debug_sig(oid), path) self._api() # TODO: folders are implied by the path of the file... # actually check to make sure the folder exists and raise a FileNotFound if not object_to_rename = self._fs_by_oid.get(oid, None) if not (object_to_rename and object_to_rename.exists): raise CloudFileNotFoundError(oid) possible_conflict = self._get_by_path(path) if possible_conflict and possible_conflict.oid == oid: possible_conflict = None self._verify_parent_folder_exists(path) if possible_conflict and possible_conflict.exists: if possible_conflict.type != object_to_rename.type: log.debug("rename %s:%s conflicts with existing object of another type", debug_sig(oid), object_to_rename.path) raise CloudFileExistsError(path) if possible_conflict.type == MockFSObject.DIR: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError(path) except StopIteration: pass # Folder is empty, rename over it no problem else: raise CloudFileExistsError(path) log.debug("secretly deleting empty folder %s", path) self.delete(possible_conflict.oid) if object_to_rename.path == path: return oid prior_oid = None if self.oid_is_path: prior_oid = object_to_rename.oid if object_to_rename.type == MockFSObject.FILE: self._rename_single_object(object_to_rename, path, event=True) else: # object to rename is a directory old_path = object_to_rename.path for obj in set(self._fs_by_oid.values()): if self.is_subpath(old_path, obj.path, strict=True): new_obj_path = self.replace_path(obj.path, old_path, path) self._rename_single_object(obj, new_obj_path, event=False) # only parent generates event self._rename_single_object(object_to_rename, path, event=True) if self.oid_is_path: assert object_to_rename.oid != prior_oid, "rename %s to %s" % (prior_oid, path) else: assert object_to_rename.oid == oid, "rename %s to %s" % (object_to_rename.oid, oid) return object_to_rename.oid def _rename_single_object(self, source_object: MockFSObject, destination_path, *, event): destination_path = destination_path.rstrip("/") # This will assume all validation has already been done, and just rename the thing # without trying to rename contents of folders, just rename the object itself log.debug("renaming %s to %s", source_object.path, destination_path) prior_oid = source_object.oid if self.oid_is_path else None self._unstore_object(source_object) source_object.path = destination_path if self.oid_is_path: source_object.oid = destination_path self._store_object(source_object) self._register_event(MockEvent.ACTION_RENAME, source_object, prior_oid) log.debug("rename complete %s", source_object.path) self.log_debug_state() def mkdir(self, path) -> str: self._verify_parent_folder_exists(path) file = self._get_by_path(path) if file and file.exists: if file.type == MockFSObject.FILE: raise CloudFileExistsError(path) else: log.debug("Skipped creating already existing folder: %s", path) return file.oid new_fs_object = MockFSObject(path, MockFSObject.DIR, self.oid_is_path) self._store_object(new_fs_object) self._register_event(MockEvent.ACTION_CREATE, new_fs_object) return new_fs_object.oid def delete(self, oid): return self._delete(oid) def _delete(self, oid, without_event=False): log.debug("delete %s", debug_sig(oid)) self._api() file = self._fs_by_oid.get(oid, None) if file and file.path in self.locked_for_test: raise CloudTemporaryError("path %s is locked for test" % (file.path)) log.debug("got %s", file) if file and file.exists: if file.otype == OType.DIRECTORY: try: 
next(self.listdir(file.oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, file.path)) except StopIteration: pass # Folder is empty, delete it no problem else: path = file.path if file else "" log.debug("Deleting non-existent oid %s:%s ignored", debug_sig(oid), path) return file.exists = False # todo: rename on top of another file needs to be allowed... and not require deletes # until mock provider supports this... we need a "special delete" # for tests that test this behavior if not without_event: self._register_event(MockEvent.ACTION_DELETE, file) def exists_oid(self, oid): self._api() file = self._fs_by_oid.get(oid, None) return file is not None and file.exists def exists_path(self, path) -> bool: file = self._get_by_path(path) return file is not None and file.exists def hash_oid(self, oid) -> Any: file = self._fs_by_oid.get(oid, None) if file and file.exists: return file.hash() else: return None @staticmethod def hash_data(file_like) -> bytes: return md5(file_like.read()).digest() def info_path(self, path: str) -> Optional[OInfo]: file: MockFSObject = self._get_by_path(path) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: self._api() file: MockFSObject = self._fs_by_oid.get(oid, None) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) # @staticmethod # def _slurp(path): # with open(path, "rb") as x: # return x.read() # # @staticmethod # def _burp(path, contents): # with open(path, "wb") as x: # x.write(contents) def log_debug_state(self, msg=""): files = list(self.walk("/")) log.debug("%s: mock provider state %s:%s", msg, len(files), files) ################### def mock_provider_instance(oid_is_path, case_sensitive): prov = MockProvider(oid_is_path, case_sensitive) prov.event_timeout = 1 prov.event_sleep = 0.001 return prov @pytest.fixture(name="mock_provider", params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider_fixture(request): return mock_provider_instance(*request.param) @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider_generator(request): return lambda oid_is_path=None, case_sensitive=None: \ mock_provider_instance( request.param[0] if oid_is_path is None else oid_is_path, request.param[1] if case_sensitive is None else case_sensitive) @pytest.fixture def mock_provider_creator(): return mock_provider_instance def test_mock_basic(): """ basic spot-check, more tests are in test_providers with mock as one of the providers """ from io import BytesIO m = MockProvider(False, True) info = m.create("/hi.txt", BytesIO(b'hello')) assert info.hash assert info.oid b = BytesIO() m.download(info.oid, b) assert b.getvalue() == b'hello' PKL9OZ@Z''(cloudsync/tests/fixtures/mock_storage.pyfrom threading import Lock from typing import Dict, Any, Tuple, Optional import logging from cloudsync import Storage, LOCAL, REMOTE log = logging.getLogger(__name__) class MockStorage(Storage): # Does not actually persist the data... 
but it's just a mock top_lock = Lock() lock_dict: Dict[str, Lock] = dict() def __init__(self, storage_dict: Dict[str, Dict[int, bytes]]): self.storage_dict = storage_dict self.cursor: int = 0 # the next eid def _get_internal_storage(self, tag: str) -> Tuple[Lock, Dict[int, bytes]]: with self.top_lock: lock: Lock = self.lock_dict.setdefault(tag, Lock()) return lock, self.storage_dict.setdefault(tag, dict()) def create(self, tag: str, serialization: bytes) -> Any: lock, storage = self._get_internal_storage(tag) with lock: current_index = self.cursor self.cursor += 1 storage[current_index] = serialization return current_index def update(self, tag: str, serialization: bytes, eid: Any): lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) storage[eid] = serialization return 1 def delete(self, tag: str, eid: Any): lock, storage = self._get_internal_storage(tag) log.debug("deleting eid%s", eid) with lock: if eid not in storage: log.debug("ignoring delete: id %s doesn't exist" % eid) return del storage[eid] def read_all(self, tag: str) -> Dict[Any, bytes]: lock, storage = self._get_internal_storage(tag) with lock: ret: Dict[Any, bytes] = storage.copy() return ret def read(self, tag: str, eid: Any) -> Optional[bytes]: lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) return storage[eid] PKL9O"" cloudsync/tests/fixtures/util.pyfrom typing import NamedTuple, Union, Sequence, List, cast, Any, Tuple import logging from cloudsync.provider import Provider from cloudsync.runnable import time_helper from cloudsync import CloudFileNotFoundError log = logging.getLogger(__name__) log.setLevel(logging.INFO) TIMEOUT = 4 WaitForArg = Union[Tuple[int, str], 'WaitFor'] class WaitFor(NamedTuple): side: int = None path: str = None hash: bytes = None oid: str = None exists: bool = True @staticmethod def is_found(files: Sequence[WaitForArg], providers: Tuple[Provider, Provider], errs: List[str]): ok = True errs.clear() for f in files: if type(f) is tuple: info = WaitFor(side=f[0], path=f[1]) else: info = cast(WaitFor, f) try: other_info = providers[info.side].info_path(info.path) except CloudFileNotFoundError: other_info = None if other_info is None: if info.exists is False: log.debug("waiting not exists %s", info.path) continue log.debug("waiting exists %s", info.path) errs.append("file not found %s" % info.path) ok = False break if info.exists is False: errs.append("file exists %s" % info.path) ok = False break if info.hash and info.hash != other_info.hash: log.debug("waiting hash %s", info.path) errs.append("mismatch hash %s" % info.path) ok = False break return ok class RunUntilHelper: def run_until_found(self: Any, *files: WaitForArg, timeout=TIMEOUT): log.debug("running until found") errs: List[str] = [] found = lambda: WaitFor.is_found(files, self.providers, errs) self.run(timeout=timeout, until=found) if not found(): raise TimeoutError("timed out while waiting: %s" % errs) PKL9O%cloudsync/tests/providers/__init__.pyPKL9OK$cloudsync/tests/providers/dropbox.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError, CloudTokenError from cloudsync.providers.dropbox import DropboxProvider from cloudsync.oauth import OAuthConfig def dropbox_creds(): token_set = os.environ.get("DROPBOX_TOKEN") if not token_set: return None tokens = token_set.split(",") creds = { "key": tokens[random.randrange(0, len(tokens))], } return creds def 
bad_dropbox_creds(): creds = { "key": 'im a bad bad key', } return creds def app_id(): return os.environ.get("DROPBOX_APP_ID", None) def app_secret(): return os.environ.get("DROPBOX_APP_SECRET", None) def dropbox_provider(): cls = DropboxProvider # duck type in testing parameters cls.event_timeout = 20 # type: ignore cls.event_sleep = 2 # type: ignore cls.creds = dropbox_creds() # type: ignore return cls(app_id=app_id(), app_secret=app_secret()) @pytest.fixture def cloudsync_provider(): return dropbox_provider() def connect_test(want_oauth: bool, creds=None): if creds is None: creds = dropbox_creds() # if not creds: # pytest.skip('requires dropbox token and client secret') if want_oauth: creds.pop("key", None) # triggers oauth to get a new refresh token sync_root = "/" + os.urandom(16).hex() gd = DropboxProvider(app_id=app_id(), app_secret=app_secret()) try: gd.connect(creds) except CloudTokenError: if not want_oauth: raise creds = gd.authenticate() gd.connect(creds) assert gd.client gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass return gd.api_key def test_connect(): connect_test(False) @pytest.mark.manual def test_oauth_connect(): connect_test(True) @pytest.mark.manual def test_oauth_connect_given_bad_creds(): api_key = connect_test(True, bad_dropbox_creds()) bad_api_key = "x" + api_key[1:] with pytest.raises(CloudTokenError): connect_test(False, {"key": bad_api_key}) PKL9O)y#cloudsync/tests/providers/gdrive.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError from cloudsync.providers.gdrive import GDriveProvider # move this to provider ci_creds() function? def gdrive_creds(): token_set = os.environ.get("GDRIVE_TOKEN") if not token_set: return None tokens = token_set.split(",") creds = { "refresh_token": tokens[random.randrange(0, len(tokens))], } return creds def on_success(auth_dict=None): assert auth_dict is not None and isinstance(auth_dict, dict) def app_id(): return os.environ.get("GDRIVE_APP_ID", None) def app_secret(): return os.environ.get("GDRIVE_APP_SECRET", None) def gdrive_provider(): cls = GDriveProvider # duck type in testing parameters cls.event_timeout = 60 # type: ignore cls.event_sleep = 2 # type: ignore cls.creds = gdrive_creds() # type: ignore return cls(app_id=app_id(), app_secret=app_secret()) @pytest.fixture def cloudsync_provider(): gdrive_provider() def connect_test(want_oauth: bool): creds = gdrive_creds() if not creds: pytest.skip('requires gdrive token') if want_oauth: creds.pop("refresh_token", None) # triggers oauth to get a new refresh token sync_root = "/" + os.urandom(16).hex() gd = GDriveProvider(app_id=app_id(), app_secret=app_secret()) gd.connect(creds) assert gd.client gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass def test_connect(): connect_test(False) @pytest.mark.manual def test_oauth_connect(): connect_test(True) PKL9O2!cloudsync/tests/providers/mock.pyimport pytest from ..fixtures.mock_provider import MockProvider @pytest.fixture def cloudsync_provider(): cls = MockProvider cls.event_timeout = 20 cls.event_sleep = 2 cls.creds = {} return cls PK!H!J4P+cloudsync-0.2.16.dist-info/entry_points.txtN+I/N.,()J/M)Kr3%%H ~ Everyone is permitted to copy and distribute verbatim copies of this licensedocument, but changing it is not allowed. 
This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below. 0. Additional Definitions. As used herein, “this License” refers to version 3 of the GNU Lesser General Public License, and the “GNU GPL” refers to version 3 of the GNU General Public License. “The Library” refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An “Application” is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A “Combined Work” is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the “Linked Version”. The “Minimal Corresponding Source” for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The “Corresponding Application Code” for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. 
b) Accompany the Combined Work with a copy of the GNU GPL and this license document. c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License “or any later version” applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. 
If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library.