# ======== cloudsync/__init__.py ========
"""
cloudsync enables simple cloud file-level sync with a variety of cloud providers

External modules:
    cloudsync.Event
    cloudsync.Provider
    cloudsync.Sync

Example:
    import cloudsync

    prov = cloudsync.Provider('GDrive', token="237846234236784283")
    info = prov.upload(file, "/dest")

    print("id of /dest is %s, hash of /dest is %s" % (info.id, info.hash))

Command-line example:
    cloudsync -p gdrive --token "236723782347823642786" -f ~/gdrive-folder --daemon
"""

__version__ = "0.1.17"

# must be imported first
from .log import logger

# import modules into top level for convenience
from .provider import *
from .event import *
from .sync import *
from .exceptions import *
from .types import *
from .cs import *

from .command import main

if __name__ == "__main__":
    main()


# ======== cloudsync/apiserver.py ========
import re
import json
import traceback
import time
import urllib.parse as urlparse
import threading
import logging
from enum import Enum
import unittest

# TODO: this is an inappropriate default server, default should be wsgiref builtin
import waitress
# TODO: caller should specify the mechanism for channel empty detection
from waitress.channel import HTTPChannel

log = logging.getLogger(__name__)


class ApiServerLogLevel(Enum):
    NONE = 0    # do not log calls
    CALLS = 1   # log calls but not their args
    ARGS = 2    # log calls with args


class ApiError(Exception):
    def __init__(self, code, msg=None, desc=None):
        super().__init__()
        self.code = code
        self.msg = str(msg) or "UNKNOWN"
        self.desc = desc

    def __str__(self):
        return f"{self.code}, {self.msg}"

    @classmethod
    def from_json(cls, error_json):
        return cls(error_json.get('code', 500), msg=error_json.get('msg', None), desc=error_json.get('desc', None))


def api_route(path):
    def outer(func):
        if not hasattr(func, "_routes"):
            setattr(func, "_routes", [])
        func._routes += [path]
        return func
    return outer


def sanitize_for_status(e):
    e = e.replace("\r\n", " ")
    e = e.replace("\n", " ")
    e = e.replace("\r", " ")
    e = e[0:100]
    return e


class ApiServer:
    def __init__(self, addr, port, headers=None, log_level=ApiServerLogLevel.ARGS):
        """
        Create a new server on address, port.  Port can be zero.

        from apiserver import ApiServer, ApiError, api_route

        Create your handlers by inheriting from ApiServer and tagging them with @api_route("/path").

        Alternately you can use the ApiServer() directly, and call add_handler("path", function)

        Raise errors by raising ApiError(code, message, description=None)

        Return responses by simply returning a dict() or str() object

        Parameter to handlers is a dict()

        Query arguments are shoved into the dict via urllib.parse_qs
        """
        self.__addr = addr
        self.__port = port
        self.__headers = headers if headers else []
        self.__log_level = log_level
        self.__server = waitress.server.create_server(self, host=self.__addr, port=self.__port, clear_untrusted_proxy_headers=False)
        self.__started = False
        self.__routes = {}

        # routed methods map into handler
        for meth in type(self).__dict__.values():
            if hasattr(meth, "_routes"):
                for route in meth._routes:      # pylint: disable=protected-access
                    self.add_route(route, meth)

    def add_route(self, path, meth, content_type='application/json'):
        self.__routes[path] = (meth, content_type)

    def port(self):
        """Get my port"""
        sa = self.__server.socket.getsockname()
        return sa[1]

    def address(self):
        """Get my ip address"""
        sa = self.__server.socket.getsockname()
        return sa[0]

    def uri(self, path):
        """Make a URI pointing at myself"""
        if path[0] == "/":
            path = path[1:]
        uri = "http://" + self.__addr + ":" + str(self.port()) + "/" + path
        return uri

    def serve_forever(self):
        self.__started = True
        try:
            self.__server.run()
        except OSError:
            pass

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        try:
            if self.__started:
                timeout = time.time() + 2
                for channel in self.__server.active_channels.values():
                    channel: HTTPChannel
                    while channel.total_outbufs_len > 0 and time.time() < timeout:
                        # give any connections with a non-empty output buffer a chance to drain
                        time.sleep(.01)
            self.__server.socket.close()
            self.__server.asyncore.close_all()
        except Exception:
            log.exception("exception during shutdown")

    def __call__(self, env, start_response):   # pylint: disable=too-many-locals, too-many-branches, too-many-statements
        content = b"{}"
        length = env.get("CONTENT_LENGTH", 0)
        if length:
            content = env['wsgi.input'].read(int(length))
        if content:
            try:
                info = json.loads(content)
            except Exception:
                raise ApiError(400, "Invalid JSON " + str(content, "utf-8"))
        else:
            info = {}

        url = ''
        try:
            url = env.get('PATH_INFO', '/')
            if self.__log_level == ApiServerLogLevel.CALLS or self.__log_level == ApiServerLogLevel.ARGS:
                log.debug('Processing URL %s', url)

            handler_tmp = self.__routes.get(url)
            if not handler_tmp:
                if url[-1] == "/":
                    tmp = url[0:-1]
                    handler_tmp = self.__routes.get(tmp)
            if not handler_tmp:
                m = re.match(r"(.*?/)[^/]+$", url)
                if m:
                    # adding a route "/" handles /foo
                    # adding a route "/foo/bar/" handles /foo/bar/baz
                    handler_tmp = self.__routes.get(m[1])

            query = env.get('QUERY_STRING')
            if query:
                params = urlparse.parse_qs(query)
            else:
                params = {}

            info.update(params)

            if handler_tmp:
                handler, content_type = handler_tmp
                try:
                    response = handler(env, info)
                    if response is None:
                        response = ""
                    if isinstance(response, dict):
                        response = json.dumps(response)
                    response = bytes(str(response), "utf-8")
                    headers = self.__headers + [('Content-Type', content_type), ("Content-Length", str(len(response)))]
                    start_response('200 OK', headers)
                    yield response
                except ApiError:
                    raise
                except ConnectionAbortedError as e:
                    log.error("GET %s : ERROR : %s", url, e)
                except Exception as e:
                    raise ApiError(500, type(e).__name__ + " : " + str(e), traceback.format_exc())
            else:
                raise ApiError(404, f"No handler for {url}")
        except ApiError as e:
            try:
                log.error("GET %s : ERROR : %s", url, e)
                response = json.dumps({"code": e.code, "msg": e.msg, "desc": e.desc})
                start_response(str(e.code) + ' ' + sanitize_for_status(e.msg),
                               [('Content-Type', 'application/json'), ("Content-Length", str(len(response)))])
                yield bytes(response, "utf-8")
            except ConnectionAbortedError as e:
                log.error("GET %s : ERROR : %s", url, e)


class TestApiServer(unittest.TestCase):
    @staticmethod
    def test_nostart():
        httpd = ApiServer('127.0.0.1', 0)
        httpd.shutdown()

    def test_basic(self):
        class MyServer(ApiServer):
            @api_route("/popup")
            def popup(ctx, req):        # pylint: disable=no-self-argument,no-self-use
                return "HERE" + str(req)

            @api_route("/json")
            def json(ctx, req):         # pylint: disable=no-self-argument,no-self-use
                _ = req
                return {"obj": 1}

        httpd = MyServer('127.0.0.1', 0)
        httpd.add_route("/foo", lambda ctx, x: "FOO" + x["x"][0])

        try:
            print("serving on ", httpd.address(), httpd.port())

            threading.Thread(target=httpd.serve_forever, daemon=True).start()

            import requests

            response = requests.post(httpd.uri("/popup"), data='{}')
            self.assertEqual(response.text, "HERE{}")

            response = requests.post(httpd.uri("/notfound"), data='{}')
            self.assertEqual(response.status_code, 404)

            response = requests.get(httpd.uri("/foo?x=4"))
            self.assertEqual(response.text, "FOO4")
        finally:
            httpd.shutdown()

    def test_error(self):
        class MyServer(ApiServer):
            @api_route("/popup")
            def popup(ctx, unused_req):     # pylint: disable=no-self-argument,no-self-use
                raise ApiError(501, "BLAH")

        httpd = MyServer('127.0.0.1', 0)
        try:
            print("serving on ", httpd.address(), httpd.port())
            thread = threading.Thread(target=httpd.serve_forever, daemon=True)
            thread.start()

            import requests

            response = requests.post(httpd.uri("/popup"), data='{}')
            self.assertEqual(response.status_code, 501)
        finally:
            httpd.shutdown()
            if thread:
                thread.join(timeout=2)
                assert not thread.is_alive()


if __name__ == "__main__":
    unittest.main()
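# ======== usage example (illustrative sketch; not part of the original archive) ========
# A minimal sketch of how the ApiServer above is intended to be used, based on its
# __init__ docstring and the TestApiServer tests: subclass ApiServer, tag handler
# methods with @api_route("/path"), return a dict (sent as JSON) or a str, and raise
# ApiError for error responses.  The class and route names below are made up for the
# example; handlers receive the WSGI environ as their first argument and the parsed
# JSON body / query args as a dict.

import threading
import requests
from cloudsync.apiserver import ApiServer, ApiError, api_route


class HelloServer(ApiServer):
    @api_route("/hello")
    def hello(ctx, req):                    # pylint: disable=no-self-argument
        # echo the parsed request back as JSON
        return {"greeting": "hello", "args": req}

    @api_route("/fail")
    def fail(ctx, req):                     # pylint: disable=no-self-argument
        raise ApiError(400, "bad request")  # returned as a JSON error body


if __name__ == "__main__":
    srv = HelloServer('127.0.0.1', 0)       # port 0 == pick any free port
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    print(requests.post(srv.uri("/hello"), data='{"name": "world"}').json())
    print(requests.post(srv.uri("/fail"), data='{}').status_code)   # 400
    srv.shutdown()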
# ======== cloudsync/command.py ========
# import argparse
import os


def main():
    # parser = argparse.ArgumentParser(description='Process some integers.')
    # parser.add_argument('command', type=str, help='Command to run')
    # parser.add_argument('args', nargs=argparse.REMAINDER)
    # args = parser.parse_args()
    raise NotImplementedError()


if __name__ == "__main__":
    import sys
    sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
    main()


# ======== cloudsync/cs.py ========
import threading
import logging
from typing import Optional, Tuple

from .sync import SyncManager, SyncState, Storage
from .runnable import Runnable
from .event import EventManager
from .provider import Provider
from .log import TRACE

log = logging.getLogger(__name__)


class CloudSync(Runnable):
    def __init__(self,
                 providers: Tuple[Provider, Provider],
                 roots: Tuple[str, str] = None,
                 storage: Optional[Storage] = None,
                 sleep: Optional[Tuple[int, int]] = None,
                 ):
        if not roots and self.translate == CloudSync.translate:    # pylint: disable=comparison-with-callable
            raise ValueError("Either override the translate() function, or pass in a pair of roots")

        self.providers = providers
        self.roots = roots

        if sleep is None:
            sleep = (providers[0].default_sleep, providers[1].default_sleep)

        # The tag for the SyncState will isolate the state of a pair of providers along with the sync roots
        state = SyncState(providers, storage, tag=self.storage_label())

        smgr = SyncManager(state, providers, self.translate, self.resolve_conflict, sleep=sleep)

        # for tests, make these accessible
        self.state = state
        self.smgr = smgr

        # the label for each event manager will isolate the cursor to the provider/login combo for that side
        self.emgrs: Tuple[EventManager, EventManager] = (
            EventManager(smgr.providers[0], state, 0, roots[0]),
            EventManager(smgr.providers[1], state, 1, roots[1])
        )

        self.sthread = threading.Thread(target=smgr.run, kwargs={'sleep': 0.1})

        self.ethreads = (
            threading.Thread(target=self.emgrs[0].run, kwargs={'sleep': sleep[0]}),
            threading.Thread(target=self.emgrs[1].run, kwargs={'sleep': sleep[1]})
        )

        log.info("initialized sync: %s", self.storage_label())

        def lockattr(k, _v):
            if k not in self.__dict__:
                raise AttributeError("%s not an attribute" % k)

        self.__setattr__ = lockattr

    @property
    def aging(self):
        return self.smgr.aging

    @aging.setter
    def aging(self, val):
        self.smgr.aging = val

    def storage_label(self):
        # if you're using a pure translate, and not roots, you don't have to override the storage label
        # just don't reuse storage for the same pair of providers
        roots = self.roots or ('?', '?')
        assert self.providers[0].connection_id is not None
        assert self.providers[1].connection_id is not None
        return f"{self.providers[0].name}:{self.providers[0].connection_id}:{roots[0]}."\
               f"{self.providers[1].name}:{self.providers[1].connection_id}:{roots[1]}"

    def walk(self):
        roots = self.roots or ('/', '/')
        for index, provider in enumerate(self.providers):
            for event in provider.walk(roots[index]):
                self.emgrs[index].process_event(event)

    def translate(self, index, path):
        if not self.roots:
            raise ValueError("Override translate function or provide root paths")
        relative = self.providers[1-index].is_subpath(self.roots[1-index], path)
        if not relative:
            log.log(TRACE, "%s is not subpath of %s", path, self.roots[1-index])
            return None
        return self.providers[index].join(self.roots[index], relative)

    @staticmethod
    def resolve_conflict(_f1, _f2):
        # Input:
        #   - f1 and f2 are file-likes that will block on read, and can possibly pull data from the network, internet, etc
        #   - f1 and f2 also support the .path property to get a relative path to the file
        #   - f1 and f2 also support the .side property
        #
        # Return Values:
        #   - A "merged" file-like which should be used as the data to replace both f1/f2 with
        #   - One of f1 or f2, which is selected as the correct version
        #   - "None", meaning there is no good resolution
        return None

    def start(self):
        self.sthread.start()
        self.ethreads[0].start()
        self.ethreads[1].start()

    def stop(self):
        log.info("stopping sync: %s", self.storage_label())
        self.smgr.stop()
        self.emgrs[0].stop()
        self.emgrs[1].stop()

    # for tests, make this manually runnable
    def do(self):
        self.smgr.do()
        self.emgrs[0].do()
        self.emgrs[1].do()

    def done(self):
        self.smgr.done()
        self.emgrs[0].done()
        self.emgrs[1].done()


# ======== cloudsync/event.py ========
import logging
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass

from .exceptions import CloudTemporaryError
from .runnable import Runnable
from .muxer import Muxer
from .types import OType

if TYPE_CHECKING:
    from cloudsync.sync import SyncState
    from cloudsync import Provider

log = logging.getLogger(__name__)


@dataclass
class Event:
    otype: OType                        # fsobject type (DIRECTORY or FILE)
    oid: str                            # fsobject id
    path: Optional[str]                 # path
    hash: Optional[bytes]               # fsobject hash (better name: ohash)
    exists: Optional[bool]
    mtime: Optional[float] = None
    prior_oid: Optional[str] = None     # path-based systems use this on renames
    new_cursor: Optional[str] = None


class EventManager(Runnable):
    def __init__(self, provider: "Provider", state: "SyncState", side, walk_root=None):
        log.debug("provider %s, root %s", provider.name, walk_root)
        self.provider = provider
        assert self.provider.connection_id
        self.label = f"{self.provider.name}:{self.provider.connection_id}"
        self.events = Muxer(provider.events, restart=True)
        self.state = state
        self.side = side
        self.shutdown = False

        self._cursor_tag = self.label + "_cursor"
        self.cursor = self.state.storage_get_cursor(self._cursor_tag)

        self.walk_root = None
        if not self.cursor:
            self.walk_root = walk_root
            self.cursor = provider.current_cursor
            if self.cursor:
                self.state.storage_update_cursor(self._cursor_tag, self.cursor)
        else:
            log.debug("retrieved existing cursor %s for %s", self.cursor, self.provider.name)
            # TODO!!!!
            provider.current_cursor = self.cursor

    def do(self):
        if self.walk_root:
            log.debug("walking all %s/%s files as events, because no working cursor on startup",
                      self.provider.name, self.walk_root)
            for event in self.provider.walk(self.walk_root):
                self.process_event(event)
            self.walk_root = None

        for event in self.events:
            if not event:
                log.error("%s got BAD event %s", self.label, event)
                continue
            self.process_event(event)

        current_cursor = self.provider.current_cursor
        if current_cursor != self.cursor:
            self.state.storage_update_cursor(self._cursor_tag, current_cursor)
            self.cursor = current_cursor

    def _drain(self):
        # for tests, delete events
        for _ in self.events:
            pass

    def process_event(self, event: Event):
        with self.state.lock:
            log.debug("%s got event %s", self.label, event)
            path = event.path
            exists = event.exists
            otype = event.otype

            if not event.path and not self.state.lookup_oid(self.side, event.oid):
                try:
                    info = self.provider.info_oid(event.oid)
                    if info:
                        if info.otype != event.otype:
                            log.warning("provider %s gave a bad event: %s != %s, using %s",
                                        self.provider.name, info.path, event.otype, info.otype)
                        path = info.path
                        otype = info.otype
                except CloudTemporaryError:
                    pass

            self.state.update(self.side, otype, event.oid, path=path, hash=event.hash,
                              exists=exists, prior_oid=event.prior_oid)

    def stop(self):
        self.events.shutdown = True
        self.shutdown = True
        super().stop()


# ======== cloudsync/exceptions.py ========
class CloudException(Exception):
    pass


class CloudFileNotFoundError(CloudException):
    pass


class CloudTemporaryError(CloudException):
    pass


class CloudFileExistsError(CloudException):
    pass


class CloudTokenError(CloudException):
    pass


class CloudDisconnectedError(CloudException):
    pass


# ======== cloudsync/log.py ========
# add TRACE named level, because libraries need it
import logging

logger = logging.getLogger(__package__)

if isinstance(logging.getLevelName('TRACE'), str):
    logging.addLevelName(5, 'TRACE')

# see docs, this actually gets a number, because reasons
TRACE = logging.getLevelName('TRACE')


# ======== cloudsync/muxer.py ========
import queue
from threading import Lock
from collections import namedtuple


class Muxer:
    Entry = namedtuple('Entry', 'genref listeners, lock')

    already = {}
    top_lock = Lock()

    def __init__(self, func, key=None, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()
        self.shutdown = False
        self.key = key or func

        with self.top_lock:
            if self.key not in self.already:
                self.already[self.key] = self.Entry([func()], [], Lock())
            ent = self.already[self.key]
            self.genref = ent.genref
            self.lock = ent.lock
            self.listeners = ent.listeners
            self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            e = self.queue.get_nowait()
        except queue.Empty:
            with self.lock:
                try:
                    e = self.queue.get_nowait()
                except queue.Empty:
                    try:
                        e = next(self.genref[0])
                        for other in self.listeners:
                            if other is not self:
                                other.queue.put(e)
                    except StopIteration:
                        if self.restart and not self.shutdown:
                            self.genref[0] = self.func()
                        raise
        return e

    def __del__(self):
        with self.top_lock:
            try:
                self.listeners.remove(self)
            except
ValueError: pass if not self.listeners and self.key in self.already: del self.already[self.key] PKc$OPLLcloudsync/oauth_redir_server.pyimport logging import sys import socket import threading import errno # from src import config from .apiserver import ApiServer # from src.osutil import is_windows # from src.vapiserver import VApiClient log = logging.getLogger(__name__) def is_windows(): return sys.platform in ("win32", "cygwin") class OAuthFlowException(Exception): pass class OAuthBadTokenException(Exception): pass class OAuthRedirServer: PORT_MIN = 52400 PORT_MAX = 52450 GUI_TIMEOUT = 15 def __init__(self, on_success, display_name, use_predefined_ports=False, on_failure=None): self.on_success = on_success self.on_failure = on_failure self.display_name = display_name # self.template = os.path.join(config.get().assetdir, 'oauth_result_template.html') log.debug('Creating oauth redir server') if use_predefined_ports: # Some providers (Dropbox) don't allow us to just use localhost # redirect. For these providers, we define a range of # 127.0.0.1:(PORT_MIN..PORT_MAX) as valid redir URLs for port in range(self.PORT_MIN, self.PORT_MAX): try: if is_windows(): # Windows is dumb and will be weird if we just try to # connect to the port directly. Check to see if the # port is responsive before claiming it as our own try: socket.create_connection(('127.0.0.1', port), 0.001) continue except socket.timeout: pass log.debug('Attempting to start api server on port %d', port) self.api_server = ApiServer('127.0.0.1', port) break except OSError: pass else: self.api_server = ApiServer('127.0.0.1', 0) if not self.api_server: raise OSError(errno.EADDRINUSE, "Unable to open any port in range 52400-52405") self.api_server.add_route('/auth/', self.auth_redir_success, content_type='text/html') self.api_server.add_route('/favicon.ico', lambda x, y: "", content_type='text/html') self.__thread = threading.Thread(target=self.api_server.serve_forever, daemon=True) self.__thread.start() log.info('Listening on %s', self.api_server.uri('/auth/')) def auth_redir_success(self, env, info): log.debug('In auth_redir_success with env=%s, info=%s', env, info) err = "" if info and ('error' in info or 'error_description' in info): err = info['error'] if 'error' in info else \ info['error_description'][0] if isinstance(err, list): err = err[0] if self.on_failure: self.on_failure(err) return self.auth_failure(err) try: self.on_success(auth_dict=info) except OAuthFlowException: log.warning('Got a page request when not in flow', exc_info=True) err = "No pending OAuth. This can happen if you refreshed this tab. 
" except Exception as e: log.exception('Failed to authenticate') err = 'Unknown error: %s' % e return self.auth_failure(err) if err else self.auth_success() def auth_success(self): return self.display_name + ": OAuth Success" def auth_failure(self, msg): return self.display_name + ": OAuth Failure:" + msg def shutdown(self): if self.api_server: self.api_server.shutdown() self.__thread = None def uri(self, *args, **kwargs): return self.api_server.uri(*args, **kwargs) __all__ = ['OAuthFlowException', 'OAuthBadTokenException', 'OAuthRedirServer'] PKc$OP cloudsync/provider.pyfrom abc import ABC, abstractmethod import os import re import logging from typing import TYPE_CHECKING, Generator, Optional from cloudsync.types import OInfo, DIRECTORY, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError if TYPE_CHECKING: from cloudsync.event import Event log = logging.getLogger(__name__) class Provider(ABC): # pylint: disable=too-many-public-methods sep: str = '/' # path delimiter alt_sep: str = '\\' # alternate path delimiter oid_is_path: bool = False case_sensitive: bool = True win_paths: bool = False connection_id: Optional[str] = None default_sleep: int = 0.01 @abstractmethod def _api(self, *args, **kwargs): ... def connect(self, creds): # pylint: disable=unused-argument # some providers don't need connections, so just don't implement/overload this method # providers who implement connections need to set the connection_id to a value # that is unique to each connection, so that connecting to this provider # under multiple userid's will produce different connection_id's. One # suggestion is to just set the connection_id to the user's login_id self.connection_id = os.urandom(16).hex() @property @abstractmethod def name(self): ... @property @abstractmethod def latest_cursor(self): ... @property @abstractmethod def current_cursor(self): ... @abstractmethod def events(self) -> Generator["Event", None, None]: ... @abstractmethod def walk(self, path, since=None): # Test that the root path does not show up in the walk ... @abstractmethod def upload(self, oid, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def create(self, path, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def download(self, oid, file_like): ... @abstractmethod def rename(self, oid, path) -> str: # TODO: test that a renamed file can be renamed again # TODO: test that renaming a folder renames the children in the state file ... @abstractmethod def mkdir(self, path) -> str: ... @abstractmethod def delete(self, oid): ... @abstractmethod def exists_oid(self, oid): ... @abstractmethod def exists_path(self, path) -> bool: ... @abstractmethod def listdir(self, oid) -> Generator[DirInfo, None, None]: ... def hash_oid(self, oid) -> Optional[bytes]: # TODO add a test to FNFE info = self.info_oid(oid) return info.hash if info else None @staticmethod @abstractmethod def hash_data(file_like) -> bytes: ... @abstractmethod def info_path(self, path: str) -> Optional[OInfo]: ... @abstractmethod def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: ... 
# CONVENIENCE def download_path(self, path, io): info = self.info_path(path) if not info or not info.oid: raise CloudFileNotFoundError() self.download(info.oid, io) # HELPER @classmethod def join(cls, *paths): res = "" rl = [] for path in paths: if not path or path == cls.sep: continue if isinstance(path, str): rl = rl + [path.strip(cls.sep).strip(cls.alt_sep)] continue for sub_path in path: if sub_path is None or sub_path == cls.sep or sub_path == cls.alt_sep: continue rl = rl + [sub_path.strip(cls.sep)] if not rl: return cls.sep res = cls.sep.join(rl) if not cls.win_paths or res[1] != ':': res = cls.sep + res return res def split(self, path): # todo cache regex index = path.rfind(self.sep) if self.alt_sep: index = max(index, path.rfind(self.alt_sep)) if index == -1: return path, "" if index == 0: return self.sep, path[index+1:] return path[:index], path[index+1:] def normalize_path(self, path: str): norm_path = path.rstrip(self.sep) if self.sep in ["\\", "/"]: parts = re.split(r'[\\/]+', norm_path) else: parts = re.split(r'[%s]+' % self.sep, norm_path) norm_path = self.join(*parts) if not self.case_sensitive: norm_path = norm_path.lower() return norm_path def is_subpath(self, folder, target, sep=None, alt_sep=None, strict=False): sep = sep or self.sep alt_sep = alt_sep or self.alt_sep if alt_sep: folder = folder.replace(alt_sep, sep) target = target.replace(alt_sep, sep) # Will return True for is-same-path in addition to target folder_full = str(folder) folder_full = folder_full.rstrip(sep) target_full = str(target) target_full = target_full.rstrip(sep) # .lower() instead of normcase because normcase will also mess with separators if not self.case_sensitive: folder_full_case = folder_full.lower() target_full_case = target_full.lower() else: folder_full_case = folder_full target_full_case = target_full # target is same as folder, or target is a subpath (ensuring separator is there for base) if folder_full_case == target_full_case: return False if strict else sep elif len(target_full) > len(folder_full) and target_full[len(folder_full)] == sep: if target_full_case.startswith(folder_full_case): return target_full[len(folder_full):] else: return False return False def replace_path(self, path, from_dir, to_dir): relative = self.is_subpath(from_dir, path) if relative: retval = to_dir + (relative if relative != self.sep else "") return retval if relative != "" else self.sep raise ValueError("replace_path used without subpath") def paths_match(self, patha, pathb): if patha is None and pathb is None: return True elif patha is None or pathb is None: return False return self.normalize_path(patha) == self.normalize_path(pathb) def dirname(self, path: str): ret, _ = self.split(path) return ret def basename(self, path: str): _, ret = self.split(path) return ret def _verify_parent_folder_exists(self, path): parent_path = self.dirname(path) if parent_path != self.sep: parent_obj = self.info_path(parent_path) if parent_obj is None: # perhaps this should separate "FileNotFound" and "non-folder parent exists" # and raise different exceptions raise CloudFileNotFoundError(parent_path) if parent_obj.otype != DIRECTORY: raise CloudFileExistsError(parent_path) def mkdirs(self, path): log.debug("mkdirs %s", path) try: oid = self.mkdir(path) # todo update state except CloudFileExistsError: # todo: mabye CloudFileExistsError needs to have an oid and/or path in it # at least optionally info = self.info_path(path) if info and info.otype == DIRECTORY: oid = info.oid else: raise except CloudFileNotFoundError: ppath, _ 
= self.split(path) if ppath == path: raise log.debug("mkdirs parent, %s", ppath) unused_oid = self.mkdirs(ppath) try: oid = self.mkdir(path) # todo update state except CloudFileNotFoundError: # when syncing, file exists seems to hit better conflict code for these sorts of situations # but this is a guess. todo: scenarios that make this happen raise CloudFileExistsError("f'ed up mkdir") return oid PKc$OC@YYcloudsync/runnable.pyimport time from abc import ABC, abstractmethod import threading import logging log = logging.getLogger(__name__) def time_helper(timeout, sleep=None, multiply=1): forever = not timeout end = forever or time.monotonic() + timeout while forever or end >= time.monotonic(): yield True if sleep is not None: time.sleep(sleep) sleep = sleep * multiply class Runnable(ABC): stopped = False wakeup = False def run(self, *, timeout=None, until=None, sleep=0.01): self.stopped = False self.wakeup = False endtime = sleep + time.monotonic() for _ in time_helper(timeout, sleep=.01): while time.monotonic() < endtime and not self.stopped and not self.wakeup: time.sleep(max(0, min(.01, endtime - time.monotonic()))) self.wakeup = False if self.stopped: break try: self.do() except Exception: log.exception("unhandled exception in %s", self.__class__) if self.stopped or (until is not None and until()): break endtime = sleep + time.monotonic() if self.stopped: self.done() def wake(self): self.wakeup = True @abstractmethod def do(self): ... def stop(self): self.stopped = True # pylint: disable=attribute-defined-outside-init def done(self): pass def test_runnable(): class TestRun(Runnable): def __init__(self): self.cleaned = False self.called = 0 def do(self): self.called += 1 def done(self): self.cleaned = True testrun = TestRun() testrun.run(timeout=0.02, sleep=0.001) assert testrun.called testrun.called = 0 testrun.run(until=lambda: testrun.called == 1) assert testrun.called == 1 thread = threading.Thread(target=testrun.run) thread.start() testrun.stop() thread.join(timeout=1) assert testrun.stopped == 1 PKc$Odmcloudsync/scramble.pyimport random def scramble(gen, buffer_size): buf = [] i = iter(gen) while True: try: e = next(i) buf.append(e) if len(buf) >= buffer_size: choice = random.randint(0, len(buf)-1) buf[-1], buf[choice] = buf[choice], buf[-1] yield buf.pop() except StopIteration: random.shuffle(buf) yield from buf return PKc$OUoȷcloudsync/types.pyfrom typing import Optional from enum import Enum from dataclasses import dataclass class OType(Enum): DIRECTORY = "dir" FILE = "file" NOTKNOWN = "trashed" DIRECTORY = OType.DIRECTORY FILE = OType.FILE NOTKNOWN = OType.NOTKNOWN # only allowed for deleted files! 
@dataclass class OInfo: otype: OType # fsobject type (DIRECTORY or FILE) oid: str # fsobject id hash: Optional[bytes] # fsobject hash (better name: ohash) path: Optional[str] # path @dataclass class DirInfo(OInfo): name: Optional[str] = None mtime: Optional[float] = None PKc$O 牀cloudsync/providers/__init__.pyfrom .gdrive import GDriveProvider from ..tests.fixtures.mock_provider import MockProvider from .dropbox import DropboxProvider PKc$O5>KKcloudsync/providers/dropbox.pyimport io import os import time import logging import threading from hashlib import sha256 from typing import Generator, Optional import requests import arrow import dropbox from dropbox import Dropbox, exceptions, files from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, \ CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError log = logging.getLogger(__name__) class _FolderIterator: def __init__(self, api, path, *, recursive, cursor=None): self.api = api self.path = path self.ls_res = None if not cursor: self.ls_res = self.api('files_list_folder', path=self.path, recursive=recursive, limit=200) else: self.ls_res = self.api('files_list_folder_continue', cursor) def __iter__(self): return self def __next__(self): if self.ls_res: if not self.ls_res.entries and self.ls_res.has_more: self.ls_res = self.api( 'files_list_folder_continue', self.ls_res.cursor) if self.ls_res and self.ls_res.entries: ret = self.ls_res.entries.pop() ret.cursor = self.ls_res.cursor return ret raise StopIteration() @property def cursor(self): return self.ls_res and self.ls_res.cursor class DropboxProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 _max_simple_upload_size = 15 * 1024 * 1024 _upload_block_size = 10 * 1024 * 1024 name = "Dropbox" def __init__(self): super().__init__() self.__root_id = None self.__cursor = None self.client = None self.api_key = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() @property def connected(self): return self.client is not None def get_quota(self): space_usage = self._api('users_get_space_usage') account = self._api('users_get_current_account') if space_usage.allocation.is_individual(): used = space_usage.used allocated = space_usage.allocation.get_individual().allocated else: team_allocation = space_usage.allocation.get_team() used, allocated = team_allocation.used, team_allocation.allocated res = { 'used': used, 'total': allocated, 'login': account.email, 'uid': account.account_id[len('dbid:'):] } return res def connect(self, creds): log.debug('Connecting to dropbox') if not self.client: self.api_key = creds.get('key', self.api_key) if not self.api_key: raise CloudTokenError() self.client = Dropbox(self.api_key) try: quota = self.get_quota() self.connection_id = quota['login'] except exceptions.AuthError: self.disconnect() raise CloudTokenError() except Exception as e: log.debug("error connecting %s", e) self.disconnect() raise CloudDisconnectedError() def _api(self, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") log.debug("_api: %s (%s %s)", method, args, kwargs) with self.mutex: try: return getattr(self.client, method)(*args, **kwargs) except exceptions.ApiError as e: if isinstance(e.error, (files.ListFolderError, files.GetMetadataError, files.ListRevisionsError)): if 
e.error.is_path() and isinstance(e.error.get_path(), files.LookupError): inside_error: files.LookupError = e.error.get_path() if inside_error.is_malformed_path(): log.debug('Malformed path when executing %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'Malformed path when executing %s(%s)' % (method, kwargs)) if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.DeleteError): if e.error.is_path_lookup(): inside_error: files.LookupError = e.error.get_path_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.RelocationError): if e.error.is_from_lookup(): inside_error: files.LookupError = e.error.get_from_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s,%s)' % (method, args, kwargs)) if e.error.is_to(): inside_error: files.WriteError = e.error.get_to() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) log.debug("here") if isinstance(e.error, files.CreateFolderError): if e.error.is_path() and isinstance(e.error.get_path(), files.WriteError): inside_error: files.WriteError = e.error.get_path() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) except (exceptions.InternalServerError, exceptions.RateLimitError, requests.exceptions.ReadTimeout): raise CloudTemporaryError() except dropbox.stone_validators.ValidationError as e: log.debug("f*ed up api error: %s", e) if "never created" in str(e): raise CloudFileNotFoundError() if "did not match" in str(e): log.warning("oid error %s", e) raise CloudFileNotFoundError() raise except requests.exceptions.ConnectionError as e: log.exception('api error handled exception %s:%s', "dropbox", e.__class__.__name__) self.disconnect() raise CloudDisconnectedError() @property def root_id(self): return "" def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('files_list_folder_get_latest_cursor', self.root_id, recursive=True, include_deleted=True, limit=200) if res: return res.cursor else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor def _events(self, cursor, path=None): # pylint: disable=too-many-branches if path and path != "/": info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) oid = info.oid else: oid = self.root_id for res in _FolderIterator(self._api, oid, recursive=True, cursor=cursor): exists = True log.debug("event %s", res) if isinstance(res, files.DeletedMetadata): # dropbox doesn't give you the id that was deleted # we need to get the ids of every revision # then find out which one was the latest before the deletion time # then get the oid for that revs = self._api('files_list_revisions', res.path_lower, limit=10) if revs is None: # dropbox will give a 409 conflict if the revision history was deleted # instead of raising an error, this gets converted to revs==None log.info("revs is none for %s %s", oid, path) continue log.debug("revs %s", revs) deleted_time = revs.server_deleted if 
deleted_time is None: # not really sure why this happens, but this event isn't useful without it log.error("revs %s has no deleted time?", revs) continue latest_time = None for ent in revs.entries: assert ent.server_modified is not None if ent.server_modified <= deleted_time and \ (latest_time is None or ent.server_modified >= latest_time): oid = ent.id latest_time = ent.server_modified if not oid: log.error( "skipping deletion %s, because we don't know the oid", res) continue exists = False otype = None ohash = None elif isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None oid = res.id else: otype = FILE ohash = res.content_hash oid = res.id path = res.path_display event = Event(otype, oid, path, ohash, exists, time.time()) yield event if getattr(res, "cursor", False): self.__cursor = res.cursor def events(self) -> Generator[Event, None, None]: yield from self._events(self.current_cursor) def walk(self, path, since=None): yield from self._events(None, path=path) def listdir(self, oid) -> Generator[DirInfo, None, None]: yield from self._listdir(oid, recursive=False) def _listdir(self, oid, *, recursive) -> Generator[DirInfo, None, None]: info = self.info_oid(oid) for res in _FolderIterator(self._api, oid, recursive=recursive): if isinstance(res, files.DeletedMetadata): continue if isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None else: otype = FILE ohash = res.content_hash path = res.path_display oid = res.id relative = self.is_subpath(info.path, path).lstrip("/") if relative: yield DirInfo(otype, oid, ohash, path, name=relative) def create(self, path: str, file_like, metadata=None) -> OInfo: self._verify_parent_folder_exists(path) if self.exists_path(path): raise CloudFileExistsError(path) return self._upload(path, file_like, metadata) def upload(self, oid: str, file_like, metadata=None) -> OInfo: if oid.startswith(self.sep): raise CloudFileNotFoundError("Called upload with a path instead of an OID: %s" % oid) if not self.exists_oid(oid): raise CloudFileNotFoundError(oid) return self._upload(oid, file_like, metadata) def _upload(self, oid, file_like, metadata=None) -> OInfo: res = None metadata = metadata or {} file_like.seek(0, io.SEEK_END) size = file_like.tell() file_like.seek(0) if size < self._max_simple_upload_size: res = self._api('files_upload', file_like.read(), oid, mode=files.WriteMode('overwrite')) else: cursor = None while True: data = file_like.read(self._upload_block_size) if not data: if cursor: local_mtime = arrow.get(metadata.get('mtime', time.time())).datetime commit = files.CommitInfo(path=oid, mode=files.WriteMode.overwrite, autorename=False, client_modified=local_mtime, mute=True) res = self._api( 'files_upload_session_finish', data, cursor, commit ) break if not cursor: self._api('files_upload_session_start', data) cursor = files.UploadSessionCursor( res.session_id, len(data)) else: self._api('files_upload_session_append_v2', data, cursor) cursor.offset += len(data) if res is None: raise CloudFileExistsError() ret = OInfo(otype=FILE, oid=res.id, hash=res.content_hash, path=res.path_display) log.debug('upload result is %s', ret) return ret def download(self, oid, file_like): ok = self._api('files_download', oid) if not ok: raise CloudFileNotFoundError() res, content = ok for data in content.iter_content(self._upload_block_size): file_like.write(data) return OInfo(otype=FILE, oid=oid, hash=res.content_hash, path=res.path_display) def _attempt_rename_folder_over_empty_folder(self, info: OInfo, path: str) -> None: if info.otype != 
DIRECTORY: raise CloudFileExistsError(path) possible_conflict = self.info_path(path) if possible_conflict.otype == DIRECTORY: try: next(self._listdir(possible_conflict.oid, recursive=False)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: pass # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) self._api('files_move_v2', info.oid, path) return else: # conflict is a file, and we already know that the rename is on a folder raise CloudFileExistsError(path) def rename(self, oid, path): try: self._api('files_move_v2', oid, path) except CloudFileExistsError: old_info = self.info_oid(oid) if self.paths_match(old_info.path, path): new_info = self.info_path(path) if oid == new_info.oid and old_info.path != path: temp_path = path + "." + os.urandom(16).hex() self._api('files_move_v2', oid, temp_path) self.rename(oid, path) return oid if old_info.otype == DIRECTORY: self._attempt_rename_folder_over_empty_folder(old_info, path) else: raise return oid @staticmethod def hash_data(file_like) -> bytes: # get a hash from a filelike that's the same as the hash i natively use binstr = b'' while True: data = file_like.read(4 * 1024 * 1024) if not data: break binstr += sha256(data).digest() return sha256(binstr).hexdigest() def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ, unused-argument # TODO: check if a regular filesystem lets you mkdir over a non-empty folder... self._verify_parent_folder_exists(path) if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError() log.debug("Skipped creating already existing folder: %s", path) return info.oid res = self._api('files_create_folder_v2', path) log.debug("dbx mkdir %s", res) res = res.metadata return res.id def delete(self, oid): info = self.info_oid(oid) if not info: return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self._listdir(oid, recursive=False)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files_delete_v2', oid) except CloudFileNotFoundError: # shouldn't happen because we are checking above... 
return def exists_oid(self, oid) -> bool: return bool(self.info_oid(oid)) def info_path(self, path: str) -> Optional[OInfo]: if path == "/": return OInfo(DIRECTORY, "", None, "/") try: log.debug("res info path %s", path) res = self._api('files_get_metadata', path) log.debug("res info path %s", res) oid = res.id if oid[0:3] != 'id:': log.warning("invalid oid %s from path %s", oid, path) if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash path = res.path_display or path return OInfo(otype, oid, fhash, path) except CloudFileNotFoundError: return None def exists_path(self, path) -> bool: return self.info_path(path) is not None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: if oid == "": otype = DIRECTORY fhash = None path = "/" else: try: res = self._api('files_get_metadata', oid) log.debug("res info oid %s", res) path = res.path_display if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash except CloudFileNotFoundError: return None return OInfo(otype, oid, fhash, path) PKc$Oboocloudsync/providers/gdrive.pyimport time import logging import threading import webbrowser import hashlib from ssl import SSLError import json from typing import Generator, Optional import arrow from googleapiclient.discovery import build # pylint: disable=import-error from googleapiclient.errors import HttpError # pylint: disable=import-error from httplib2 import Http, HttpLib2Error from oauth2client import client # pylint: disable=import-error from oauth2client.client import OAuth2WebServerFlow, HttpAccessTokenRefreshError, OAuth2Credentials # pylint: disable=import-error from googleapiclient.http import _should_retry_response # This is necessary because google masks errors from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload # pylint: disable=import-error from cloudsync.oauth_redir_server import OAuthRedirServer from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError class GDriveFileDoneError(Exception): pass log = logging.getLogger(__name__) logging.getLogger('googleapiclient.discovery').setLevel(logging.WARN) class GDriveInfo(DirInfo): # pylint: disable=too-few-public-methods pids = [] def __init__(self, *a, pids=None, **kws): super().__init__(*a, **kws) if pids is None: pids = [] self.pids = pids class GDriveProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 provider = 'googledrive' name = 'Google Drive' _scope = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.activity.readonly' ] _client_id = '433538542924-ehhkb8jn358qbreg865pejbdpjnm31c0.apps.googleusercontent.com' _client_secret = 'Y-GBVYGO2v5V9cV4WsjMSXDV' _redir = 'urn:ietf:wg:oauth:2.0:oob' _token_uri = 'https://accounts.google.com/o/oauth2/token' _folder_mime_type = 'application/vnd.google-apps.folder' _io_mime_type = 'application/octet-stream' def __init__(self): super().__init__() self.__root_id = None self.__cursor = None self.client = None self.api_key = None self.refresh_token = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() self._ids = {"/": "root"} self._trashed_ids = {} self._flow = None self._redir_server: Optional[OAuthRedirServer] = None self._oauth_done = threading.Event() @property def 
connected(self): return self.client is not None def get_display_name(self): return self.name def initialize(self, localhost_redir=True): if localhost_redir: try: if not self._redir_server: self._redir_server = OAuthRedirServer(self._on_oauth_success, self.get_display_name(), use_predefined_ports=False, on_failure=self._on_oauth_failure) self._flow = OAuth2WebServerFlow(client_id=self._client_id, client_secret=self._client_secret, scope=self._scope, redirect_uri=self._redir_server.uri('/auth/')) except OSError: log.exception('Unable to use redir server. Falling back to manual mode') localhost_redir = False if not localhost_redir: self._flow = OAuth2WebServerFlow(client_id=self._client_id, client_secret=self._client_secret, scope=self._scope, redirect_uri=self._redir) url = self._flow.step1_get_authorize_url() self._oauth_done.clear() webbrowser.open(url) return localhost_redir, url def _on_oauth_success(self, auth_dict): auth_string = auth_dict['code'][0] try: res: OAuth2Credentials = self._flow.step2_exchange(auth_string) self.refresh_token = res.refresh_token self.api_key = res.access_token self._oauth_done.set() except Exception: log.exception('Authentication failed') raise def _on_oauth_failure(self, err): log.error("oauth failure: %s", err) self._oauth_done.set() def authenticate(self): try: self.initialize() self._oauth_done.wait() return {"refresh_token": self.refresh_token, "api_key": self.api_key, "client_secret": self._client_secret, "client_id": self._client_id, } finally: if self._redir_server: self._redir_server.shutdown() self._redir_server = None def get_quota(self): # https://developers.google.com/drive/api/v3/reference/about res = self._api('about', 'get', fields='storageQuota,user') quota = res['storageQuota'] user = res['user'] usage = int(quota['usage']) if 'limit' in quota and quota['limit']: limit = int(quota['limit']) else: # It is possible for an account to have unlimited space - pretend it's 1TB limit = 1024 * 1024 * 1024 * 1024 res = { 'used': usage, 'total': limit, 'login': user['emailAddress'], 'uid': user['permissionId'] } return res def connect(self, creds): log.debug('Connecting to googledrive') if not self.client: api_key = creds.get('api_key', self.api_key) refresh_token = creds.get('refresh_token', self.refresh_token) if not refresh_token: new_creds = self.authenticate() api_key = new_creds.get('api_key', None) refresh_token = new_creds.get('refresh_token', None) kwargs = {} try: with self.mutex: creds = client.GoogleCredentials(access_token=api_key, client_id=creds.get( 'client_id', self._client_id), client_secret=creds.get( 'client_secret', self._client_secret), refresh_token=refresh_token, token_expiry=None, token_uri=self._token_uri, user_agent=self.user_agent) creds.refresh(Http()) self.client = build( 'drive', 'v3', http=creds.authorize(Http())) kwargs['api_key'] = creds.access_token if getattr(creds, 'refresh_token', None): refresh_token = creds.refresh_token self.refresh_token = refresh_token self.api_key = api_key quota = None try: quota = self.get_quota() except SSLError: # pragma: no cover # Seeing some intermittent SSL failures that resolve on retry log.warning('Retrying intermittent SSLError') quota = self.get_quota() self.connection_id = quota['login'] except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() return self.client @staticmethod def _get_reason_from_http_error(e): # gets a default something (actually the message, not the reason) using their secret interface reason = e._get_reason() # pylint: 
disable=protected-access # parses the JSON of the content to get the reason from where it really lives in the content try: # this code was copied from googleapiclient/http.py:_should_retry_response() data = json.loads(e.content.decode('utf-8')) if isinstance(data, dict): reason = data['error']['errors'][0]['reason'] else: reason = data[0]['error']['errors']['reason'] except (UnicodeDecodeError, ValueError, KeyError): log.warning('Invalid JSON content from response: %s', e.content) return reason def _api(self, resource, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") with self.mutex: try: if resource == 'media': res = args[0] args = args[1:] else: res = getattr(self.client, resource)() meth = getattr(res, method)(*args, **kwargs) if resource == 'media' or (resource == 'files' and method == 'get_media'): ret = meth else: ret = meth.execute() log.debug("api: %s %s (%s) -> %s", method, args, kwargs, ret) return ret except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() except SSLError as e: if "WRONG_VERSION" in str(e): # httplib2 used by google's api gives this weird error for no discernable reason raise CloudTemporaryError(str(e)) raise except HttpError as e: if str(e.resp.status) == '416': raise GDriveFileDoneError() if str(e.resp.status) == '404': raise CloudFileNotFoundError('File not found when executing %s.%s(%s)' % ( resource, method, kwargs )) reason = self._get_reason_from_http_error(e) if str(e.resp.status) == '403' and str(reason) == 'parentNotAFolder': raise CloudFileExistsError("Parent Not A Folder") if (str(e.resp.status) == '403' and reason in ('userRateLimitExceeded', 'rateLimitExceeded', )) \ or str(e.resp.status) == '429': raise CloudTemporaryError("rate limit hit") # At this point, _should_retry_response() returns true for error codes >=500, 429, and 403 with # the reason 'userRateLimitExceeded' or 'rateLimitExceeded'. 403 without content, or any other # response is not retried. We have already taken care of some of those cases above, but we call this # below to catch the rest, and in case they improve their library with more conditions. If we called # meth.execute() above with a num_retries argument, all this retrying would happen in the google api # library, and we wouldn't have to think about retries. 
should_retry = _should_retry_response(e.resp.status, e.content) if should_retry: raise CloudTemporaryError("unknown error %s" % e) raise except (TimeoutError, HttpLib2Error): self.disconnect() raise CloudDisconnectedError("disconnected on timeout") @property def root_id(self): if not self.__root_id: res = self._api('files', 'get', fileId='root', fields='id', ) self.__root_id = res['id'] self._ids['/'] = self.__root_id return self.__root_id def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('changes', 'getStartPageToken') if res: return res.get('startPageToken') else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor def events(self) -> Generator[Event, None, None]: # pylint: disable=too-many-locals, too-many-branches page_token = self.current_cursor while page_token is not None: # log.debug("looking for events, timeout: %s", timeout) response = self._api('changes', 'list', pageToken=page_token, spaces='drive', includeRemoved=True, includeItemsFromAllDrives=True, supportsAllDrives=True) new_cursor = response.get('newStartPageToken', None) for change in response.get('changes'): log.debug("got event %s", change) # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T16:57:06.779Z', # 'removed': False, 'fileId': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'file': {'kind': 'drive#file', # 'id': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'name': 'dest', 'mimeType': 'application/octet-stream'}} # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T20:02:14.156Z', # 'removed': True, 'fileId': '1lhRe0nDplA6I5JS18642rg0KIbYN66lR'} ts = arrow.get(change.get('time')).float_timestamp oid = change.get('fileId') exists = not change.get('removed') fil = change.get('file') if fil: if fil.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE else: otype = None ohash = None path = self._path_oid(oid, use_cache=False) event = Event(otype, oid, path, ohash, exists, ts, new_cursor=new_cursor) remove = [] for cpath, coid in self._ids.items(): if coid == oid: if cpath != path: remove.append(cpath) if path and otype == DIRECTORY and self.is_subpath(path, cpath): remove.append(cpath) for r in remove: self._ids.pop(r, None) if path: self._ids[path] = oid log.debug("converted event %s as %s", change, event) yield event if new_cursor and page_token and new_cursor != page_token: self.__cursor = new_cursor page_token = response.get('nextPageToken') def _walk(self, path, oid): for ent in self.listdir(oid): current_path = self.join(path, ent.name) event = Event(otype=ent.otype, oid=ent.oid, path=current_path, hash=ent.hash, exists=True, mtime=time.time()) log.debug("walk %s", event) yield event if ent.otype == DIRECTORY: if self.exists_oid(ent.oid): yield from self._walk(current_path, ent.oid) def walk(self, path, since=None): info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) yield from self._walk(path, info.oid) def __prep_upload(self, path, metadata): # modification time mtime = metadata.get("modifiedTime", time.time()) mtime = arrow.get(mtime).isoformat() gdrive_info = { 'modifiedTime': mtime } # mime type, if provided mime_type = metadata.get("mimeType", None) if mime_type: gdrive_info['mimeType'] = mime_type # path, if provided if path: _, name = self.split(path) gdrive_info['name'] = name # misc properties, if provided app_props = metadata.get("appProperties", None) if app_props: # caller can specify 
google-specific stuff, if desired gdrive_info['appProperties'] = app_props # misc properties, if provided app_props = metadata.get("properties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['properties'] = app_props log.debug("info %s", gdrive_info) return gdrive_info def upload(self, oid, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(None, metadata) ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' res = self._api('files', 'update', body=gdrive_info, fileId=oid, media_body=ul, fields=fields) log.debug("response from upload %s", res) if not res: raise CloudTemporaryError("unknown response from drive on upload") md5 = res.get('md5Checksum', None) # can be none if the user tries to upload to a folder if md5 is None: possible_conflict = self._info_oid(oid) if possible_conflict and possible_conflict.otype == DIRECTORY: raise CloudFileExistsError("Can only upload to a file: %s" % possible_conflict.path) return OInfo(otype=FILE, oid=res['id'], hash=md5, path=None) def create(self, path, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(path, metadata) if self.exists_path(path): raise CloudFileExistsError() ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' parent_oid = self.get_parent_id(path) gdrive_info['parents'] = [parent_oid] res = self._api('files', 'create', body=gdrive_info, media_body=ul, fields=fields) log.debug("response from create %s : %s", path, res) if not res: raise CloudTemporaryError("unknown response from drive on upload") self._ids[path] = res['id'] log.debug("path cache %s", self._ids) return OInfo(otype=FILE, oid=res['id'], hash=res['md5Checksum'], path=path) def download(self, oid, file_like): req = self._api('files', 'get_media', fileId=oid) dl = MediaIoBaseDownload(file_like, req, chunksize=4 * 1024 * 1024) done = False while not done: try: _, done = self._api('media', 'next_chunk', dl) except GDriveFileDoneError: done = True def rename(self, oid, path): # pylint: disable=too-many-locals, too-many-branches pid = self.get_parent_id(path) add_pids = [pid] if pid == 'root': add_pids = [self.root_id] info = self._info_oid(oid) if info is None: log.debug("can't rename, oid doesn't exist %s", oid) raise CloudFileNotFoundError(oid) remove_pids = info.pids old_path = info.path _, name = self.split(path) body = {'name': name} if self.exists_path(path): possible_conflict = self.info_path(path) if FILE in (info.otype, possible_conflict.otype): if possible_conflict.oid != oid: # it's OK to rename a file over itself, frex, to change case raise CloudFileExistsError(path) else: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) if not old_path: for cpath, coid in list(self._ids.items()): if coid == oid: old_path = cpath if add_pids == remove_pids: add_pids = "" remove_pids = "" else: add_pids = ",".join(add_pids) remove_pids = ",".join(remove_pids) self._api('files', 'update', body=body, fileId=oid, addParents=add_pids, removeParents=remove_pids, fields='id') if old_path: for cpath, coid in list(self._ids.items()): relative = self.is_subpath(old_path, cpath) if relative: new_cpath = self.join(path, relative) self._ids.pop(cpath) 
self._ids[new_cpath] = coid log.debug("renamed %s -> %s", oid, body) return oid def listdir(self, oid) -> Generator[GDriveInfo, None, None]: query = f"'{oid}' in parents" try: res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, name, mimeType, trashed)', pageToken=None) except CloudFileNotFoundError: if self._info_oid(oid): return log.debug("listdir oid gone %s", oid) raise if not res or not res['files']: if self.exists_oid(oid): return raise CloudFileNotFoundError(oid) log.debug("listdir got res %s", res) for ent in res['files']: fid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent['name'] trashed = ent.get('trashed', False) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE if not trashed: yield GDriveInfo(otype, fid, fhash, None, pids=pids, name=name) def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError(path) log.debug("Skipped creating already existing folder: %s", path) return info.oid pid = self.get_parent_id(path) _, name = self.split(path) file_metadata = { 'name': name, 'parents': [pid], 'mimeType': self._folder_mime_type, } if metadata: file_metadata.update(metadata) res = self._api('files', 'create', body=file_metadata, fields='id') fileid = res.get('id') self._ids[path] = fileid return fileid def delete(self, oid): info = self.info_oid(oid) if not info: log.debug("deleted non-existing oid %s", oid) return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self.listdir(oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files', 'delete', fileId=oid) except CloudFileNotFoundError: log.debug("deleted non-existing oid %s", oid) for currpath, curroid in list(self._ids.items()): if curroid == oid: self._trashed_ids[currpath] = self._ids[currpath] del self._ids[currpath] def exists_oid(self, oid): return self._info_oid(oid) is not None def info_path(self, path: str) -> Optional[OInfo]: if path == "/": return self.info_oid(self.root_id) try: parent_id = self.get_parent_id(path) _, name = self.split(path) query = f"'{parent_id}' in parents and name='{name}'" res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, mimeType, trashed, name)', pageToken=None) except CloudFileNotFoundError: return None if not res['files']: return None ent = res['files'][0] if ent.get('trashed'): # TODO: # need to write a tests that moves files to the trash, as if a user moved the file to the trash # then assert it shows up "file not found" in all queries return None log.debug("res is %s", res) oid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent.get('name') # query is insensitive to certain features of the name # ....cache correct basename path = self.join(self.dirname(path), name) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE self._ids[path] = oid return GDriveInfo(otype, oid, fhash, path, pids=pids) def exists_path(self, path) -> bool: if path in self._ids: return True return self.info_path(path) is not None def get_parent_id(self, path): if not path: return None parent, _ = self.split(path) if parent == path: return self._ids.get(parent) # get the latest version of the parent path # it may have changed, or case may be 
different, etc. info = self.info_path(parent) if not info: raise CloudFileNotFoundError("parent %s must exist" % parent) # cache the latest version return self._ids[info.path] def _path_oid(self, oid, info=None, use_cache=True) -> Optional[str]: """convert oid to path""" if use_cache: for p, pid in self._ids.items(): if pid == oid: return p for p, pid in self._trashed_ids.items(): if pid == oid: return p # todo, better cache, keep up to date, etc. if not info: info = self._info_oid(oid) if info and info.pids and info.name: ppath = self._path_oid(info.pids[0]) if ppath: path = self.join(ppath, info.name) self._ids[path] = oid return path return None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: info = self._info_oid(oid) if info is None: return None # expensive path = self._path_oid(oid, info, use_cache=use_cache) ret = OInfo(info.otype, info.oid, info.hash, path) log.debug("info oid ret: %s", ret) return ret @staticmethod def hash_data(file_like) -> bytes: # get a hash from a filelike that's the same as the hash i natively use md5 = hashlib.md5() for c in iter(lambda: file_like.read(32768), b''): md5.update(c) return md5.hexdigest() def _info_oid(self, oid) -> Optional[GDriveInfo]: try: res = self._api('files', 'get', fileId=oid, fields='name, md5Checksum, parents, mimeType, trashed', ) except CloudFileNotFoundError: return None log.debug("info oid %s", res) if res.get('trashed'): return None pids = res.get('parents') fhash = res.get('md5Checksum') name = res.get('name') if res.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE return GDriveInfo(otype, oid, fhash, None, pids=pids, name=name) PKO_lcloudsync/sync/__init__.py__all__ = ['SyncManager', 'SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] from .manager import * from .state import * PKd$O{{cloudsync/sync/manager.py#pylint: disable=too-many-lines import os import logging import tempfile import shutil import time from typing import Tuple, Optional from cloudsync.provider import Provider __all__ = ['SyncManager'] from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError, CloudTemporaryError from cloudsync.types import DIRECTORY, FILE from cloudsync.runnable import Runnable from cloudsync.log import TRACE from .state import SyncState, SyncEntry, SideState, TRASHED, EXISTS, LOCAL, REMOTE, UNKNOWN from .util import debug_sig log = logging.getLogger(__name__) # useful for converting oids and pointer nubmers into digestible nonces FINISHED = 1 REQUEUE = 0 def other_side(index): return 1-index class ResolveFile(): def __init__(self, info, provider): self.info = info self.provider = provider self.path = info.path self.side = info.side self.otype = info.otype self.temp_file = info.temp_file if self.otype == FILE: assert info.temp_file self.__fh = None @property def fh(self): if not self.__fh: if not os.path.exists(self.info.temp_file): try: with open(self.info.temp_file, "wb") as f: self.provider.download(self.info.oid, f) except Exception as e: log.debug("error downloading %s", e) try: os.unlink(self.info.temp_file) except FileNotFoundError: pass raise self.__fh = open(self.info.temp_file, "rb") return self.__fh def read(self, *a): return self.fh.read(*a) def write(self, buf): return self.fh.write(buf) def close(self): return self.fh.close() def seek(self, *a): return self.fh.seek(*a) class SyncManager(Runnable): # pylint: disable=too-many-public-methods, too-many-instance-attributes def __init__(self, state, providers: Tuple[Provider, Provider], 
translate, resolve_conflict, sleep=None): self.state: SyncState = state self.providers: Tuple[Provider, Provider] = providers self.translate = translate self.__resolve_conflict = resolve_conflict self.tempdir = tempfile.mkdtemp(suffix=".cloudsync") if not sleep: # these are the event sleeps, but really we need more info than this sleep = (self.providers[LOCAL].default_sleep, self.providers[REMOTE].default_sleep) self.sleep = sleep # TODO: we need sync_aging, backoff_min, backoff_max, backoff_mult documented with an interface and tests! #### self.min_backoff = 0 self.max_backoff = 0 self.backoff = 0 max_sleep = max(sleep) # on sync fail, use the worst time for backoff self.aging = max_sleep / 5 # how long before even trying to sync self.min_backoff = max_sleep / 10.0 # event sleep of 15 seconds == 1.5 second backoff on failures self.max_backoff = max_sleep * 4.0 # escalating up to a 1 minute wait time self.backoff = self.min_backoff self.mult_backoff = 2 assert len(self.providers) == 2 def set_resolver(self, resolver): self.__resolve_conflict = resolver def do(self): with self.state.lock: sync: SyncEntry = self.state.change(self.aging) if sync: try: self.sync(sync) self.state.storage_update(sync) self.backoff = self.min_backoff except CloudTemporaryError as e: log.error( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted) sync.punt() time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) except Exception as e: log.exception( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted, stack_info=True) sync.punt() time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) def done(self): log.info("cleanup %s", self.tempdir) shutil.rmtree(self.tempdir) def get_latest_state(self, ent): log.log(TRACE, "before update state %s", ent) for i in (LOCAL, REMOTE): if not ent[i].changed: continue info = self.providers[i].info_oid(ent[i].oid, use_cache=False) if not info: continue ent[i].exists = EXISTS ent[i].hash = info.hash ent[i].otype = info.otype if ent[i].otype == FILE: if ent[i].hash is None: ent[i].hash = self.providers[i].hash_oid(ent[i].oid) if ent[i].exists == EXISTS: if ent[i].hash is None: log.warning("Cannot sync %s, since hash is None", ent[i]) if ent[i].path != info.path: ent[i].path = info.path self.update_entry(ent, oid=ent[i].oid, side=i, path=info.path) log.log(TRACE, "after update state %s", ent) def path_conflict(self, ent): # both are synced if ent[0].path and ent[1].path and ((ent[0].sync_hash and ent[1].sync_hash) # pylint: disable=too-many-boolean-expressions or (ent[0].otype == DIRECTORY and ent[1].otype == DIRECTORY)): return not self.providers[0].paths_match(ent[0].path, ent[0].sync_path) and \ not self.providers[1].paths_match(ent[1].path, ent[1].sync_path) return False def sync(self, sync): self.get_latest_state(sync) if sync.hash_conflict(): log.debug("handle hash conflict") self.handle_hash_conflict(sync) return if self.path_conflict(sync): log.debug("handle path conflict") self.handle_path_conflict(sync) return log.log(TRACE, "table\r\n%s", self.state.pretty_print()) for i in (LOCAL, REMOTE): if sync[i].changed: if sync[i].hash is None and sync[i].otype == FILE and sync[i].exists == EXISTS: log.debug("ignore %s", sync) # no hash for file, ignore it self.finished(i, sync) break # if the other side changed hash, handle it first if sync[i].hash == sync[i].sync_hash: other = other_side(i) if sync[other].changed and sync[other].hash != sync[other].sync_hash: 
continue response = self.embrace_change(sync, i, other_side(i)) if response == FINISHED: self.finished(i, sync) break def temp_file(self): if not os.path.exists(self.tempdir): # in case user deletes it... recreate self.tempdir = tempfile.mkdtemp(suffix=".cloudsync") # prefer big random name over NamedTemp which can infinite loop ret = os.path.join(self.tempdir, os.urandom(16).hex()) log.debug("tempdir %s -> %s", self.tempdir, ret) return ret def finished(self, side, sync): sync[side].changed = 0 # todo: changing the state above should signal this call below self.state.finished(sync) # todo: likewise... clearing the changebit is enough to know to clear temps self.clean_temps(sync) @staticmethod def clean_temps(sync): # todo: move this to the sync obj for side in (LOCAL, REMOTE): if sync[side].temp_file: try: os.unlink(sync[side].temp_file) except FileNotFoundError: pass except OSError as e: log.debug("exception unlinking %s", e) except Exception as e: # any exceptions here are pointless log.warning("exception unlinking %s", e) sync[side].temp_file = None def download_changed(self, changed, sync): sync[changed].temp_file = sync[changed].temp_file or self.temp_file() assert sync[changed].oid if os.path.exists(sync[changed].temp_file): return True try: partial_temp = sync[changed].temp_file + ".tmp" log.debug("%s download %s to %s", self.providers[changed], sync[changed].oid, partial_temp) with open(partial_temp, "wb") as f: self.providers[changed].download(sync[changed].oid, f) os.rename(partial_temp, sync[changed].temp_file) return True except PermissionError as e: raise CloudTemporaryError("download or rename exception %s" % e) except CloudFileNotFoundError: log.debug("download from %s failed fnf, switch to not exists", self.providers[changed].name) sync[changed].exists = TRASHED return False def get_folder_file_conflict(self, sync, translated_path, synced): # if a non-dir file exists with the same name on the sync side syents = list(self.state.lookup_path(synced, translated_path)) conflicts = [ent for ent in syents if ent[synced].exists != TRASHED and ent != sync and ent[synced].otype != DIRECTORY] nc = [] for ent in conflicts: info = self.providers[synced].info_oid(ent[synced].oid) if not info: ent.exists = TRASHED else: nc.append(ent) return nc[0] if nc else None def mkdir_synced(self, changed, sync, translated_path): synced = other_side(changed) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] if ents: for ent in ents: if ent[changed].otype == DIRECTORY: # these we can toss, they are other folders # keep the current one, since it exists for sure log.debug("discard %s", ent) self.discard_entry(ent) ents = [ent for ent in ents if not ent.discarded] ents = [ent for ent in ents if TRASHED not in ( ent[changed].exists, ent[synced].exists)] if ents: if not sync.punted: log.debug("punt mkdir") sync.punt() return REQUEUE self.rename_to_fix_conflict(sync, synced, translated_path) try: log.debug("translated %s as path %s", sync[changed].path, translated_path) # could have made a dir that already existed on my side or other side chents = list(self.state.lookup_path(changed, sync[changed].path)) notme_chents = [ent for ent in chents if ent != sync] for ent in notme_chents: # dup dirs on remote side can be ignored if ent[synced].otype == DIRECTORY: log.debug("discard duplicate dir entry %s", ent) self.discard_entry(ent) cent = self.get_folder_file_conflict(sync, 
translated_path, synced) if cent: log.debug("resolve %s conflict with %s", translated_path, cent) self.resolve_conflict((sync[changed], cent[synced])) return FINISHED # make the dir oid = self.providers[synced].mkdirs(translated_path) log.debug("mkdir %s as path %s oid %s", self.providers[synced].name, translated_path, debug_sig(oid)) # did i already have that oid? if so, chuck it already_dir = self.state.lookup_oid(synced, oid) if already_dir and already_dir != sync and already_dir[synced].otype == DIRECTORY: log.debug("discard %s", already_dir) self.discard_entry(already_dir) sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=oid, path=translated_path) return FINISHED except CloudFileNotFoundError: if not sync.punted: sync.punt() return REQUEUE log.debug("mkdir %s : %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].name, translated_path) raise NotImplementedError("TODO mkdir, and make state etc") def upload_synced(self, changed, sync): assert sync[changed].temp_file synced = other_side(changed) try: info = self.providers[synced].upload( sync[synced].oid, open(sync[changed].temp_file, "rb")) log.debug("upload to %s as path %s", self.providers[synced].name, sync[synced].sync_path) sync[synced].hash = info.hash sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = sync[synced].path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path) return True except FileNotFoundError: log.info("FNF during upload %s:%s", sync[synced].sync_path, sync[changed].temp_file) return False except CloudFileNotFoundError: info = self.providers[synced].info_oid(sync[synced].oid) if not info: log.debug("convert to unsynced") sync[synced].exists = TRASHED else: # you got an FNF during upload, but when you queried... 
it existed # basically this is just a "retry" or something log.warning("Upload to %s failed fnf, info: %s", self.providers[synced].name, info) return False except CloudFileExistsError: # this happens if the remote oid is a folder log.debug("split bc upload to folder") defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) return self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def _create_synced(self, changed, sync, translated_path): synced = other_side(changed) log.debug("create on %s as path %s", self.providers[synced].name, translated_path) try: with open(sync[changed].temp_file, "rb") as f: info = self.providers[synced].create(translated_path, f) log.debug("created %s", info) except CloudFileExistsError: log.debug("exists error %s", translated_path) info = self.providers[synced].info_path(translated_path) if not info: raise with open(sync[changed].temp_file, "rb") as f: existing_hash = self.providers[synced].hash_data(f) if existing_hash != info.hash: raise log.debug("use existing %s", info) except Exception as e: log.debug("failed to create %s, %s", translated_path, e) raise assert info.hash assert sync[changed].hash sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = translated_path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path, hash=info.hash) def update_entry(self, ent, side, oid, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin # updates entry without marking as changed unless explicit # used internally self.state.update_entry(ent, side, oid, path=path, hash=hash, exists=exists, changed=changed, otype=otype) def change_state(self, side, otype, oid, *, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin # looks up oid and changes state, marking changed as if it's an event # used only for testing self.state.update(side, otype, oid, path=path, hash=hash, exists=exists, prior_oid=prior_oid) def create_synced(self, changed, sync, translated_path): synced = other_side(changed) try: self._create_synced(changed, sync, translated_path) return FINISHED except CloudFileNotFoundError: # parent presumably exists parent = self.providers[changed].dirname(sync[changed].path) log.debug("make %s first before %s", parent, sync[changed].path) ents = self.state.lookup_path(changed, parent) if not ents: info = self.providers[changed].info_path(parent) if info: self.state.update(changed, DIRECTORY, info.oid, path=parent) else: log.info("no info and no dir, ignoring?") else: if not ents[0][changed].changed: self.update_entry(ents[0], changed, ents[0][changed].oid, changed=True) log.debug("updated entry %s", parent) sync.punt() return REQUEUE except CloudFileExistsError: # there's a file or folder in the way, let that resolve if possible log.debug("can't create %s, try punting", translated_path) if sync.punted > 0: info = self.providers[synced].info_path(translated_path) if not info: log.debug("got a file exists, and then it didn't exist %s", sync) sync.punt() return REQUEUE sync[synced].oid = info.oid sync[synced].hash = info.hash sync[synced].path = translated_path self.update_entry(sync, synced, info.oid, path=translated_path) # maybe it's a hash conflict sync.punt() return REQUEUE else: # maybe it's a name conflict sync.punt() return REQUEUE def __resolve_file_likes(self, 
side_states): fhs = [] for ss in side_states: assert type(ss) is SideState if not ss.temp_file and ss.otype == FILE: ss.temp_file = self.temp_file() fhs.append(ResolveFile(ss, self.providers[ss.side])) assert ss.oid return fhs def __safe_call_resolver(self, fhs): if DIRECTORY in (fhs[0].otype, fhs[1].otype): if fhs[0].otype != fhs[1].otype: if fhs[0].otype == DIRECTORY: fh = fhs[0] else: fh = fhs[1] # for simplicity: directory conflicted with file, always favors directory return (fh, True) is_file_like = lambda f: hasattr(f, "read") and hasattr(f, "close") ret = None try: ret = self.__resolve_conflict(*fhs) if ret: if len(ret) != 2: log.error("bad return value for resolve conflict %s", ret) ret = None if not is_file_like(ret[0]): log.error("bad return value for resolve conflict %s", ret) ret = None except Exception as e: log.exception("exception during conflict resolution %s", e) if ret is None: # we defer to the remote... since this can prevent loops if fhs[0].side == REMOTE: ret = (fhs[0], True) else: ret = (fhs[1], True) return ret def __resolver_merge_upload(self, side_states, fh, keep): # we modified to both sides, need to merge the entries ent1, ent2 = side_states defer_ent = self.state.lookup_oid(ent1.side, ent1.oid) replace_ent = self.state.lookup_oid(ent2.side, ent2.oid) if keep: # both sides are being kept, so we have to upload since there are no entries fh.seek(0) info1 = self.providers[ent1.side].create(ent1.path, fh) fh.seek(0) info2 = self.providers[ent2.side].create(ent2.path, fh) ent1.oid = info1.oid ent2.oid = info2.oid self.update_entry(defer_ent, ent1.side, ent1.oid, path=ent1.path, hash=ent1.hash) ent1 = defer_ent[ent1.side] ent2 = defer_ent[ent2.side] assert info2.hash assert info1.hash ent2.sync_hash = info2.hash ent2.sync_path = info2.path ent1.sync_hash = info1.hash ent1.sync_path = info1.path # in case oids have changed self.update_entry(defer_ent, ent2.side, ent2.oid, path=ent2.path, hash=ent2.hash) defer_ent[ent2.side].sync_hash = ent2.sync_hash defer_ent[ent2.side].sync_path = ent2.sync_path self.discard_entry(replace_ent) def resolve_conflict(self, side_states): # pylint: disable=too-many-statements fhs = self.__resolve_file_likes(side_states) fh, keep = self.__safe_call_resolver(fhs) log.debug("got ret side %s", getattr(fh, "side", None)) defer = None for i, rfh in enumerate(fhs): this = side_states[i] that = side_states[1-i] if fh is not rfh: # user didn't opt to keep my rfh log.debug("replacing %s", this.side) if not keep: fh.seek(0) info2 = self.providers[this.side].upload(this.oid, fh) this.hash = info2.hash assert info2.hash that.sync_hash = that.hash that.sync_path = that.path this.sync_hash = this.hash this.sync_path = this.path else: try: self._resolve_rename(this) except CloudFileNotFoundError: log.debug("there is no conflict, because the file doesn't exist? %s", this) if defer is None: defer = that.side else: defer = None if defer is not None: # toss the other side that was replaced sorted_states = sorted(side_states, key=lambda e: e.side) replace_side = other_side(defer) replace_ent = self.state.lookup_oid(replace_side, sorted_states[replace_side].oid) if replace_ent: self.discard_entry(replace_ent) else: # both sides were modified.... 
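# (illustrative sketch, not part of this module) __safe_call_resolver above expects the
# user-supplied resolver to take the two conflicting ResolveFile objects and return a
# (file_like, keep_both) pair; a malformed return value or an exception makes it fall back
# to preferring the REMOTE copy, with the other side renamed to a ".conflicted" name.
# A hypothetical resolver that always keeps the larger file could look like:
#
#   def keep_larger(f1, f2):
#       a, b = f1.read(), f2.read()
#       winner = f1 if len(a) >= len(b) else f2
#       winner.seek(0)
#       return winner, False    # False: overwrite the losing side instead of keeping both
#
# and would be installed via the resolve_conflict constructor argument or set_resolver().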
self.__resolver_merge_upload(side_states, fh, keep) log.debug("RESOLVED CONFLICT: %s dide: %s", side_states, defer) log.debug("table\r\n%s", self.state.pretty_print()) def delete_synced(self, sync, changed, synced): log.debug("try sync deleted %s", sync[changed].path) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] # ents2 = list(self.state.lookup_path(synced, sync[synced].path)) # ents += [ent for ent in ents2 if ent != sync] for ent in ents: if ent.is_creation(synced): log.debug("discard delete, pending create %s:%s", synced, ent) self.discard_entry(sync) return FINISHED if sync[synced].oid: try: self.providers[synced].delete(sync[synced].oid) except CloudFileNotFoundError: pass except CloudFileExistsError: log.debug("kids exist, punt %s", sync[changed].path) sync.punt() return REQUEUE else: log.debug("was never synced, ignoring deletion") sync[synced].exists = TRASHED self.discard_entry(sync) return FINISHED def check_disjoint_create(self, sync, changed, synced, translated_path): # check for creation of a new file with another in the table if sync[changed].otype != FILE: return False ents = list(self.state.lookup_path(synced, translated_path)) # filter for exists other_ents = [ent for ent in ents if ent != sync] if not other_ents: return False log.debug("found matching %s, other ents: %s", translated_path, other_ents) # ignoring trashed entries with different oids on the same path if all(TRASHED in (ent[synced].exists, ent[changed].exists) for ent in other_ents): return False other_untrashed_ents = [ent for ent in other_ents if TRASHED not in ( ent[synced].exists, ent[changed].exists)] if sync.punted == 0: # delaying sometimes helps, because future events can resolve conflicts # it's generally better to wait before conflicting something log.debug("punting, maybe it will fix itself") sync.punt() return True log.debug("split conflict found : %s", other_untrashed_ents) found = None info = self.providers[synced].info_path(translated_path) if info: for e in other_untrashed_ents: if e[synced].oid == info.oid: found = e if not found: return True return self.handle_split_conflict( other_untrashed_ents[0], synced, sync, changed) def handle_path_change_or_creation(self, sync, changed, synced): # pylint: disable=too-many-branches, too-many-return-statements if not sync[changed].path: self.update_sync_path(sync, changed) log.debug("NEW SYNC %s", sync) if sync[changed].exists == TRASHED or sync.discarded: log.debug("requeue trashed event %s", sync) return REQUEUE translated_path = self.translate(synced, sync[changed].path) if translated_path is None: # ignore these return FINISHED if not sync[changed].path: log.debug("can't sync, no path %s", sync) if sync.is_creation(changed): # never synced this before, maybe there's another local path with # the same name already? 
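# (descriptive note) check_disjoint_create, defined above, looks for a different entry that
# already claims translated_path on the synced side. The first time through it only punts,
# giving pending events a chance to clear the collision on their own; if the collision
# persists on a later pass the entry is either requeued again or handed to
# handle_split_conflict.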
if self.check_disjoint_create(sync, changed, synced, translated_path): log.debug("disjoint, requeue") return REQUEUE if sync.is_creation(changed): assert not sync[changed].sync_hash # looks like a new file if sync[changed].otype == DIRECTORY: return self.mkdir_synced(changed, sync, translated_path) if not self.download_changed(changed, sync): return REQUEUE if sync[synced].oid and sync[synced].exists != TRASHED: if self.upload_synced(changed, sync): return FINISHED return REQUEUE return self.create_synced(changed, sync, translated_path) # handle rename # use == to allow rename for case reasons # todo: need a paths_match flag instead, so slashes don't break this line if sync[synced].sync_path == translated_path: return FINISHED assert sync[synced].sync_hash or sync[synced].otype == DIRECTORY log.debug("rename %s %s", sync[synced].sync_path, translated_path) try: new_oid = self.providers[synced].rename(sync[synced].oid, translated_path) except CloudFileNotFoundError as e: log.debug("ERROR: can't rename for now %s: %s", sync, e) if sync.punted > 5: log.exception("punted too many times, giving up") return FINISHED else: log.debug("fnf, punt") sync.punt() return REQUEUE except CloudFileExistsError: log.debug("can't rename, file exists") if sync.punted: log.debug("rename for conflict") self.rename_to_fix_conflict(sync, synced, translated_path) sync.punt() return REQUEUE sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, path=translated_path, oid=new_oid) return FINISHED def _resolve_rename(self, replace): replace_ent = self.state.lookup_oid(replace.side, replace.oid) if not replace_ent: return False _old_oid, new_oid, new_name = self.conflict_rename(replace.side, replace.path) if new_name is None: return False self.update_entry(replace_ent, side=replace.side, oid=new_oid) return True def rename_to_fix_conflict(self, sync, side, path): old_oid, new_oid, new_name = self.conflict_rename(side, path) if new_name is None: return False log.debug("rename to fix conflict %s -> %s", sync[side].path, new_name) # file can get renamed back, if there's a cycle if old_oid == sync[side].oid: self.update_entry(sync, side=side, oid=new_oid) else: ent = self.state.lookup_oid(side, old_oid) if ent: self.update_entry(ent, side=side, oid=new_oid) return True def conflict_rename(self, side, path): folder, base = self.providers[side].split(path) if base == "": raise ValueError("bad path %s" % path) index = base.find(".") if index >= 0: ext = base[index:] base = base[:index] else: base = base ext = "" oinfo = self.providers[side].info_path(path) if not oinfo: return None, None, None i = 1 new_oid = None conflict_name = base + ".conflicted" + ext while new_oid is None: try: conflict_path = self.providers[side].join(folder, conflict_name) new_oid = self.providers[side].rename(oinfo.oid, conflict_path) except CloudFileExistsError: i = i + 1 conflict_name = base + ".conflicted" + str(i) + ext log.debug("conflict renamed: %s -> %s", path, conflict_path) return oinfo.oid, new_oid, conflict_path def discard_entry(self, sync): if sync: sync.discard() self.state.storage_update(sync) def embrace_change(self, sync, changed, synced): # pylint: disable=too-many-return-statements if sync.discarded: log.warning("discarded!") return FINISHED log.debug("embrace %s, side:%s", sync, changed) if sync[changed].path: translated_path = self.translate(synced, sync[changed].path) if not translated_path: log.log(TRACE, ">>>Not a cloud path %s, discard", sync[changed].path) 
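# (illustrative sketch, not the real CloudSync helper) translate(side, path) is the callback
# handed to SyncManager.__init__; it maps a path from the changed side into the other side's
# namespace, and a falsy result means "outside the synced roots", which is why the entry is
# discarded just below. A minimal version for roots like ("/local", "/remote") might be:
#
#   def translate(to_side, path, roots=("/local", "/remote")):
#       from_side = 1 - to_side                      # LOCAL = 0, REMOTE = 1
#       root = roots[from_side]
#       if path != root and not path.startswith(root + "/"):
#           return None                              # not under the watched root: ignore
#       return roots[to_side] + path[len(root):]
#
# (the real callback would also need to handle provider-specific separators and case rules)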
self.discard_entry(sync) return FINISHED # parent_conflict code # todo: make this walk up parents to the translate == None "root" conflict = self._get_parent_conflict(sync, changed) if conflict: log.debug("parent modify %s should happen first %s", sync[changed].path, conflict) conflict[changed].set_aged() sync.punt() return REQUEUE if sync[changed].exists == TRASHED: log.debug("delete") return self.delete_synced(sync, changed, synced) if sync.is_path_change(changed) or sync.is_creation(changed): ret = self.handle_path_change_or_creation(sync, changed, synced) if ret == REQUEUE: log.debug("requeue, not handled") return ret if sync[changed].hash != sync[changed].sync_hash: return self.handle_hash_diff(sync, changed, synced) log.debug("nothing changed %s", sync) return FINISHED def handle_hash_diff(self, sync, changed, synced): if sync[changed].sync_hash is None: sync[changed].sync_path = None # creation must have failed log.warning("needs create: %s index: %s bc %s != %s", sync, synced, sync[changed].hash, sync[changed].sync_hash) return REQUEUE # not a new file, which means we must have last sync info if sync[synced].exists == TRASHED: log.debug("dont upload to trashed, zero out trashed side") # not an upload sync[synced].exists = UNKNOWN sync[synced].hash = None sync[synced].changed = 0 sync[synced].path = None sync[synced].oid = None sync[synced].sync_path = None sync[synced].sync_hash = None sync[changed].sync_path = None sync[changed].sync_hash = None return REQUEUE log.debug("needs upload: %s index: %s bc %s != %s", sync, synced, sync[changed].hash, sync[changed].sync_hash) assert sync[synced].oid if not self.download_changed(changed, sync): return REQUEUE if not self.upload_synced(changed, sync): return REQUEUE return FINISHED def update_sync_path(self, sync, changed): assert sync[changed].oid info = self.providers[changed].info_oid(sync[changed].oid) if not info: sync[changed].exists = TRASHED return if not info.path: log.warning("impossible sync, no path. " "Probably a file that was shared, but not placed into a folder. discarding. 
%s", sync[changed]) self.discard_entry(sync) return log.debug("UPDATE PATH %s->%s", sync, info.path) self.update_entry( sync, changed, sync[changed].oid, path=info.path, exists=True) def handle_hash_conflict(self, sync): log.debug("splitting hash conflict %s", sync) # split the sync in two defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync) return self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def handle_split_conflict(self, defer_ent, defer_side, replace_ent, replace_side): if defer_ent[defer_side].otype == FILE: if not self.download_changed(defer_side, defer_ent): return False try: with open(defer_ent[defer_side].temp_file, "rb") as f: dhash = self.providers[replace_side].hash_data(f) if dhash == replace_ent[replace_side].hash: log.debug("same hash as remote, discard entry") self.discard_entry(replace_ent) return True except FileNotFoundError: return False self.resolve_conflict((defer_ent[defer_side], replace_ent[replace_side])) return True def handle_path_conflict(self, sync): # consistent handling log.debug("handle path conflict %s", sync) path1 = sync[0].path path2 = sync[1].path if path1 > path2: pick = 0 else: pick = 1 picked = sync[pick] other = sync[other_side(pick)] other_path = self.translate(other.side, picked.path) if other_path is None: return other_info = self.providers[other.side].info_oid(other.oid) log.debug("renaming to handle path conflict: %s -> %s", other.oid, other_path) def _update_syncs(new_oid): self.update_entry(sync, other.side, new_oid, path=other_path) if sync[other.side].sync_path: sync[other.side].sync_path = sync[other.side].path if sync[picked.side].sync_path: sync[picked.side].sync_path = sync[picked.side].path try: if other_info.path == other_path: # don't sync this entry log.warning("supposed rename conflict, but the names are the same") if not sync[other.side].sync_hash and sync[other.side].otype == FILE: log.warning("sync_hashes missing even though the sync_path is set...") sync[other.side].sync_path = None if not sync[picked.side].sync_hash and sync[picked.side].otype == FILE: log.warning("sync_hashes missing even though the sync_path is set...") sync[picked.side].sync_path = None raise CloudFileExistsError() new_oid = self.providers[other.side].rename(other.oid, other_path) _update_syncs(new_oid) except CloudFileExistsError: # other side already agrees _update_syncs(other.oid) def _get_parent_conflict(self, sync: SyncEntry, changed) -> Optional[str]: provider = self.providers[changed] path = sync[changed].path parent = provider.dirname(path) ret = None while path != parent: ents = list(self.state.lookup_path(changed, parent)) for ent in ents: ent: SyncEntry if ent[changed].changed and ent[changed].exists == EXISTS: ret = ent path = parent parent = provider.dirname(path) return ret PKc$O~* * cloudsync/sync/sqlite_storage.pyfrom typing import Dict, Any, Optional import logging import sqlite3 from cloudsync import Storage from cloudsync.log import TRACE log = logging.getLogger(__name__) class SqliteStorage(Storage): def __init__(self, filename: str): self._filename = filename self._ensure_table_exists() def _ensure_table_exists(self): self.db = sqlite3.connect(self._filename, uri=self._filename.startswith('file:'), check_same_thread=self._filename == ":memory:", timeout=5, isolation_level=None, ) self.db.execute("PRAGMA journal_mode=WAL;") self.db.execute("PRAGMA busy_timeout=5000;") # Not using AUTOINCREMENT: http://www.sqlitetutorial.net/sqlite-autoincrement/ self.db.execute('CREATE TABLE IF NOT 
EXISTS cloud (id INTEGER PRIMARY KEY, ' 'tag TEXT NOT NULL, serialization BLOB)') def create(self, tag: str, serialization: bytes) -> Any: db_cursor = self.db.execute('INSERT INTO cloud (tag, serialization) VALUES (?, ?)', [tag, serialization]) eid = db_cursor.lastrowid return eid def update(self, tag: str, serialization: bytes, eid: Any) -> int: log.log(TRACE, "updating eid%s", eid) db_cursor = self.db.execute('UPDATE cloud SET serialization = ? WHERE id = ? AND tag = ?', [serialization, eid, tag]) ret = db_cursor.rowcount if ret == 0: raise ValueError("id %s doesn't exist" % eid) return ret def delete(self, tag: str, eid: Any): log.log(TRACE, "deleting eid%s", eid) db_cursor = self.db.execute('DELETE FROM cloud WHERE id = ? AND tag = ?', [eid, tag]) if db_cursor.rowcount == 0: log.debug("ignoring delete: id %s doesn't exist", eid) return def read_all(self, tag: str) -> Dict[Any, bytes]: ret = {} db_cursor = self.db.execute('SELECT id, serialization FROM cloud WHERE tag = ?', [tag]) for row in db_cursor.fetchall(): eid, serialization = row ret[eid] = serialization return ret def read(self, tag: str, eid: Any) -> Optional[bytes]: db_cursor = self.db.execute('SELECT serialization FROM cloud WHERE id = ? and tag = ?', [eid, tag]) for row in db_cursor.fetchall(): return row return None PKc$O $?rrcloudsync/sync/state.py# pylint: disable=attribute-defined-outside-init, protected-access """ SyncEntry[SideState, SideState] is a pair of entries, indexed by oid. The SideState class makes extensive use of __getattr__ logic to keep indexes up to date. There may be no need to keep them in "pairs". This is artificial. States should probably be altered to independent, and not paired at all. """ import copy import json import logging import time import traceback from threading import RLock from abc import ABC, abstractmethod from enum import Enum from typing import Optional, Tuple, Any, List, Dict, Set from typing import Union from cloudsync.types import DIRECTORY, FILE, NOTKNOWN from cloudsync.types import OType from cloudsync.scramble import scramble from cloudsync.log import TRACE from .util import debug_sig log = logging.getLogger(__name__) __all__ = ['SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] # adds a repr to some classes class Reprable: # pylint: disable=too-few-public-methods def __repr__(self): return self.__class__.__name__ + ":" + debug_sig(id(self)) + str(self.__dict__) # safe ternary, don't allow traditional comparisons class Exists(Enum): UNKNOWN = None EXISTS = True TRASHED = False def __bool__(self): raise ValueError("never bool enums") UNKNOWN = Exists.UNKNOWN EXISTS = Exists.EXISTS TRASHED = Exists.TRASHED # state of a single object class SideState(Reprable): # pylint: disable=too-few-public-methods, too-many-instance-attributes def __init__(self, parent: 'SyncEntry', side: int, otype: OType): self._frozen = False self._parent = parent self._side: int = side # just for assertions self._otype: OType = otype self._hash: Optional[bytes] = None # hash at provider # time of last change (we maintain this) self._changed: Optional[float] = None self._sync_hash: Optional[bytes] = None # hash at last sync self._sync_path: Optional[str] = None # path at last sync self._path: Optional[str] = None # path at provider self._oid: Optional[str] = None # oid at provider self._exists: Exists = UNKNOWN # exists at provider self._temp_file: Optional[str] = None self._frozen = True def __getattr__(self, k): if k[0] != "_": return getattr(self, "_" + k) raise 
AttributeError("%s not in SideState" % k) def __setattr__(self, k, v): if k[0] == "_": object.__setattr__(self, k, v) return if self._frozen: if "_" + k not in self.__dict__: raise AttributeError("%s not in SideState" % k) self._parent.updated(self._side, k, v) if k == "exists": self._set_exists(v) else: object.__setattr__(self, "_" + k, v) def _set_exists(self, val: Union[bool, Exists]): if val is False: val = TRASHED if val is True: val = EXISTS if val is None: val = UNKNOWN if type(val) != Exists: raise ValueError("use enum for exists") self._exists = val self._parent.updated(self._side, "exists", val) def set_aged(self): # setting to an old mtime marks this as fully aged self.changed = 1 # these are not really local or remote # but it's easier to reason about using these labels LOCAL = 0 REMOTE = 1 def other_side(index): return 1-index class Storage(ABC): @abstractmethod def create(self, tag: str, serialization: bytes) -> Any: """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def update(self, tag: str, serialization: bytes, eid: Any) -> int: """ take a serialization str, update it in sqlite, return the count of rows updated """ ... @abstractmethod def delete(self, tag: str, eid: Any): """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def read_all(self, tag: str) -> Dict[Any, bytes]: """yield all the serialized strings in a generator""" ... @abstractmethod def read(self, tag: str, eid: Any) -> Optional[bytes]: """return one serialized string or None""" ... # single entry in the syncs state collection class SyncEntry(Reprable): # pylint: disable=too-many-instance-attributes def __init__(self, parent: 'SyncState', otype: OType, storage_init: Optional[Tuple[Any, bytes]] = None): super().__init__() self._frozen = False self.__states: List[SideState] = [SideState(self, 0, otype), SideState(self, 1, otype)] self._discarded: str = "" self._storage_id: Any = None self._dirty: bool = True self._punted: int = 0 self._parent = parent if storage_init is not None: self._storage_id = storage_init[0] self.deserialize(storage_init) self._dirty = False self._frozen = True log.debug("new syncent %s", debug_sig(id(self))) def __getattr__(self, k): if k[0] != "_": return getattr(self, "_" + k) raise AttributeError("%s not in SyncEntry" % k) def __setattr__(self, k, v): if k[0] == "_": object.__setattr__(self, k, v) return if self._frozen: if "_" + k not in self.__dict__: raise AttributeError("%s not in SyncEntry" % k) self.updated(None, k, v) object.__setattr__(self, "_" + k, v) def updated(self, side, key, val): self._dirty = True self._parent.updated(self, side, key, val) def serialize(self) -> bytes: """converts SyncEntry into a json str""" def side_state_to_dict(side_state: SideState) -> dict: ret = dict() ret['otype'] = side_state.otype.value ret['side'] = side_state.side ret['hash'] = side_state.hash.hex() if isinstance( side_state.hash, bytes) else None ret['changed'] = side_state.changed ret['sync_hash'] = side_state.sync_hash.hex() if isinstance( side_state.sync_hash, bytes) else None ret['path'] = side_state.path ret['sync_path'] = side_state.sync_path ret['oid'] = side_state.oid ret['exists'] = side_state.exists.value ret['temp_file'] = side_state.temp_file # storage_id does not get serialized, it always comes WITH a serialization when deserializing return ret ser = dict() ser['side0'] = side_state_to_dict(self.__states[0]) ser['side1'] = 
side_state_to_dict(self.__states[1]) ser['discarded'] = self.discarded return json.dumps(ser).encode('utf-8') def deserialize(self, storage_init: Tuple[Any, bytes]): """loads the values in the serialization dict into self""" def dict_to_side_state(side, side_dict: dict) -> SideState: otype = OType(side_dict['otype']) side_state = SideState(self, side, otype) side_state.side = side_dict['side'] side_state.hash = bytes.fromhex( side_dict['hash']) if side_dict['hash'] else None side_state.changed = side_dict['changed'] side_state.sync_hash = bytes.fromhex( side_dict['sync_hash']) if side_dict['sync_hash'] else None side_state.sync_path = side_dict['sync_path'] side_state.path = side_dict['path'] side_state.oid = side_dict['oid'] side_state.exists = side_dict['exists'] side_state.temp_file = side_dict['temp_file'] return side_state self.storage_id = storage_init[0] ser: dict = json.loads(storage_init[1].decode('utf-8')) self.__states = [dict_to_side_state(0, ser['side0']), dict_to_side_state(1, ser['side1'])] self.discarded = ser['discarded'] def __getitem__(self, i): return self.__states[i] def __setitem__(self, side, val): # can't really move items.. just copy stuff new_path = val._path new_oid = val._oid # old value is no longer in charge of anything val.path = None val.oid = None # new value rulez val = copy.copy(val) val._path = new_path val._oid = new_oid val._parent = self assert type(val) is SideState assert val.side == side # we need to ensure a valid oid is present when changing the path if val._oid is None: self.updated(side, "path", val._path) self.updated(side, "oid", val._oid) else: self.updated(side, "oid", val._oid) self.updated(side, "path", val._path) self.updated(side, "changed", val._changed) self.__states[side] = copy.copy(val) def hash_conflict(self): if self[0].hash and self[1].hash: return self[0].hash != self[0].sync_hash and self[1].hash != self[1].sync_hash return False def is_path_change(self, changed): return self[changed].path != self[changed].sync_path def is_creation(self, changed): return not self[changed].sync_path and self[changed].path def discard(self): self.discarded = ''.join(traceback.format_stack()) @staticmethod def prettyheaders(): ret = "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s" % ( "EID", # _sig(id(self)), "SID", # _sig(self.storage_id), "Typ", # otype, "Change", # secs(self[LOCAL].changed), "Path", # self[LOCAL].path, "OID", # _sig(self[LOCAL].oid), "Last Sync Path E H", # str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, "Change", # secs(self[REMOTE].changed), "Path", # self[REMOTE].path, "OID", # _sig(self[REMOTE].oid), "Last Sync Path ", # str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, "Punt", # self.punted or "" ) return ret def pretty(self, fixed=True, use_sigs=True): if self.discarded: return "DISCARDED" def secs(t): if t: return str(round(t % 300, 3)).replace(".", "") else: return 0 def abbrev_bool(b, tup=('T', 'F', '?')): idx = 1-int(bool(b)) if b is None: idx = 2 return tup[idx] lexv = abbrev_bool(self[LOCAL].exists.value, ("E", "X", "?")) rexv = abbrev_bool(self[REMOTE].exists.value, ("E", "X", "?")) lhma = abbrev_bool(self[LOCAL].hash and self[LOCAL].sync_hash != self[LOCAL].hash, ("H", "=", "?")) rhma = abbrev_bool(self[REMOTE].hash and self[REMOTE].sync_hash != self[REMOTE].hash, ("H", "=", "?")) if use_sigs: _sig = debug_sig else: _sig = lambda a: a local_otype = self[LOCAL].otype.value if self[LOCAL].otype else '?' remote_otype = self[REMOTE].otype.value if self[REMOTE].otype else '?' 
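# (descriptive note) abbreviations used in the pretty()/pretty_print() tables: the exists
# column renders as E (exists) / X (trashed) / ? (unknown), the hash column as H (content
# differs from the last-synced hash) / = (unchanged) / ? (no hash yet), and when the two
# sides report different otypes they are collapsed just below into "<local>-<remote>" using
# the first letter of each side's type.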
if local_otype != remote_otype: otype = local_otype[0] + "-" + remote_otype[0] else: otype = local_otype if not fixed: return str((_sig(id(self)), otype, (secs(self[LOCAL].changed), self[LOCAL].path, _sig( self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma), (secs(self[REMOTE].changed), self[REMOTE].path, _sig( self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma), self._punted)) ret = "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s" % ( _sig(id(self)), _sig(self.storage_id), otype[:3], secs(self[LOCAL].changed), self[LOCAL].path, _sig(self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, secs(self[REMOTE].changed), self[REMOTE].path, _sig(self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, self._punted or "" ) return ret def __str__(self): return self.pretty(fixed=False) def store(self, tag: str, storage: Storage): if not self.storage_id: self.storage_id = storage.create(tag, self.serialize()) else: storage.update(tag, self.serialize(), self.storage_id) def punt(self): # do this one later # TODO provide help for making sure that we don't punt too many times self.punted += 1 # pylint: disable=no-member if self.punted > 2: # pylint: disable=no-member # slow down if self[LOCAL].changed: self[LOCAL].changed = time.time() if self[REMOTE].changed: self[REMOTE].changed = time.time() class SyncState: # pylint: disable=too-many-instance-attributes def __init__(self, providers, storage: Optional[Storage] = None, tag: Optional[str] = None, shuffle=True): self._oids = ({}, {}) self._paths = ({}, {}) self._changeset = set() self._storage: Optional[Storage] = storage self._tag = tag self.providers = providers assert len(providers) == 2 self.lock = RLock() self.cursor_id = dict() self.shuffle = shuffle self._loading = False if self._storage: self._loading = True storage_dict = self._storage.read_all(tag) for eid, ent_ser in storage_dict.items(): ent = SyncEntry(self, None, (eid, ent_ser)) for side in [LOCAL, REMOTE]: path, oid = ent[side].path, ent[side].oid if path not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][oid] = ent self._oids[side][oid] = ent if ent[side].changed: self._changeset.add(ent) self._loading = False def updated(self, ent, side, key, val): if self._loading: return assert key if key == "path": self._change_path(side, ent, val, self.providers[side]) elif key == "oid": self._change_oid(side, ent, val) elif key == "changed": if val or ent[other_side(side)].changed: self._changeset.add(ent) else: self._changeset.discard(ent) def _change_path(self, side, ent, path, provider): assert type(ent) is SyncEntry if path: assert ent[side].oid prior_ent = ent prior_path = ent[side].path if prior_path == path: return if prior_path and prior_path in self._paths[side]: prior_ent = self._paths[side][prior_path].pop(ent[side].oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] prior_ent = None if path: if path not in self._paths[side]: self._paths[side][path] = {} path_ents = self._paths[side][path] if ent[side].oid in path_ents: prior_ent = path_ents[ent[side].oid] assert prior_ent is not ent # ousted this ent prior_ent._path = None self._paths[side][path][ent[side].oid] = ent ent[side]._path = path self._update_kids(ent, side, prior_path, path, provider) def _update_kids(self, ent, side, prior_path, path, provider): if ent[side].otype == DIRECTORY and prior_path != path and not prior_path is None: # changing directory also changes child paths for sub in 
self.get_all(): if not sub[side].path: continue relative = provider.is_subpath(prior_path, sub[side].path, strict=True) if relative: new_path = provider.join(path, relative) sub[side].path = new_path if provider.oid_is_path: # TODO: state should not do online hits esp from event manager # either # a) have event manager *not* trigger this, maybe by passing none as the provider, etc # this may have knock on effects where the sync engine needs to process parent folders first # b) have a special oid_from_path function that is guaranteed not to be "online" # assert not _api() called, etc. new_info = provider.info_path(new_path) if new_info: sub[side].oid = new_info.oid def _change_oid(self, side, ent, oid): assert type(ent) is SyncEntry prior_oid = ent[side].oid prior_ent = None if prior_oid: prior_ent = self._oids[side].pop(prior_oid, None) if prior_ent: if prior_ent[side].path: prior_path = prior_ent[side].path if prior_path in self._paths[side]: self._paths[side][prior_path].pop(prior_oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] if prior_ent is not ent: # no longer indexed by oid, also clear change bit prior_ent._oid = None prior_ent[side].changed = False if oid: ent[side]._oid = oid self._oids[side][oid] = ent assert self.lookup_oid(side, oid) is ent if oid and ent[side].path: if ent[side].path not in self._paths[side]: self._paths[side][ent[side].path] = {} self._paths[side][ent[side].path][oid] = ent if oid: assert self.lookup_oid(side, oid) is ent def lookup_oid(self, side, oid): try: ret = self._oids[side][oid] return ret except KeyError: return None def lookup_path(self, side, path, stale=False): try: ret = self._paths[side][path].values() if ret: return [e for e in ret if stale or not e.discarded] return [] except KeyError: return [] def rename_dir(self, side, from_dir, to_dir, is_subpath, replace_path): """ when a directory changes, utility to rename all kids """ remove = [] # TODO: refactor this so that a list of affected items is gathered, then the alterations happen to the final # list, which will avoid having to remove after adding, which feels mildly risky # TODO: is this function called anywhere? ATM, it looks like no... 
It should be called or removed for path, oid_dict in self._paths[side].items(): if is_subpath(from_dir, path): new_path = replace_path(path, from_dir, to_dir) remove.append(path) self._paths[side][new_path] = oid_dict for ent in oid_dict.values(): ent[side].path = new_path for path in remove: self._paths[side].pop(path) def update_entry(self, ent, side, oid, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin, too-many-arguments assert ent if oid is not None: if ent.discarded: if self.providers[side].oid_is_path: if path and "conflicted" not in path: if otype: log.log(TRACE, "dropping old entry %s, and making new", ent) ent = SyncEntry(self, otype) ent[side].oid = oid if oid and not ent.discarded: assert ent in self.get_all() if otype is not None: ent[side].otype = otype assert otype is not NOTKNOWN or not exists if path is not None: ent[side].path = path if hash is not None: ent[side].hash = hash ent.dirty = True if exists is not None and exists is not ent[side].exists: assert type(ent[side]) is SideState ent[side].exists = exists ent.dirty = True assert type(ent[side].exists) is Exists if changed and not ent.discarded: assert ent[side].path or ent[side].oid log.log(TRACE, "add %s to changeset", ent) self.mark_changed(side, ent) log.log(TRACE, "updated %s", ent) def mark_changed(self, side, ent): if not ent.discarded: ent[side].changed = time.time() assert ent in self._changeset def storage_get_cursor(self, cursor_tag): if cursor_tag is None: return None retval = None if self._storage is not None: if cursor_tag in self.cursor_id: retval = self._storage.read(cursor_tag, self.cursor_id[cursor_tag]) if not retval: cursors = self._storage.read_all(cursor_tag) for eid, cursor in cursors.items(): self.cursor_id[cursor_tag] = eid retval = cursor if len(cursors) > 1: log.warning("Multiple cursors found for %s", cursor_tag) log.debug("storage_get_cursor id=%s cursor=%s", cursor_tag, str(retval)) return retval def storage_update_cursor(self, cursor_tag, cursor): if cursor_tag is None: return updated = 0 if self._storage is not None: if cursor_tag in self.cursor_id and self.cursor_id[cursor_tag]: updated = self._storage.update(cursor_tag, cursor, self.cursor_id[cursor_tag]) log.log(TRACE, "storage_update_cursor cursor %s %s", cursor_tag, cursor) if not updated: self.cursor_id[cursor_tag] = self._storage.create(cursor_tag, cursor) log.log(TRACE, "storage_update_cursor cursor %s %s", cursor_tag, cursor) def storage_update(self, ent: SyncEntry): log.log(TRACE, "storage_update eid%s", ent.storage_id) if self._storage is not None: if ent.storage_id is not None: self._storage.update(self._tag, ent.serialize(), ent.storage_id) else: new_id = self._storage.create(self._tag, ent.serialize()) ent.storage_id = new_id log.debug("storage_update creating eid%s", ent.storage_id) ent.dirty = False def __len__(self): return len(self.get_all()) def update(self, side, otype, oid, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin, too-many-arguments log.log(TRACE, "lookup %s", debug_sig(oid)) ent = self.lookup_oid(side, oid) prior_ent = None if prior_oid and prior_oid != oid: prior_ent = self.lookup_oid(side, prior_oid) if not ent and prior_ent and not prior_ent.discarded: ent = prior_ent prior_ent = None if prior_oid and prior_oid != oid: # this is an oid_is_path provider path_ents = self.lookup_path(side, path, stale=True) if path_ents: if not ent: ent = path_ents[0] ent.discarded = False log.debug("matched existing entry %s:%s", 
debug_sig(oid), path) elif ent is not path_ents[0]: path_ents[0].discard() self.storage_update(path_ents[0]) log.debug("discarded existing entry %s:%s", debug_sig(oid), path) if not ent: log.debug("creating new entry because %s not found in %s", debug_sig(oid), side) ent = SyncEntry(self, otype) self.update_entry(ent, side, oid, path=path, hash=hash, exists=exists, changed=True, otype=otype) self.storage_update(ent) def change(self, age): if not self._changeset: return None changes = self._changeset if self.shuffle: # at most 20 are randomized changes = scramble(changes, 20) earlier_than = time.time() - age for puntlevel in range(3): for e in changes: if not e.discarded and e.punted == puntlevel: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): return e ret = None remove = [] for e in self._changeset: if e.discarded: remove.append(e) else: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): ret = e for e in remove: self._changeset.discard(e) return ret def has_changes(self): return bool(self._changeset) def finished(self, ent): if ent[1].changed or ent[0].changed: log.info("not marking finished: %s", ent) return self._changeset.discard(ent) for e in self._changeset: e.punted = 0 def pretty_print(self, use_sigs=True): ret = SyncEntry.prettyheaders() + "\n" for e in self.get_all(): e: SyncEntry ret += e.pretty(fixed=True, use_sigs=use_sigs) + "\n" return ret def assert_index_is_correct(self): for ent in self._changeset: if not ent.discarded: assert ent in self.get_all(), ("%s in changeset, not in index" % ent) for ent in self.get_all(): assert ent if ent[LOCAL].path: assert ent in self.lookup_path(LOCAL, ent[LOCAL].path), ("%s local path not indexed" % ent) if ent[REMOTE].path: assert ent in self.lookup_path(REMOTE, ent[REMOTE].path), ("%s remote path not indexed" % ent) if ent[LOCAL].oid: assert ent is self.lookup_oid(LOCAL, ent[LOCAL].oid), ("%s local oid not indexed" % ent) if ent[REMOTE].oid: assert ent is self.lookup_oid(REMOTE, ent[REMOTE].oid), ("%s local oid not indexed" % ent) if ent[LOCAL].changed or ent[REMOTE].changed: if ent not in self._changeset: assert False, ("changeset missing %s" % ent) def get_all(self, discarded=False) -> Set['SyncState']: ents = set() for ent in self._oids[LOCAL].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) for ent in self._oids[REMOTE].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) return ents def entry_count(self): return len(self.get_all()) def split(self, ent): log.debug("splitting %s", ent) defer = REMOTE replace = LOCAL defer_ent = ent replace_ent = SyncEntry(self, ent[replace].otype) assert ent[replace].oid replace_ent[replace] = ent[replace] assert replace_ent[replace].oid if ent[replace].oid: assert replace_ent in self.get_all() if defer_ent[replace].path: assert self.lookup_path(replace, defer_ent[replace].path) defer_ent[replace] = SideState(defer_ent, replace, ent[replace].otype) # clear out assert replace_ent[replace].oid assert replace_ent in self.get_all() self.mark_changed(replace, replace_ent) self.mark_changed(defer, defer_ent) assert replace_ent[replace].oid # we aren't synced replace_ent[replace].sync_path = None replace_ent[replace].sync_hash = None # never synced defer_ent[defer].sync_path = None defer_ent[defer].sync_hash = None log.debug("split: %s", defer_ent) log.debug("split: %s", replace_ent) log.info("SPLIT\n%s", 
self.pretty_print()) assert replace_ent[replace].oid self.storage_update(defer_ent) self.storage_update(replace_ent) return defer_ent, defer, replace_ent, replace PKOeEDcloudsync/sync/util.pyfrom hashlib import md5 from base64 import b64encode # useful for converting oids and pointer nubmers into digestible nonces def debug_sig(t, size=3): if not t: return 0 return b64encode(md5(str(t).encode()).digest()).decode()[0:size] PKO==cloudsync/tests/__init__.pyfrom .fixtures.util import util from .test_provider import * PKc$OA%{{cloudsync/tests/conftest.pyimport cloudsync from .fixtures import * # pylint: disable=unused-import, unused-wildcard-import, wildcard-import cloudsync.logger.setLevel("TRACE") def pytest_configure(config): config.addinivalue_line("markers", "manual") def pytest_runtest_setup(item): if 'manual' in item.keywords and not item.config.getoption("--manual"): pytest.skip("need --manual option to run this test") def pytest_addoption(parser): parser.addoption("--provider", action="append", default=[], help="provider(s) to run tests for") parser.addoption("--manual", action="store_true", default=False, help="run the manual tests") PKc$Ov&bcloudsync/tests/pytest.ini[pytest] log_format = %(asctime)s p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s: %(message)s log_date_format = %Y-%m-%d %H:%M:%S log_level = 5 log_cli = true # dont change this to debug, it will log passing tests as debug log_cli_level = warning PKc$O3(_R_Rcloudsync/tests/test_cs.pyfrom io import BytesIO import logging import pytest from typing import List from .fixtures import MockProvider, MockStorage from cloudsync.sync.sqlite_storage import SqliteStorage from cloudsync import Storage, CloudSync, SyncState, SyncEntry, LOCAL, REMOTE, FILE, DIRECTORY, CloudFileNotFoundError, CloudFileExistsError from cloudsync.event import EventManager from .test_sync import WaitFor, RunUntilHelper log = logging.getLogger(__name__) @pytest.fixture(name="cs") def fixture_cs(mock_provider_generator): roots = ("/local", "/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass cs = CloudSyncMixin((mock_provider_generator(), mock_provider_generator()), roots, sleep=None) yield cs cs.done() def make_cs(mock_provider_creator, left, right): roots = ("/local", "/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass return CloudSyncMixin((mock_provider_creator(*left), mock_provider_creator(*right)), roots, sleep=None) @pytest.fixture(name="multi_cs") def fixture_multi_cs(mock_provider_generator): storage_dict = dict() storage = MockStorage(storage_dict) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = mock_provider_generator() p2 = mock_provider_generator() p3 = mock_provider_generator() roots1 = ("/local1", "/remote") roots2 = ("/local2", "/remote") cs1 = CloudSyncMixin((p1, p2), roots1, storage, sleep=None) cs2 = CloudSyncMixin((p1, p3), roots2, storage, sleep=None) yield cs1, cs2 cs1.done() cs2.done() def test_sync_multi(multi_cs): cs1, cs2 = multi_cs local_parent1 = "/local1" local_parent2 = "/local2" remote_parent1 = "/remote" remote_parent2 = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path11 = "/local1/stuff1" local_path21 = "/local2/stuff1" local_path12 = "/local1/stuff2" local_path22 = "/local2/stuff2" cs1.providers[LOCAL].mkdir(local_parent1) cs1.providers[REMOTE].mkdir(remote_parent1) cs2.providers[LOCAL].mkdir(local_parent2) cs2.providers[REMOTE].mkdir(remote_parent2) linfo1 = cs1.providers[LOCAL].create(local_path11, BytesIO(b"hello1"), None) linfo2 = 
cs2.providers[LOCAL].create(local_path21, BytesIO(b"hello2"), None) rinfo1 = cs1.providers[REMOTE].create(remote_path2, BytesIO(b"hello3"), None) rinfo2 = cs2.providers[REMOTE].create(remote_path2, BytesIO(b"hello4"), None) assert linfo1 and linfo2 and rinfo1 and rinfo2 cs1.run_until_found( (LOCAL, local_path11), (LOCAL, local_path21), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) cs1.run(until=lambda: not cs1.state.has_changes(), timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 3 # 1 dirs, 2 files, 1 never synced (local2 file) try: cs2.run_until_found( (LOCAL, local_path12), (LOCAL, local_path22), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise linfo12 = cs1.providers[LOCAL].info_path(local_path12) rinfo11 = cs1.providers[REMOTE].info_path(remote_path1) linfo22 = cs2.providers[LOCAL].info_path(local_path22) rinfo21 = cs2.providers[REMOTE].info_path(remote_path1) assert linfo12.oid assert linfo22.oid assert rinfo11.oid assert rinfo21.oid assert linfo12.hash == rinfo1.hash assert linfo22.hash == rinfo2.hash # let cleanups/discards/dedups happen if needed cs2.run(until=lambda: not cs2.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 3 assert len(cs2.state) == 3 def test_sync_basic(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/stuff2" local_path2 = "/local/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"hello2"), None) cs.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) linfo2 = cs.providers[LOCAL].info_path(local_path2) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo2.oid assert rinfo1.oid assert linfo2.hash == rinfo2.hash assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") # let cleanups/discards/dedups happen if needed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs.state.pretty_print()) assert len(cs.state) == 3 assert not cs.state.has_changes() def setup_remote_local(cs, *names): remote_parent = "/remote" local_parent = "/local" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) found = [] for name in names: remote_path1 = "/remote/" + name local_path1 = "/local/" + name linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) found.append((REMOTE, remote_path1)) cs.run_until_found(*found) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) def test_sync_create_delete_same_name(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) cs.run(until=lambda: not cs.state.has_changes(), timeout=2) rinfo = cs.providers[REMOTE].info_path(remote_path1) cs.emgrs[LOCAL].do() cs.providers[LOCAL].delete(linfo1.oid) 
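# descriptive note (added): the file that just synced to the remote is deleted locally and
# immediately recreated at the same path below; only the local event manager is stepped
# (not the full sync loop), so the delete and the create are both queued before sync
# processes either of them, exercising the same-name delete/create ordering case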
cs.emgrs[LOCAL].do() linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) # run local event manager only... not sync cs.emgrs[LOCAL].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") rinfo = cs.providers[REMOTE].info_path(remote_path1) bio = BytesIO() cs.providers[REMOTE].download(rinfo.oid, bio) assert bio.getvalue() == b'goodbye' # this test used to fail about 2 out of 10 times because of embracing a change that *wasn't properly indexed* # it covers cases where events arrive in unexpected orders... could be possible to get the same coverage # with a more deterministic version @pytest.mark.repeat(10) def test_sync_create_delete_same_name_heavy(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) import time, threading def creator(): i = 0 while i < 10: try: linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file" + bytes(str(i),"utf8"))) i += 1 except CloudFileExistsError: linfo1 = cs.providers[LOCAL].info_path(local_path1) if linfo1: cs.providers[LOCAL].delete(linfo1.oid) time.sleep(0.01) def done(): bio = BytesIO() rinfo = cs.providers[REMOTE].info_path(remote_path1) if rinfo: cs.providers[REMOTE].download(rinfo.oid, bio) return bio.getvalue() == b'file' + bytes(str(9),"utf8") return False thread = threading.Thread(target=creator, daemon=True) thread.start() cs.run(until=done, timeout=3) thread.join() assert done() log.info("TABLE 2\n%s", cs.state.pretty_print()) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") def test_sync_rename_heavy(cs): remote_parent = "/remote" local_parent = "/local" remote_sub = "/remote/sub" local_sub = "/local/sub" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/sub/stuff1" local_path2 = "/local/sub/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].mkdir(local_sub) cs.providers[REMOTE].mkdir(remote_sub) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"file")) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 1\n%s", cs.state.pretty_print()) import time, threading oid = linfo1.oid done = False ok = True def mover(): nonlocal done nonlocal oid nonlocal ok for _ in range(10): try: oid = cs.providers[LOCAL].rename(oid, local_path2) oid = cs.providers[LOCAL].rename(oid, local_path1) time.sleep(0.001) except Exception as e: log.exception(e) ok = False done = True thread = threading.Thread(target=mover, daemon=True) thread.start() cs.run(until=lambda: done, timeout=3) thread.join() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1 ) log.info("TABLE 3\n%s", cs.state.pretty_print()) assert ok assert cs.providers[REMOTE].info_path(remote_path1) def test_sync_two_conflicts(cs): remote_path1 = "/remote/stuff1" local_path1 = 
"/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"world")) # run event managers only... not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # the local delete/create doesn't add entries assert(len(cs.state) == 2) else: assert(len(cs.state) == 4) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) # conflicted files are discarded, not in table log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 2) assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") or cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") b1 = BytesIO() b2 = BytesIO() cs.providers[LOCAL].download_path(local_path1, b1) cs.providers[LOCAL].download_path(local_path1 + ".conflicted", b2) assert b1.getvalue() in (b'goodbye', b'world') assert b2.getvalue() in (b'goodbye', b'world') assert b1.getvalue() != b2.getvalue() @pytest.mark.repeat(10) def test_sync_subdir_rename(cs): local_dir = "/local/a" local_base = "/local/a/stuff" local_dir2 = "/local/b" local_base2 = "/local/b/stuff" remote_dir = "/remote/a" remote_dir2 = "/remote/b" remote_base = "/remote/a/stuff" remote_base2 = "/remote/b/stuff" kid_count = 4 cs.providers[LOCAL].mkdir("/local") lpoid = cs.providers[LOCAL].mkdir(local_dir) lpaths = [] rpaths = [] rpaths2 = [] for i in range(kid_count): lpath = local_base + str(i) rpath = remote_base + str(i) rpath2 = remote_base2 + str(i) cs.providers[LOCAL].create(lpath, BytesIO(b'hello')) lpaths.append(lpath) rpaths.append((REMOTE, rpath)) rpaths2.append((REMOTE, rpath2)) cs.run_until_found(*rpaths, timeout=2) log.info("TABLE 1\n%s", cs.state.pretty_print()) cs.providers[LOCAL].rename(lpoid, local_dir2) for _ in range(10): cs.do() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.run_until_found(*rpaths2, timeout=2) log.info("TABLE 2\n%s", cs.state.pretty_print()) # this test is sensitive to the order in which things are processed # so run it a few times @pytest.mark.repeat(10) def test_sync_folder_conflicts_file(cs): remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff1/under" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].mkdir(remote_path1) linfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"world")) # run event managers only... 
not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # there won't be 2 rows for /local/stuff1 is oid_is_path assert(len(cs.state) == 3) locs = cs.state.lookup_path(LOCAL, local_path1) assert locs and len(locs) == 1 loc = locs[0] assert loc[LOCAL].otype == FILE assert loc[REMOTE].otype == DIRECTORY else: # deleted /local/stuff, remote/stuff, remote/stuff/under, lcoal/stuff, /local assert(len(cs.state) == 5) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 4 or len(cs.state) == 3) local_conf = cs.providers[LOCAL].info_path(local_path1 + ".conflicted") remote_conf = cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") assert local_conf and not remote_conf # file was moved out of the way for the folder assert local_conf.otype == FILE # folder won local_conf = cs.providers[LOCAL].info_path(local_path1) assert local_conf.otype == DIRECTORY @pytest.fixture(params=[(MockStorage, dict()), (SqliteStorage, 'file::memory:?cache=shared')], ids=["mock_storage", "sqlite_storage"], name="storage" ) def storage_fixture(request): return request.param def test_storage(storage): roots = ("/local", "/remote") storage_class = storage[0] storage_mechanism = storage[1] class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = MockProvider(oid_is_path=False, case_sensitive=True) p2 = MockProvider(oid_is_path=False, case_sensitive=True) storage1: Storage = storage_class(storage_mechanism) cs1: CloudSync = CloudSyncMixin((p1, p2), roots, storage1, sleep=None) old_cursor = cs1.emgrs[0].state.storage_get_cursor(cs1.emgrs[0]._cursor_tag) assert old_cursor is not None log.debug("cursor=%s", old_cursor) test_sync_basic(cs1) # do some syncing, to get some entries into the state table storage2 = storage_class(storage_mechanism) cs2: CloudSync = CloudSyncMixin((p1, p2), roots, storage2, sleep=None) log.debug(f"state1 = {cs1.state.entry_count()}\n{cs1.state.pretty_print()}") log.debug(f"state2 = {cs2.state.entry_count()}\n{cs2.state.pretty_print()}") def not_dirty(s: SyncState): for se in s.get_all(): se: SyncEntry assert not se.dirty def compare_states(s1: SyncState, s2: SyncState) -> List[SyncEntry]: ret = [] found = False for e1 in s1.get_all(): e1: SyncEntry for e2 in s2.get_all(): e2: SyncEntry if e1.serialize() == e2.serialize(): found = True if not found: ret.append(e1) return ret not_dirty(cs1.state) missing1 = compare_states(cs1.state, cs2.state) missing2 = compare_states(cs2.state, cs1.state) for e in missing1: log.debug(f"entry in 1 not found in 2 {e.pretty()}") for e in missing2: log.debug(f"entry in 2 not found in 1 {e.pretty()}") assert not missing1 assert not missing2 new_cursor = cs1.emgrs[0].state.storage_get_cursor(cs1.emgrs[0]._cursor_tag) log.debug("cursor=%s %s", old_cursor, new_cursor) assert new_cursor is not None assert old_cursor != new_cursor @pytest.mark.parametrize("drain", [None, LOCAL, REMOTE]) def test_sync_already_there(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"hello"), None) if drain is not None: # one of the event managers is not reporting events 
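# descriptive note (added): draining one side presumably discards its pending events,
# simulating a provider whose change notifications were missed; because identical content
# already exists on both sides, sync is still expected to converge without creating a
# ".conflicted" copy (the conflicting-content variant is covered by the next test)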
cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") @pytest.mark.parametrize("drain", [LOCAL, REMOTE]) def test_sync_already_there_conflict(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) cs.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye"), None) if drain is not None: # one of the event managers is not reporting events cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") or cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") @pytest.mark.parametrize('right', (True, False), ids=["right_cs", "right_in"]) @pytest.mark.parametrize('left', (True, False), ids=["left_cs", "left_in"]) def test_sync_rename_folder_case(mock_provider_creator, left, right): cs = make_cs(mock_provider_creator, (True, left), (False, right)) local_parent = "/local" remote_parent = "/remote" local_path1 = "/local/a" local_path2 = "/local/a/b" local_path3 = "/local/A" remote_path1 = "/remote/A" remote_path2 = "/remote/A/b" cs.providers[LOCAL].mkdir(local_parent) ldir = cs.providers[LOCAL].mkdir(local_path1) linfo = cs.providers[LOCAL].create(local_path2, BytesIO(b"hello"), None) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.debug("--- cs: %s ---", [e.case_sensitive for e in cs.providers]) cs.providers[LOCAL].log_debug_state() cs.providers[REMOTE].log_debug_state() cs.providers[LOCAL].rename(ldir, local_path3) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) rinfo2 = cs.providers[REMOTE].info_path(remote_path2) assert rinfo1 and rinfo2 assert rinfo1.path == remote_path1 assert rinfo2.path == remote_path2 # TODO: important tests: # 1. events coming in for path updates for conflicted files.... we should note conflict oids, and not insert them # 2. for oid_as_path... 
events coming in for old creations, long since deleted or otherwise overwritten (renamed away, etc) PKc$O lcloudsync/tests/test_events.pyimport time from io import BytesIO import threading import pytest from cloudsync import EventManager, SyncState, LOCAL @pytest.fixture(name="manager") def fixture_manager(mock_provider_generator): # TODO extend this to take any provider provider = mock_provider_generator() state = SyncState((provider, provider)) yield EventManager(provider, state, LOCAL) def test_event_basic(manager): provider = manager.provider state = manager.state info = provider.create("/dest", BytesIO(b'hello')) assert not state.lookup_path(LOCAL, "/dest") oid = None # this is normally a blocking function that runs forever def done(): nonlocal oid states = state.get_all() if states: oid = list(states)[0][LOCAL].oid return state.lookup_oid(LOCAL, oid) return False # loop the sync until the file is found manager.run(timeout=1, until=done) assert oid info = provider.info_oid(oid) assert info.path == "/dest" def test_events_shutdown_event_shouldnt_process(manager): handle = threading.Thread(target=manager.run, **{'kwargs': {'sleep': .3}}) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) try: manager.events.__next__() except StopIteration: assert False try: manager.events.__next__() assert False except StopIteration: pass finally: manager.stop() def test_events_shutdown_force_process_event(manager): handle = threading.Thread(target=manager.run, **{'kwargs': {'sleep': .3}}) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) manager.do() try: manager.events.__next__() assert False except StopIteration: pass finally: manager.stop() PKc$O|Vcloudsync/tests/test_mux.pyimport threading from cloudsync.muxer import Muxer def test_simple_mux(): def gen(): yield from range(4) m1 = Muxer(gen) m2 = Muxer(gen) assert len(list(m1)) == 4 assert len(list(m2)) == 4 def test_thready_mux(): threads = 10 count = 1000 def gen(): yield from range(count) def counter(m): def inner(): inner.count = 0 for _ in m: inner.count += 1 return inner m = [None] * threads c = [None] * threads t = [None] * threads for i in range(threads): m[i] = Muxer(gen) c[i] = counter(m[i]) t[i] = threading.Thread(target=c[i]) for i in range(threads): t[i].start() for i in range(threads): t[i].join() assert c[i].count == count def test_later_mux(): def gen(): yield from range(4) m1 = Muxer(gen) assert next(m1) == 0 m2 = Muxer(gen) assert len(m1.listeners) == 2 assert len(list(m1)) == 3 assert len(list(m2)) == 3 def test_restart_mux(): def gen(): yield from range(4) m1 = Muxer(gen, restart=True) m2 = Muxer(gen, restart=True) assert len(m1.listeners) == 2 assert len(list(m1)) == 4 assert len(list(m2)) == 8 assert len(list(m1)) == 8 assert len(list(m2)) == 8 def test_del(): def gen(): yield from range(4) m1 = Muxer(gen) _ = next(m1) m2 = Muxer(gen) lm2 = list(m2) assert len(m2.listeners) == 2 assert len(lm2) == 3 m2.__del__() assert len(m1.listeners) == 1 _ = list(m1) assert gen in Muxer.already m1.__del__() assert len(m1.listeners) == 0 assert gen not in Muxer.already PKc$O1PϮϮ cloudsync/tests/test_provider.pyimport os import time import logging from io import BytesIO from unittest.mock import patch from typing import Union, NamedTuple, Optional, Generator import pytest import cloudsync from cloudsync import Event, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError, FILE from 
cloudsync.tests.fixtures import Provider, mock_provider, mock_provider_instance from cloudsync.runnable import time_helper from cloudsync.types import OInfo # from cloudsync.providers import GDriveProvider, DropboxProvider log = logging.getLogger(__name__) ProviderMixin = Union[Provider, "ProviderHelper"] class ProviderHelper(): def __init__(self, prov): self.api_retry = True self.prov = prov self.test_root = getattr(self.prov, "test_root", None) self.event_timeout = getattr(self.prov, "event_timeout", 20) self.event_sleep = getattr(self.prov, "event_sleep", 1) self.creds = getattr(self.prov, "creds", {}) self.prov_api_func = self.prov._api self.prov._api = lambda *ar, **kw: self.__api_retry(self._api, *ar, **kw) self.prov.connect(self.creds) assert prov.connection_id if not self.test_root: # if the provider class doesn't specify a testing root # then just make one up self.test_root = "/" + os.urandom(16).hex() self.prov.mkdir(self.test_root) def _api(self, *ar, **kw): return self.prov_api_func(*ar, **kw) def __api_retry(self: ProviderMixin, func, *ar, **kw): # the cloud providers themselves should *not* have their own backoff logic # rather, they should punt rate limit and temp errors to the sync system # since we're not testing the sync system here, we need to make our own if not self.api_retry: return func(*ar, **kw) for _ in time_helper(timeout=self.event_timeout, sleep=self.event_sleep, multiply=2): try: return func(*ar, **kw) except CloudTemporaryError: log.info("api retry %s %s %s", func, ar, kw) # TEST-ROOT WRAPPER def __getattr__(self, k): return getattr(self.prov, k) def events(self) -> Generator[Event, None, None]: for e in self.prov.events(): if self.__filter_root(e): yield e def walk(self, path, since=None): path = self.__add_root(path) log.debug("WALK %s", path) for e in self.prov.walk(path): if self.__filter_root(e): yield e def download(self, *args, **kwargs): return self.__strip_root(self.prov.download(*args, **kwargs)) def create(self, path, file_like, metadata=None): path = self.__add_root(path) log.debug("CREATE %s", path) return self.__strip_root(self.prov.create(path, file_like, metadata)) def upload(self, *args, **kwargs): return self.__strip_root(self.prov.upload(*args, **kwargs)) def rename(self, oid, path): path = self.__add_root(path) return self.__strip_root(self.prov.rename(oid, path)) def mkdir(self, path): path = self.__add_root(path) return self.__strip_root(self.prov.mkdir(path)) def delete(self, *args, **kwargs): return self.__strip_root(self.prov.delete(*args, **kwargs)) def exists_oid(self, oid): return self.prov.exists_oid(oid) def exists_path(self, path): path = self.__add_root(path) return self.prov.exists_path(path) def info_path(self, path: str) -> Optional[OInfo]: path = self.__add_root(path) return self.__strip_root(self.prov.info_path(path)) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: return self.__strip_root(self.prov.info_oid(oid)) def listdir(self, oid): for e in self.prov.listdir(oid): if self.__filter_root(e): yield e def __add_root(self, path): return self.join(self.test_root, path) def __filter_root(self, obj): if hasattr(obj, "path"): raw_path = obj.path if not raw_path: info = self.prov.info_oid(obj.oid) if info: raw_path = info.path if not raw_path: # pathless objects always get passed through # so isolation is not perfect return True if not self.is_subpath(self.test_root, raw_path): return False self.__strip_root(obj) return True def __strip_root(self, obj): if hasattr(obj, "path"): path = obj.path if path: relative = 
self.is_subpath(self.test_root, path) assert relative path = relative if not path.startswith(self.sep): path = self.sep + path obj.path = path return obj # HELPERS def temp_name(self: ProviderMixin, name="tmp", *, folder=None): fname = self.join(folder or self.sep, os.urandom(16).hex() + "." + name) return fname def events_poll(self: ProviderMixin, timeout=None, until=None) -> Generator[Event, None, None]: if timeout is None: timeout = self.event_timeout if timeout == 0: yield from self.events() return for _ in time_helper(timeout, sleep=self.event_sleep, multiply=2): got = False for e in self.events(): yield e got = True if not until and got: break elif until and until(): break def __cleanup(self: ProviderMixin, oid): try: for info in self.prov.listdir(oid): if info.otype == FILE: log.debug("cleaning %s", info) self.delete(info.oid) else: self.__cleanup(info.oid) log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileNotFoundError: pass def test_cleanup(self: ProviderMixin, timeout=None, until=None): info = self.prov.info_path(self.test_root) self.__cleanup(info.oid) info = self.prov.info_path(self.test_root) if info: try: log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileExistsError: # deleting the root might now be supported pass def mixin_provider(prov): assert prov assert isinstance(prov, Provider) prov = ProviderHelper(prov) yield prov prov.test_cleanup() @pytest.fixture def provider_params(): return None class ProviderConfig: def __init__(self, name, param=(), param_id=None): if param_id is None: param_id = name self.name = name if name == "mock": assert param self.param = param self.param_id = param_id def __repr__(self): return "%s(%s)" % (type(self), self.__dict__) @pytest.fixture def config_provider(request, provider_config): try: request.raiseerror("foo") except Exception as e: FixtureLookupError = type(e) if provider_config.name == "external": # if there's a fixture available, use it return request.getfixturevalue("cloudsync_provider") # deferring imports to prevent needing deps we don't want to require for everyone elif provider_config.name == "mock": return mock_provider_instance(*provider_config.param) elif provider_config.name == "gdrive": from .providers.gdrive import gdrive_provider return gdrive_provider() elif provider_config.name == "dropbox": from .providers.dropbox import dropbox_provider return dropbox_provider() else: assert False, "Must provide a valid --provider name or use the -p " known_providers = ('gdrive', 'external', 'dropbox', 'mock') def configs_from_name(name): provs = [] if name == "mock": provs += [ProviderConfig("mock", (False, True), "mock_oid_cs")] provs += [ProviderConfig("mock", (True, True), "mock_path_cs")] else: provs += [ProviderConfig(name)] return provs def configs_from_keyword(kw): provs = [] # crappy approximation of pytest evaluation routine, because false = {} for known_prov in known_providers: false[known_prov] = False for known_prov in known_providers: if known_prov == kw or '[' + known_prov + ']' == kw: ok = True else: ids = false.copy() ids[known_prov] = True try: ok = eval(kw, {}, ids) except NameError as e: ok = False except Exception as e: log.error("%s %s", type(e), e) ok = False if type(ok) is list: ok = any(ok) if ok: provs += configs_from_name(known_prov) return provs _registered = False def pytest_generate_tests(metafunc): global _registered if not _registered: for known_prov in known_providers: metafunc.config.addinivalue_line( "markers", known_prov ) _registered = True if "provider_config" 
in metafunc.fixturenames: provs = [] for e in metafunc.config.getoption("provider", []): for n in e.split(","): provs += configs_from_name(n) if not provs: kw = metafunc.config.getoption("keyword", "") if kw: provs += configs_from_keyword(kw) if not provs: provs += configs_from_name("mock") ids = [p.param_id for p in provs] marks = [pytest.param(p, marks=[getattr(pytest.mark, p.name)]) for p in provs] metafunc.parametrize("provider_config", marks, ids=ids) @pytest.fixture(name="provider") def provider_fixture(config_provider): yield from mixin_provider(config_provider) def test_join(mock_provider): assert "/a/b/c" == mock_provider.join("a", "b", "c") assert "/a/c" == mock_provider.join("a", None, "c") assert "/a/b/c" == mock_provider.join("/a", "/b", "/c") assert "/a/c" == mock_provider.join("a", "/", "c") def test_connect(provider): assert provider.connected def test_create_upload_download(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) info2 = provider.upload(info1.oid, data()) assert info1.hash assert info2.hash assert info1.oid == info2.oid assert info1.hash == info2.hash assert provider.exists_path(dest) dest = BytesIO() provider.download(info2.oid, dest) dest.seek(0) assert dest.getvalue() == dat def test_rename(provider: ProviderMixin): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) dest2 = provider.temp_name("dest2") provider.rename(info1.oid, dest2) assert provider.exists_path(dest2) assert not provider.exists_path(dest) # test that renaming a folder renames the children folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() file_name = os.urandom(16).hex() file_path1 = provider.join(folder_name1, file_name) file_path2 = provider.join(folder_name2, file_name) sub_folder_name = os.urandom(16).hex() sub_folder_path1 = provider.join(folder_name1, sub_folder_name) sub_folder_path2 = provider.join(folder_name2, sub_folder_name) sub_file_name = os.urandom(16).hex() sub_file_path1 = provider.join(sub_folder_path1, sub_file_name) sub_file_path2 = provider.join(sub_folder_path2, sub_file_name) folder_oid = provider.mkdir(folder_name1) sub_folder_oid = provider.mkdir(sub_folder_path1) file_info = provider.create(file_path1, data()) sub_file_info = provider.create(sub_file_path1, data()) assert provider.exists_path(file_path1) assert not provider.exists_path(file_path2) assert provider.exists_path(sub_file_path1) assert not provider.exists_path(sub_file_path2) assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) new_oid = provider.rename(folder_oid, folder_name2) assert provider.exists_path(file_path2) assert not provider.exists_path(file_path1) assert provider.exists_path(sub_file_path2) assert not provider.exists_path(sub_file_path1) assert provider.exists_oid(new_oid) if not provider.oid_is_path: assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) assert provider.info_oid(file_info.oid).path == file_path2 assert provider.info_oid(sub_file_info.oid).path == sub_file_path2 else: assert not provider.exists_oid(file_info.oid) assert not provider.exists_oid(sub_file_info.oid) # move to sub dest = provider.temp_name("movy") sub_file_name = os.urandom(16).hex() sub_file_path3 = provider.join(sub_folder_path2, sub_file_name) info1 = provider.create(dest, data()) provider.rename(info1.oid, sub_file_path3) def test_mkdir(provider: ProviderMixin): 
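# descriptive note (added): mkdir contract exercised below: the created path must report
# otype DIRECTORY, creating a file at the directory's own path must raise
# CloudFileExistsError, and creating a file inside the directory must succeed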
dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") provider.mkdir(dest) info = provider.info_path(dest) assert info.otype == cloudsync.DIRECTORY sub_f = provider.temp_name("dest", folder=dest) log.debug("parent = %s, sub = %s", dest, sub_f) with pytest.raises(CloudFileExistsError): provider.create(dest, data(), None) assert provider.exists_path(dest) log.debug("folder %s exists", dest) provider.create(sub_f, data(), None) def test_walk(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) folder = provider.temp_name("folder") provider.mkdir(folder) subfolder = provider.join(folder, provider.temp_name("subfolder")) provider.mkdir(subfolder) dest0 = provider.temp_name("dest0") dest1 = provider.join(folder, provider.temp_name("dest1")) dest2 = provider.join(subfolder, provider.temp_name("dest2")) oids = {} info = provider.create(dest0, temp, None) oids[dest0] = info.oid info = provider.create(dest1, temp, None) oids[dest1] = info.oid info = provider.create(dest2, temp, None) oids[dest2] = info.oid got_event = False found = {} for e in provider.walk("/"): if e.otype == cloudsync.DIRECTORY: continue log.debug("WALK %s", e) path = e.path if path is None: path = provider.info_oid(e.oid).path assert oids[path] == e.oid found[path] = True assert e.mtime assert e.exists got_event = True for x in [dest0, dest1, dest2]: assert found.get(x, False) is True log.debug("found %s", x) assert got_event def check_event_path(event: Event, provider: ProviderMixin, target_path): # confirms that the path in the event matches the target_path # if the provider doesn't provide the path in the event, look it up by the oid in the event # if we can't get the path, that's OK if the file doesn't exist event_path = event.path if event_path is None: try: event_path = provider.info_oid(event.oid).path assert event_path == target_path except CloudFileNotFoundError: if event.exists: raise def test_event_basic(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep, 1)): log.debug("event %s", e) wait_sleep_cycles = 30 log.debug("create event") info1 = provider.create(dest, temp, None) assert info1 is not None # TODO: check info1 for more things received_event = None event_count = 0 done = False waiting = None wait_secs = min(provider.event_sleep * wait_sleep_cycles, 2) for e in provider.events_poll(until=lambda: done): log.debug("got event %s", e) # you might get events for the root folder here or other setup stuff if e.exists: if not e.path: info = provider.info_oid(e.oid) if info: e.path = info.path if e.path == dest: received_event = e event_count += 1 log.debug("%s vs %s", e.path, dest) if e.path == dest and not waiting: waiting = time.monotonic() + wait_secs if waiting and time.monotonic() > waiting: # wait for extra events up to 10 sleep cycles, or 2 seconds done = True assert event_count == 1 assert received_event is not None assert received_event.oid path = received_event.path if path is None: path = provider.info_oid(received_event.oid).path assert path == dest assert received_event.mtime assert received_event.exists deleted_oid = received_event.oid log.debug("delete event") provider.delete(oid=deleted_oid) provider.delete(oid=deleted_oid) # Tests that deleting a non-existing file does not raise a FNFE received_event = None for e in provider.events_poll(until=lambda: received_event is not None): log.debug("event %s", e) if e.exists and e.path is None: 
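# descriptive note (added): some providers emit events with no path attached; the code
# below resolves the path via info_oid, and if the object can no longer be found it
# treats the event as a deletion instead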
info2 = provider.info_oid(e.oid) if info2: e.path = info2.path else: e.exists = False # assert not e.exists or e.path is not None # This is actually OK, google will do this legitimately log.debug("event %s", e) if (not e.exists and e.oid == deleted_oid) or (e.path and path in e.path): received_event = e assert received_event is not None assert received_event.oid assert not received_event.exists if received_event.path is not None: assert received_event.path == dest assert received_event.oid == deleted_oid assert received_event.mtime def test_event_del_create(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) temp2 = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) provider.delete(info1.oid) info2 = provider.create(dest, temp2) last_event = None saw_first_delete = False saw_first_create = False disordered = False done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path if path == dest or e.exists is False: last_event = e if e.oid == info1.oid: if e.exists: saw_first_create = True if saw_first_delete and not provider.oid_is_path: log.debug("disordered!") disordered = True else: saw_first_delete = True if e.exists and e.oid == info2.oid: if provider.oid_is_path: if saw_first_delete and saw_first_create: done = True else: done = True # the important thing is that we always get a create after the delete event assert last_event, "Event loop timed out before getting any events" assert done, "Event loop timed out after the delete, but before the create, " \ "saw_first_delete=%s, saw_first_create=%s, disordered=%s" % (saw_first_delete, saw_first_create, disordered) assert last_event.exists is True # The provider may compress out the first create, or compress out the first create and delete, or deliver both # So, if we saw the first create, make sure we got the delete. If we didn't see the first create, # it doesn't matter if we saw the first delete. 
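# descriptive note (added): in other words, compressing events away is acceptable, but for
# oid-based providers a create for the first oid must never arrive after its delete --
# that out-of-order case is what the "disordered" flag tracks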
if saw_first_create: assert saw_first_delete assert not disordered def test_event_rename(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") dest2 = provider.temp_name("dest") dest3 = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) oid2 = provider.rename(info1.oid, dest2) if provider.oid_is_path: info1.oid = provider.info_path(dest2).oid oid3 = provider.rename(info1.oid, dest3) if provider.oid_is_path: info1.oid = provider.info_path(dest3).oid seen = set() last_event = None second_to_last = None done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): if provider.oid_is_path: assert e.path log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path last_event = e seen.add(e.oid) if provider.oid_is_path: # 2 and 3 are in order if path == dest2: second_to_last = True if path == dest3 and (second_to_last or not provider.oid_is_path): done = True else: done = info1.oid in seen if provider.oid_is_path: # providers with path based oids need to send intermediate renames accurately and in order assert len(seen) > 2 assert last_event.path == dest3 assert last_event.prior_oid == oid2 else: # oid based providers just need to let us know something happend to that oid assert info1.oid in seen def test_api_failure(provider): # assert that the cloud # a) uses an api function # b) does not trap CloudTemporaryError's def side_effect(*a, **k): raise CloudTemporaryError("fake disconnect") with patch.object(provider, "_api", side_effect=side_effect): with patch.object(provider, "api_retry", False): with pytest.raises(CloudTemporaryError): provider.exists_path("/notexists") def test_file_not_found(provider: ProviderMixin): # Test that operations on nonexistent file system objects raise CloudFileNotFoundError # when appropriate, and don't when inappropriate dat = os.urandom(32) def data(): return BytesIO(dat) test_file_deleted_path = provider.temp_name("dest1") # Created, then deleted test_file_deleted_info = provider.create(test_file_deleted_path, data(), None) test_file_deleted_oid = test_file_deleted_info.oid provider.delete(test_file_deleted_oid) test_folder_deleted_path = provider.temp_name("dest1") # Created, then deleted test_folder_deleted_oid = provider.mkdir(test_folder_deleted_path) provider.delete(test_folder_deleted_oid) test_path_made_up = provider.temp_name("dest2") # Never created test_oid_made_up = "never created" # TODO: consider mocking info_path to always return None, and then call all the provider methods # to see if they are handling the None, and not raising exceptions other than FNF # Tests: # exists_path # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_path(test_file_deleted_path) is False assert provider.exists_path(test_folder_deleted_path) is False assert provider.exists_path(test_path_made_up) is False # exists_oid # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_oid(test_folder_deleted_oid) is False assert provider.exists_oid(test_oid_made_up) is False # 
info_path # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_path(test_file_deleted_path) is None assert provider.info_path(test_folder_deleted_path) is None assert provider.info_path(test_path_made_up) is None # info_oid # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_oid(test_file_deleted_oid) is None assert provider.info_oid(test_folder_deleted_oid) is None assert provider.info_oid(test_oid_made_up) is None # hash_oid # deleted file returns None # never existed file returns None # if getattr(provider, "hash_oid", False): # TODO implement hash_oid in gdrive, then don't have this be conditional # assert provider.hash_oid(test_file_deleted_oid) is None # assert provider.hash_oid(test_oid_made_up) is None # upload # to a deleted file raises FNF, or untrashes the file, either is OK # to a made up oid raises FNF # TODO: uploading to a deleted file might not raise an FNF, it might just untrash the file assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_path(test_file_deleted_path) is False try: info = provider.upload(test_file_deleted_oid, data(), None) # This succeeded so the file must exist now, at the same oid as before assert info.oid == test_file_deleted_oid assert provider.exists_path(test_file_deleted_path) is True assert provider.exists_oid(test_file_deleted_oid) is True re_delete = True except CloudFileNotFoundError: re_delete = False pass if re_delete: provider.delete(test_file_deleted_oid) with pytest.raises(CloudFileNotFoundError): provider.upload(test_oid_made_up, data(), None) # create # to a non-existent folder, raises FNF # to a previously deleted folder, raises FNF with pytest.raises(CloudFileNotFoundError): provider.create(test_path_made_up + "/junk", data(), None) with pytest.raises(CloudFileNotFoundError): provider.create(test_folder_deleted_path + "/junk", data(), None) # upload: to the OID of a deleted folder, raises FNFE with pytest.raises(CloudFileNotFoundError): provider.upload(test_folder_deleted_oid, data(), None) # download # on a deleted oid raises FNF # on a made up oid raises FNF with pytest.raises(CloudFileNotFoundError): provider.download(test_file_deleted_oid, data()) with pytest.raises(CloudFileNotFoundError): provider.download(test_oid_made_up, data()) # rename # from a deleted oid raises FNF # from a made up oid raises FNF # to a non-existent folder raises [something], conditionally # to a previously deleted folder raises # check the rename source to see if there are others with pytest.raises(CloudFileNotFoundError): provider.rename(test_file_deleted_oid, test_file_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_folder_deleted_oid, test_folder_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_oid_made_up, test_path_made_up) # mkdir # to a non-existent folder raises FNF # to a previously deleted folder as parent folder raises FNF # to a previously deleted file as parent folder raises FNF with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_path_made_up + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_folder_deleted_path + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_file_deleted_path + "/junk") # delete # on a deleted file oid does not raise # on a deleted folder oid does not raise # on a made up oid does not raise provider.delete(test_file_deleted_oid) 
provider.delete(test_folder_deleted_oid) provider.delete(test_oid_made_up) # delete: create a file, delete it, then create a new file at that path, then re-delete the deleted oid, raises FNFE temp_path = provider.temp_name() info1 = provider.create(temp_path, BytesIO(b"Hello")) provider.delete(info1.oid) info2 = provider.create(temp_path, BytesIO(b"world")) if provider.oid_is_path: assert provider.exists_oid(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) else: assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) provider.delete(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) # listdir # on a deleted file raises FNF # on a deleted folder raises FNF # on a made up path raises FNF with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_file_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_folder_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_oid_made_up)) # TODO: Google drive raises FNF when it can't find the root... can we test for that here? def test_file_exists(provider: ProviderMixin): dat = os.urandom(32) def data(da=dat): return BytesIO(da) def create_and_delete_file(): create_and_delete_file_name = provider.temp_name() file_info = provider.create(create_and_delete_file_name, data(), None) provider.delete(file_info.oid) return create_and_delete_file_name, file_info.oid def create_and_delete_folder(): create_and_delete_folder_name = provider.temp_name() create_and_delete_folder_oid = provider.mkdir(create_and_delete_folder_name) provider.delete(create_and_delete_folder_oid) return create_and_delete_folder_name, create_and_delete_folder_oid def create_and_rename_file(): file_name1 = provider.temp_name() file_name2 = provider.temp_name() assert file_name1 != file_name2 file_info1 = provider.create(file_name1, data(), None) provider.rename(file_info1.oid, file_name2) return file_name1, file_info1.oid def create_and_rename_folder(): folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() assert folder_name1 != folder_name2 folder_oid1 = provider.mkdir(folder_name1) provider.rename(folder_oid1, folder_name2) return folder_name1, folder_oid1 def create_file(create_file_name=None): if create_file_name is None: create_file_name = provider.temp_name() file_info = provider.create(create_file_name, data(), None) return create_file_name, file_info.oid def create_folder(create_folder_name=None): if create_folder_name is None: create_folder_name = provider.temp_name() create_folder_oid = provider.mkdir(create_folder_name) return create_folder_name, create_folder_oid # Test that operations on existent file system objects raise CloudExistsError # when appropriate, and don't when inappropriate # api methods to check for FileExists: # vulnerable to existing paths: # mkdir, create, rename # Possible issues to potentially check each of the vulnerable api methods: # target path has a component in the parent folder that already exists as a file # target path exists # target path exists, but the type of the existing object at that location is different from expected # target path exists, but the type of the existing object at that location is what was expected # target path existed, but was deleted, different type as source # target path existed, but was deleted, same type as source # target path existed, but was renamed, different type as source # target path existed, but was renamed, same type as 
source # # vulnerable to existing OIDs: # upload, delete # Possible issues to potentially check each of the vulnerable api methods: # target OID exists, but the type of the existing object at that location is different from expected # target OID existed, but was trashed, should un-trash the object # target OID is a non-empty folder, delete should raise FEx # # The enumerated tests: # mkdir: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name + "/junk") # mkdir: where target path exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name) # mkdir: where target path exists as a folder, does not raise name1, oid1 = create_folder() oid2 = provider.mkdir(name1) assert oid1 == oid2 # mkdir: creating a file, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: creating a folder, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as file, but was renamed name1, oid1 = create_and_rename_file() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as folder, but was renamed name1, oid1 = create_and_rename_folder() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # upload: where target OID is a folder, raises FEx _, oid = create_folder() with pytest.raises(CloudFileExistsError): provider.upload(oid, data(), None) # delete: a non-empty folder, raises FEx name1, oid1 = create_folder() create_file(name1 + "/junk") with pytest.raises(CloudFileExistsError): provider.delete(oid1) # create: where target path exists, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name) def get_contents(oid): temp_contents = BytesIO() provider.download(oid, temp_contents) temp_contents.seek(0) return temp_contents.getvalue() # create: creating a file, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() _, oid2 = create_file(name1) assert oid1 != oid2 or provider.oid_is_path if provider.oid_is_path: assert provider.exists_oid(oid1) else: assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) # piggyback test -- uploading to the deleted oid should not step on the file that replaced it at that path if not provider.oid_is_path: try: new_contents = b"yo" # bytes(os.urandom(16).hex(), 'utf-8') new_info = provider.upload(oid1, BytesIO(new_contents)) assert new_info.oid != oid1 assert new_info.oid != oid2 assert not provider.exists_oid(oid1) contents2 = get_contents(oid2) assert contents2 == new_contents contents1 = get_contents(oid1) assert contents1 == new_contents except CloudFileNotFoundError: pass # create: creating a folder, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() _, oid2 = create_file(name1) assert oid1 != oid or provider.oid_is_path # create: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name + "/junk") # create: target path existed as folder, but was renamed name, _ = 
create_and_rename_folder() create_file(name) # create: target path existed as file, but was renamed name, _ = create_and_rename_file() create_file(name) # rename: rename folder over empty folder succeeds name1, oid1 = create_folder() create_file(name1 + "/junk") name2, oid2 = create_folder() assert oid1 != oid2 contents1 = [x.name for x in provider.listdir(oid1)] provider.rename(oid1, name2) if provider.oid_is_path: log.debug("oid1 %s, oid2 %s", oid1, oid2) assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid2)] else: assert provider.exists_oid(oid1) assert not provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid1)] assert contents1 == contents2 # rename: rename folder over non-empty folder raises FEx _, oid1 = create_folder() name2, oid2 = create_folder() assert oid1 != oid2 create_file(name2 + "/junk") with pytest.raises(CloudFileExistsError): provider.rename(oid1, name2) # rename: target has a parent folder that already exists as a file, raises FEx folder_name, _ = create_file() # notice that I am creating a file, and calling it a folder with pytest.raises(CloudFileExistsError): create_file(folder_name + "/junk") # rename: renaming file over empty folder, raises FEx folder_name, folder_oid = create_folder() file_name, file_oid = create_file() other_file_name, other_file_oid = create_file() with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # rename: renaming file over non-empty folder, raises FEx create_file(folder_name + "/test") with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(folder_oid, file_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(file_oid, other_file_name) # reuse the same file and folder from the last test # rename: create a file, delete it, then rename a file to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_file() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename file to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_file() provider.rename(oid2, deleted_folder_name) # rename: create a file, delete it, then rename a folder to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_folder() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename folder to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_folder() provider.rename(oid2, deleted_folder_name) # rename: target folder path existed, but was renamed away, folder type as source name1, oid1 = create_and_rename_folder() name2, oid2 = create_folder() provider.rename(oid2, name1) # rename: target folder path existed, but was renamed away, file type as source name1, oid1 = create_folder() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, folder type as source name1, oid1 = 
create_file() name2, oid2 = create_folder() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, file type as source name1, oid1 = create_file() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # TODO: test that renaming A over B replaces B's OID with A's OID, and B's OID is trashed def test_listdir(provider: ProviderMixin): outer = provider.temp_name() root = provider.dirname(outer) temp_name = provider.is_subpath(root, outer) outer_oid_rm = provider.mkdir(outer) assert [] == list(provider.listdir(outer_oid_rm)) provider.delete(outer_oid_rm) outer_oid = provider.mkdir(outer) assert provider.exists_path(outer) assert provider.exists_oid(outer_oid) inner = outer + temp_name inner_oid = provider.mkdir(inner) assert provider.exists_oid(inner_oid) provider.create(outer + "/file1", BytesIO(b"hello")) provider.create(outer + "/file2", BytesIO(b"there")) provider.create(inner + "/file3", BytesIO(b"world")) contents = [x.name for x in provider.listdir(outer_oid)] assert len(contents) == 3 expected = ["file1", "file2", temp_name[1:]] assert sorted(contents) == sorted(expected) def test_upload_to_a_path(provider: ProviderMixin): temp_name = provider.temp_name() info = provider.create(temp_name, BytesIO(b"test")) assert info.hash # test uploading to a path instead of an OID. should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): info = provider.upload(temp_name, BytesIO(b"test2")) def test_upload_zero_bytes(provider: ProviderMixin): temp_name = provider.temp_name() info = provider.create(temp_name, BytesIO(b"")) info2 = provider.upload(info.oid, BytesIO(b"")) dest = BytesIO() provider.download(info.oid, dest) assert info assert info2 assert info.hash == info2.hash def test_delete_doesnt_cross_oids(provider: ProviderMixin): temp_name = provider.temp_name() info1 = provider.create(temp_name, BytesIO(b"test1")) provider.delete(info1.oid) info2 = provider.create(temp_name, BytesIO(b"test2")) if not provider.oid_is_path: assert info1.oid != info2.oid assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) if not provider.oid_is_path: provider.delete(info1.oid) assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) # test uploading to a path instead of an OID.
should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): provider.upload(temp_name, BytesIO(b"test2")) def test_rename_case_change(provider: ProviderMixin): temp_namel = provider.temp_name().lower() temp_nameu = temp_namel.upper() infol = provider.create(temp_namel, BytesIO(b"test")) assert infol.path == temp_namel new_oid = provider.rename(infol.oid, temp_nameu) assert new_oid infou = provider.info_oid(new_oid) assert infou.path == temp_nameu infopu = provider.info_path(temp_nameu) infopl = provider.info_path(temp_namel) assert infopu assert infopu.path == temp_nameu if provider.case_sensitive: assert not infopl else: assert infopl assert infopl.path == temp_nameu PKd$O_xxcloudsync/tests/test_sync.pyimport logging from io import BytesIO import pytest from typing import NamedTuple from cloudsync import SyncManager, SyncState, CloudFileNotFoundError, LOCAL, REMOTE, FILE, DIRECTORY from cloudsync.provider import Provider from cloudsync.types import OInfo from cloudsync.sync.state import SideState class WaitFor(NamedTuple): side: int = None path: str = None hash: bytes = None oid: str = None exists: bool = True log = logging.getLogger(__name__) TIMEOUT = 4 class RunUntilHelper: def run_until_found(self: SyncManager, *files, timeout=TIMEOUT): log.debug("running until found") last_error = None def found(): ok = True for info in files: if type(info) is tuple: info = WaitFor(side=info[0], path=info[1]) try: other_info = self.providers[info.side].info_path(info.path) except CloudFileNotFoundError: other_info = None if other_info is None: nonlocal last_error if info.exists is False: log.debug("waiting not exists %s", info.path) continue log.debug("waiting exists %s", info.path) last_error = CloudFileNotFoundError(info.path) ok = False break if info.exists is False: ok = False break if info.hash and info.hash != other_info.hash: log.debug("waiting hash %s", info.path) ok = False break return ok self.run(timeout=timeout, until=found) if not found(): if last_error: raise TimeoutError("timed out while waiting: %s" % last_error) else: raise TimeoutError("timed out while waiting") class SyncMgrMixin(SyncManager, RunUntilHelper): pass @pytest.fixture(name="sync") def fixture_sync(mock_provider_generator): providers = (mock_provider_generator(), mock_provider_generator(oid_is_path=False)) state = SyncState(providers) def translate(to, path): if to == LOCAL: return "/local" + path.replace("/remote", "") if to == REMOTE: return "/remote" + path.replace("/local", "") raise ValueError("bad path: %s", path) def resolve(f1, f2): return None # two providers and a translation function that converts paths in one to paths in the other sync = SyncMgrMixin(state, providers, translate, resolve) yield sync sync.state.assert_index_is_correct() sync.done() def test_sync_state_basic(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="123") state.assert_index_is_correct() def test_sync_state_rename(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_sync_state_rename2(mock_provider): providers = 
(mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo") assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, "123") state.update(LOCAL, FILE, path="foo2", oid="456", prior_oid="123") assert state.lookup_path(LOCAL, path="foo2") assert state.lookup_oid(LOCAL, "456") assert not state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="456") assert not state.lookup_oid(LOCAL, oid="123") state.assert_index_is_correct() def test_sync_state_rename3(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers) ahash = "ah" bhash = "bh" state.update(LOCAL, FILE, path="a", oid="a", hash=ahash) state.update(LOCAL, FILE, path="b", oid="b", hash=bhash) infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") assert infoa[LOCAL].hash == ahash assert infob[LOCAL].hash == bhash # rename in a circle state.update(LOCAL, FILE, path="c", oid="c", prior_oid="a") log.debug("TABLE 0:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="a", oid="a", prior_oid="b") log.debug("TABLE 1:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="b", oid="b", prior_oid="c") log.debug("TABLE 2:\n%s", state.pretty_print(use_sigs=False)) assert state.lookup_path(LOCAL, "a") assert state.lookup_path(LOCAL, "b") infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") # hashes should be flipped assert infoa[LOCAL].hash == bhash assert infob[LOCAL].hash == ahash state.assert_index_is_correct() def test_sync_state_multi(mock_provider): providers = (mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, FILE, path="foo2", oid="123") assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_sync_state_kids(mock_provider): # annoyingly, the state manager now interacts with the provider # this means that the state manager needs to know how to get an oid # TODO: make a layer that knows about providers and state, and ANOTHER layer that just knows about state # that way we can go back to have a pure state/storage manager providers = (mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, DIRECTORY, path="/dir", oid="123") assert state.lookup_path(LOCAL, path="/dir") state.update(LOCAL, FILE, path="/dir/foo", oid="124") assert state.lookup_path(LOCAL, path="/dir/foo") new_oid = mock_provider.mkdir("/dir2") mock_provider.create("/dir2/foo", BytesIO(b'hi')) state.update(LOCAL, DIRECTORY, path="/dir2", oid=new_oid, prior_oid="123") log.debug("TABLE:\n%s", state.pretty_print(use_sigs=False)) state.assert_index_is_correct() assert len(state) == 2 assert state.lookup_path(LOCAL, "/dir2/foo") def test_sync_state_split(mock_provider): # annoyingly, the state manager now interacts with the provider # this means that the state manager needs to know how to get an oid # TODO: make a layer that knows about providers and state, and ANOTHER layer that just knows about state # that way we can go back to have a pure state/storage manager providers = (mock_provider, mock_provider) state = SyncState(providers) state.update(LOCAL, DIRECTORY, path="/dir", oid="123") ent = state.lookup_oid(LOCAL, "123") # oid/path updated ent[REMOTE].oid="999" ent[REMOTE].path="/rem" assert state.lookup_oid(LOCAL, "123") assert state.lookup_path(LOCAL, "/dir") assert state.lookup_path(REMOTE, "/rem") assert state.lookup_oid(REMOTE, "999") (defer, ds, 
repl, rs) = state.split(ent) assert state.lookup_oid(LOCAL, "123") is repl assert state.lookup_path(LOCAL, "/dir") assert state.lookup_path(REMOTE, "/rem") assert state.lookup_oid(REMOTE, "999") is defer state.assert_index_is_correct() def test_sync_basic(sync: "SyncMgrMixin"): remote_parent = "/remote" local_parent = "/local" remote_path1 = Provider.join(remote_parent, "stuff1") local_path1 = sync.translate(LOCAL, remote_path1) local_path1.replace("\\", "/") assert local_path1 == "/local/stuff1" Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(LOCAL, FILE, oid=linfo.oid, exists=True) assert sync.state.entry_count() == 1 rinfo = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello2")) # inserts info about some cloud path sync.change_state(REMOTE, FILE, oid=rinfo.oid, path=remote_path2, hash=rinfo.hash) def done(): has_info = [None] * 4 try: has_info[0] = sync.providers[LOCAL].info_path("/local/stuff1") has_info[1] = sync.providers[LOCAL].info_path("/local/stuff2") has_info[2] = sync.providers[REMOTE].info_path("/remote/stuff2") has_info[3] = sync.providers[REMOTE].info_path("/remote/stuff2") except CloudFileNotFoundError as e: log.debug("waiting for %s", e) pass return all(has_info) # loop the sync until the file is found sync.run(timeout=TIMEOUT, until=done) assert done() info = sync.providers[LOCAL].info_path("/local/stuff2") assert info.hash == sync.providers[LOCAL].hash_oid(info.oid) assert info.oid log.debug("all state %s", sync.state.get_all()) sync.state.assert_index_is_correct() def test_sync_conflict_rename_path(sync): base = "/some@.o dd/cr azy.path" join = sync.providers[LOCAL].join sync.providers[LOCAL].mkdirs(base) path = join(base, "to()a.doc.zip") sync.providers[LOCAL].create(path, BytesIO(b"hello")) oid, new, cpath = sync.conflict_rename(LOCAL, path) assert cpath == join(base, "to()a.conflicted.doc.zip") sync.providers[LOCAL].create(path, BytesIO(b"hello")) oid, new, cpath = sync.conflict_rename(LOCAL, path) assert cpath == join(base, "to()a.conflicted2.doc.zip") def test_sync_rename(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" local_path2 = Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) new_oid = sync.providers[LOCAL].rename(linfo.oid, local_path2) sync.change_state(LOCAL, FILE, path=local_path2, oid=new_oid, hash=linfo.hash, prior_oid=linfo.oid) sync.run_until_found((REMOTE, remote_path2)) assert sync.providers[REMOTE].info_path("/remote/stuff") is None sync.state.assert_index_is_correct() def test_sync_hash(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" remote_path1 = "/remote/stuff1" 
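# after the initial sync, the test uploads new contents locally, records the new hash via change_state, waits for the remote copy to report that hash, then downloads it and compares the bytes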
sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) linfo = sync.providers[LOCAL].upload(linfo.oid, BytesIO(b"hello2")) sync.change_state(LOCAL, FILE, linfo.oid, hash=linfo.hash) sync.run_until_found(WaitFor(REMOTE, remote_path1, hash=linfo.hash)) info = sync.providers[REMOTE].info_path(remote_path1) check = BytesIO() sync.providers[REMOTE].download(info.oid, check) assert check.getvalue() == b"hello2" sync.state.assert_index_is_correct() def test_sync_rm(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) sync.providers[LOCAL].delete(linfo.oid) sync.change_state(LOCAL, FILE, linfo.oid, exists=False) sync.run_until_found(WaitFor(REMOTE, remote_path1, exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state.assert_index_is_correct() def test_sync_mkdir(sync): local_dir1 = "/local" local_path1 = "/local/stuff" remote_dir1 = "/remote" remote_path1 = "/remote/stuff" local_dir_oid1 = sync.providers[LOCAL].mkdir(local_dir1) local_path_oid1 = sync.providers[LOCAL].mkdir(local_path1) # inserts info about some local path sync.change_state(LOCAL, DIRECTORY, path=local_dir1, oid=local_dir_oid1) sync.change_state(LOCAL, DIRECTORY, path=local_path1, oid=local_path_oid1) sync.run_until_found((REMOTE, remote_dir1)) sync.run_until_found((REMOTE, remote_path1)) log.debug("BEFORE DELETE\n %s", sync.state.pretty_print()) sync.providers[LOCAL].delete(local_path_oid1) sync.change_state(LOCAL, FILE, local_path_oid1, exists=False) log.debug("AFTER DELETE\n %s", sync.state.pretty_print()) log.debug("wait for delete") sync.run_until_found(WaitFor(REMOTE, remote_path1, exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state.assert_index_is_correct() def test_sync_conflict_simul(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo = sync.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(REMOTE, FILE, path=remote_path1, oid=rinfo.oid, hash=rinfo.hash) # one of them is a conflict sync.run(until=lambda: sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") or sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") ) sync.run_until_found( (REMOTE, "/remote/stuff1"), (LOCAL, "/local/stuff1") ) sync.providers[LOCAL].log_debug_state("LOCAL") sync.providers[REMOTE].log_debug_state("REMOTE") b1 = BytesIO() b2 = BytesIO() if 
sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted"): sync.providers[REMOTE].download_path("/remote/stuff1.conflicted", b1) sync.providers[REMOTE].download_path("/remote/stuff1", b2) else: sync.providers[LOCAL].download_path("/local/stuff1.conflicted", b2) sync.providers[LOCAL].download_path("/local/stuff1", b1) # both files are intact assert b1.getvalue() != b2.getvalue() assert b1.getvalue() in (b"hello", b"goodbye") assert b2.getvalue() in (b"hello", b"goodbye") sync.state.assert_index_is_correct() MERGE = 2 @pytest.mark.parametrize("keep", [True, False]) @pytest.mark.parametrize("side", [LOCAL, REMOTE, MERGE]) def test_sync_conflict_resolve(sync, side, keep): data = (b"hello", b"goodbye", b"merge") def resolver(f1, f2): if side == MERGE: return (BytesIO(data[MERGE]), keep) if f1.side == side: return (f1, keep) return (f2, keep) sync.set_resolver(resolver) remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(data[LOCAL])) rinfo = sync.providers[REMOTE].create(remote_path1, BytesIO(data[REMOTE])) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(REMOTE, FILE, path=remote_path1, oid=rinfo.oid, hash=rinfo.hash) # ensure events are flushed a couple times sync.run(until=lambda: not sync.state.has_changes(), timeout=1) sync.providers[LOCAL].log_debug_state("LOCAL") sync.providers[REMOTE].log_debug_state("REMOTE") b1 = BytesIO() b2 = BytesIO() sync.providers[REMOTE].download_path("/remote/stuff1", b2) sync.providers[LOCAL].download_path("/local/stuff1", b1) # both files are intact assert b1.getvalue() == data[side] assert b2.getvalue() == data[side] if not keep: assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") else: assert sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") or sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted.conflicted") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted.conflicted") assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted2") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted2") sync.state.assert_index_is_correct() def test_sync_conflict_path(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff" remote_path1 = "/remote/stuff" local_path2 = "/local/stuff-l" remote_path2 = "/remote/stuff-r" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) rinfo = sync.providers[REMOTE].info_path(remote_path1) assert len(sync.state.get_all()) == 1 ent = sync.state.get_all().pop() sync.providers[REMOTE].log_debug_state("BEFORE") new_oid_l = sync.providers[LOCAL].rename(linfo.oid, local_path2) new_oid_r = sync.providers[REMOTE].rename(rinfo.oid, remote_path2) sync.providers[REMOTE].log_debug_state("AFTER") sync.change_state(LOCAL, FILE, path=local_path2, 
oid=new_oid_l, hash=linfo.hash, prior_oid=linfo.oid) assert len(sync.state.get_all()) == 1 assert ent[REMOTE].oid == new_oid_r sync.change_state(REMOTE, FILE, path=remote_path2, oid=new_oid_r, hash=rinfo.hash, prior_oid=rinfo.oid) assert len(sync.state.get_all()) == 1 log.debug("TABLE 0:\n%s", sync.state.pretty_print()) # currently defers to the alphabetcially greater name, rather than conflicting sync.run_until_found((LOCAL, "/local/stuff-r")) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) assert not sync.providers[LOCAL].exists_path(local_path1) assert not sync.providers[LOCAL].exists_path(local_path2) sync.state.assert_index_is_correct() def test_sync_cycle(sync): sync: SyncMgrMixin l_parent = "/local" r_parent = "/remote" lp1, lp2, lp3 = "/local/a", "/local/b", "/local/c", rp1, rp2, rp3 = "/remote/a", "/remote/b", "/remote/c", templ = "/local/d" sync.providers[LOCAL].mkdir(l_parent) sync.providers[REMOTE].mkdir(r_parent) linfo1 = sync.providers[LOCAL].create(lp1, BytesIO(b"hello1")) sync.change_state(LOCAL, FILE, path=lp1, oid=linfo1.oid, hash=linfo1.hash) sync.run_until_found((REMOTE, rp1), timeout=1) rinfo1 = sync.providers[REMOTE].info_path(rp1) linfo2 = sync.providers[LOCAL].create(lp2, BytesIO(b"hello2")) sync.change_state(LOCAL, FILE, path=lp2, oid=linfo2.oid, hash=linfo2.hash) sync.run_until_found((REMOTE, rp2)) rinfo2 = sync.providers[REMOTE].info_path(rp2) linfo3 = sync.providers[LOCAL].create(lp3, BytesIO(b"hello3")) sync.change_state(LOCAL, FILE, path=lp3, oid=linfo3.oid, hash=linfo3.hash) sync.run_until_found((REMOTE, rp3)) rinfo3 = sync.providers[REMOTE].info_path(rp3) sync.providers[REMOTE].log_debug_state("BEFORE") tmp1oid = sync.providers[LOCAL].rename(linfo1.oid, templ) lp1oid = sync.providers[LOCAL].rename(linfo3.oid, lp1) lp3oid = sync.providers[LOCAL].rename(linfo2.oid, lp3) lp2oid = sync.providers[LOCAL].rename(tmp1oid, lp2) # a->temp, c->a, b->c, temp->b log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=templ, oid=tmp1oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp1, oid=lp1oid, hash=linfo3.hash, prior_oid=linfo3.oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp3, oid=lp3oid, hash=linfo2.hash, prior_oid=linfo2.oid) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp2, oid=lp2oid, hash=linfo1.hash, prior_oid=tmp1oid) log.debug("TABLE 4:\n%s", sync.state.pretty_print()) assert len(sync.state.get_all()) == 3 sync.providers[REMOTE].log_debug_state("MIDDLE") sync.run(until=lambda: not sync.state.has_changes(), timeout=1) sync.providers[REMOTE].log_debug_state("AFTER") i1 = sync.providers[REMOTE].info_path(rp1) i2 = sync.providers[REMOTE].info_path(rp2) i3 = sync.providers[REMOTE].info_path(rp3) assert i1 and i2 and i3 assert i1.hash == rinfo3.hash assert i2.hash == rinfo1.hash assert i3.hash == rinfo2.hash def test_sync_conflict_path_combine(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path3 = "/local/stuff" remote_path3 = "/remote/stuff" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo2 = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello")) # inserts info about some local path 
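# after recording both creates, each side renames its file onto the shared ".../stuff" path; the two entries collide at one path, and the run below waits for a ".conflicted" name to appear on either side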
sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo1.oid, hash=linfo1.hash) sync.change_state(REMOTE, FILE, path=remote_path2, oid=rinfo2.oid, hash=rinfo2.hash) sync.run_until_found((REMOTE, remote_path1), (LOCAL, local_path2)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) new_oid1 = sync.providers[LOCAL].rename(linfo1.oid, local_path3) prior_oid = sync.providers[LOCAL].oid_is_path and linfo1.oid sync.change_state(LOCAL, FILE, path=local_path3, oid=new_oid1, prior_oid=prior_oid) new_oid2 = sync.providers[REMOTE].rename(rinfo2.oid, remote_path3) prior_oid = sync.providers[REMOTE].oid_is_path and rinfo2.oid sync.change_state(REMOTE, FILE, path=remote_path3, oid=new_oid2, prior_oid=prior_oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) ok = lambda: ( sync.providers[REMOTE].exists_path("/remote/stuff.conflicted") or sync.providers[LOCAL].exists_path("/local/stuff.conflicted") ) sync.run(until=ok, timeout=3) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) assert ok() def test_create_then_move(sync): # TODO: combine with the reverse test remote_parent = "/remote" local_parent = "/local" local_folder = "/local/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_file1, BytesIO(b"hello")) sync.change_state(LOCAL, FILE, path=local_file1, oid=linfo1.oid, hash=linfo1.hash) sync.run_until_found((REMOTE, remote_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[LOCAL].mkdir(local_folder) sync.change_state(LOCAL, DIRECTORY, path=local_folder, oid=folder_oid, hash=None) new_oid = sync.providers[LOCAL].rename(linfo1.oid, local_file2) sync.change_state(LOCAL, FILE, path=local_file2, oid=new_oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((REMOTE, remote_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def test_create_then_move_reverse(sync): # TODO: see if this can be combined with the reverse test remote_parent = "/remote" local_parent = "/local" remote_folder = "/remote/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" oid = sync.providers[LOCAL].mkdir(local_parent) oid = sync.providers[REMOTE].mkdir(remote_parent) rinfo1 = sync.providers[REMOTE].create(remote_file1, BytesIO(b"hello")) sync.change_state(REMOTE, FILE, path=remote_file1, oid=rinfo1.oid, hash=rinfo1.hash) sync.run_until_found((LOCAL, local_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[REMOTE].mkdir(remote_folder) sync.change_state(REMOTE, DIRECTORY, path=remote_folder, oid=folder_oid, hash=None) sync.providers[REMOTE].rename(rinfo1.oid, remote_file2) sync.change_state(REMOTE, FILE, path=remote_file2, oid=rinfo1.oid, hash=rinfo1.hash) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((LOCAL, local_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def _test_rename_folder_with_kids(sync, source, dest): parent = ["/local", "/remote"] folder1 = ["/local/folder1", "/remote/folder1"] file1 = ["/local/folder1/file", "/remote/folder1/file"] folder2 = ["/local/folder2", "/remote/folder2"] file2 = ["/local/folder2/file", "/remote/folder2/file"] for loc in (source, dest): sync.providers[loc].mkdir(parent[loc]) 
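# create folder1 with a single child file on the source side, sync it across, rename the folder to folder2, and verify the child file shows up under the new folder path on both sides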
folder_oid = sync.providers[source].mkdir(folder1[source]) sync.change_state(source, DIRECTORY, path=folder1[source], oid=folder_oid, hash=None) file_info: OInfo = sync.providers[source].create(file1[source], BytesIO(b"hello")) sync.change_state(source, FILE, path=file1[source], oid=file_info.oid, hash=None) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.run_until_found((dest, folder1[dest])) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) new_oid = sync.providers[source].rename(folder_oid, folder2[source]) sync.change_state(source, DIRECTORY, path=folder2[source], oid=new_oid, hash=None, prior_oid=folder_oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.run_until_found((source, file2[source])) sync.run_until_found((dest, file2[dest])) sync.run_until_found( (source, file2[source]), (dest, file2[dest]), ) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) @pytest.mark.parametrize("ordering", [(LOCAL, REMOTE), (REMOTE, LOCAL)]) def test_rename_folder_with_kids(sync, ordering): _test_rename_folder_with_kids(sync, *ordering) def test_aging(sync): local_parent = "/local" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/file2" remote_file2 = "/remote/file2" sync.providers[LOCAL].mkdir(local_parent) linfo1 = sync.providers[LOCAL].create(local_file1, BytesIO(b"hello")) # aging slows things down sync.aging = 0.2 sync.change_state(LOCAL, FILE, path=local_file1, oid=linfo1.oid, hash=linfo1.hash) sync.do() sync.do() sync.do() log.debug("TABLE 2:\n%s", sync.state.pretty_print()) assert not sync.providers[REMOTE].info_path(local_file1) sync.run_until_found((REMOTE, remote_file1), timeout=2) assert sync.providers[REMOTE].info_path(remote_file1) sync.aging = 0 linfo2 = sync.providers[LOCAL].create(local_file2, BytesIO(b"hello")) sync.change_state(LOCAL, FILE, path=local_file2, oid=linfo2.oid, hash=linfo2.hash) sync.do() sync.do() sync.do() log.debug("TABLE 2:\n%s", sync.state.pretty_print()) assert sync.providers[REMOTE].info_path(remote_file2) # but withotu it, things are fast def test_remove_folder_with_kids(sync): parent = ["/local", "/remote"] folder1 = ["/local/folder1", "/remote/folder1"] file1 = ["/local/folder1/file", "/remote/folder1/file"] for loc in (LOCAL, REMOTE): sync.providers[loc].mkdir(parent[loc]) folder_oid = sync.providers[LOCAL].mkdir(folder1[LOCAL]) sync.change_state(LOCAL, DIRECTORY, path=folder1[LOCAL], oid=folder_oid, hash=None) file_info: OInfo = sync.providers[LOCAL].create(file1[LOCAL], BytesIO(b"hello")) sync.change_state(LOCAL, FILE, path=file1[LOCAL], oid=file_info.oid, hash=None) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.run_until_found((REMOTE, file1[REMOTE])) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.providers[LOCAL].delete(file_info.oid) sync.providers[LOCAL].delete(folder_oid) sync.change_state(LOCAL, DIRECTORY, oid=file_info.oid, exists=False) sync.change_state(LOCAL, DIRECTORY, oid=folder_oid, exists=False) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.run_until_found(WaitFor(REMOTE, file1[REMOTE], exists=False), WaitFor(REMOTE, folder1[REMOTE], exists=False)) # TODO: test to confirm that a file that is both a rename and an update will be both renamed and updated # TODO: test to confirm that a sync with an updated path name that is different but matches the old name will be ignored (eg: a/b -> a\b) PKOMM$cloudsync/tests/fixtures/__init__.pyfrom .util import * from .mock_provider import * from .mock_storage import * PKc$O 
>>)cloudsync/tests/fixtures/mock_provider.pyimport os import time import copy import logging from hashlib import md5 from typing import Dict, List, Any, Optional, Generator import pytest from cloudsync.event import Event from cloudsync.provider import Provider from cloudsync.types import OInfo, OType, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError log = logging.getLogger(__name__) class MockFSObject: # pylint: disable=too-few-public-methods FILE = 'mock file' DIR = 'mock dir' def __init__(self, path, object_type, oid_is_path, contents=None, mtime=None): # self.display_path = path # TODO: used for case insensitive file systems if contents is None and type == MockFSObject.FILE: contents = b"" self.path = path.rstrip("/") self.contents = contents self.oid = path if oid_is_path else str(id(self)) self.exists = True self.type = object_type if self.type == self.FILE: # none is not valid for empty file if self.contents is None: self.contents = b'' self.mtime = mtime or time.time() @property def otype(self): if self.type == self.FILE: return OType.FILE else: return OType.DIRECTORY def hash(self) -> Optional[bytes]: if self.type == self.DIR: return None return md5(self.contents).digest() def update(self): self.mtime = time.time() def copy(self): return copy.copy(self) class MockEvent: # pylint: disable=too-few-public-methods ACTION_CREATE = "provider create" ACTION_RENAME = "provider rename" ACTION_UPDATE = "provider modify" ACTION_DELETE = "provider delete" def __init__(self, action, target_object: MockFSObject, prior_oid=None): self._target_object = copy.copy(target_object) self._action = action self._prior_oid = prior_oid self._timestamp = time.time() def serialize(self): ret_val = {"action": self._action, "id": self._target_object.oid, "object type": self._target_object.type, "path": self._target_object.path, "mtime": self._target_object.mtime, "prior_oid": self._prior_oid, "trashed": not self._target_object.exists, } return ret_val class MockProvider(Provider): default_sleep = 0.01 connected = True name = "Mock" # TODO: normalize names to get rid of trailing slashes, etc. 
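# MockProvider keeps two in-memory indexes (_fs_by_path and _fs_by_oid) plus an append-only event list; oid_is_path chooses whether an object's oid is its path (filesystem-style, so a rename yields a new oid) or a generated id that survives renames (GDrive-style)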
def __init__(self, oid_is_path: bool, case_sensitive: bool): """Constructor for MockProvider :param oid_is_path: Act as a filesystem or other oid-is-path provider :param case_sensitive: Paths are case sensistive """ super().__init__() log.debug("mock mode: o:%s, c:%s", oid_is_path, case_sensitive) self.oid_is_path = oid_is_path self.case_sensitive = case_sensitive self._fs_by_path: Dict[str, MockFSObject] = {} self._fs_by_oid: Dict[str, MockFSObject] = {} self._events: List[MockEvent] = [] self._latest_cursor = -1 self._cursor = -1 self._type_map = { MockFSObject.FILE: OType.FILE, MockFSObject.DIR: OType.DIRECTORY, } self.event_timeout = 1 self.event_sleep = 0.001 self.creds = {} self.connection_id = os.urandom(2).hex() def _register_event(self, action, target_object, prior_oid=None): event = MockEvent(action, target_object, prior_oid) self._events.append(event) target_object.update() self._latest_cursor = len(self._events) - 1 def _get_by_oid(self, oid): self._api() return self._fs_by_oid.get(oid, None) def normalize(self, path): if self.case_sensitive: return path else: return path.lower() def _get_by_path(self, path): path = self.normalize(path) # TODO: normalize the path, support case insensitive lookups, etc self._api() return self._fs_by_path.get(path, None) def _store_object(self, fo: MockFSObject): # TODO: support case insensitive storage assert fo.path == fo.path.rstrip("/") self._fs_by_path[self.normalize(fo.path)] = fo self._fs_by_oid[fo.oid] = fo def _unstore_object(self, fo: MockFSObject): # TODO: do I need to check if the path and ID exist before del to avoid a key error, # or perhaps just catch and swallow that exception? del self._fs_by_path[self.normalize(fo.path)] del self._fs_by_oid[fo.oid] def _translate_event(self, pe: MockEvent, cursor) -> Event: event = pe.serialize() provider_type = event.get("object type", None) standard_type = self._type_map.get(provider_type, None) assert standard_type oid = event.get("id", None) mtime = event.get("mtime", None) trashed = event.get("trashed", None) prior_oid = event.get("prior_oid", None) path = None if self.oid_is_path: path = event.get("path") retval = Event(standard_type, oid, path, None, not trashed, mtime, prior_oid, new_cursor=cursor) return retval def _api(self, *args, **kwargs): pass @property def latest_cursor(self): return self._latest_cursor @property def current_cursor(self): return self._cursor def events(self) -> Generator[Event, None, None]: self._api() while self._cursor < self._latest_cursor: self._cursor += 1 pe = self._events[self._cursor] yield self._translate_event(pe, self._cursor) def walk(self, path, since=None): # TODO: implement "since" parameter self._api() for obj in self._fs_by_oid.values(): if self.is_subpath(path, obj.path, strict=False): yield Event(obj.otype, obj.oid, obj.path, obj.hash(), obj.exists, obj.mtime) def upload(self, oid, file_like, metadata=None) -> OInfo: self._api() file = self._fs_by_oid.get(oid, None) if file is None or not file.exists: raise CloudFileNotFoundError(oid) if file.type != MockFSObject.FILE: raise CloudFileExistsError("Only files may be uploaded, and %s is not a file" % file.path) contents = file_like.read() file.contents = contents self._register_event(MockEvent.ACTION_UPDATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def listdir(self, oid) -> Generator[DirInfo, None, None]: folder_obj = self._get_by_oid(oid) if not (folder_obj and folder_obj.exists and folder_obj.type == MockFSObject.DIR): raise CloudFileNotFoundError(oid) 
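# scan every stored object and yield only the folder's immediate children: paths that are a strict subpath of the folder and whose remaining relative part contains no further "/"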
path = folder_obj.path for obj in self._fs_by_oid.values(): if obj.exists: relative = self.is_subpath(path, obj.path, strict=True) if relative: relative = relative.lstrip("/") if "/" not in relative: yield DirInfo(otype=obj.otype, oid=obj.oid, hash=obj.hash(), path=obj.path, name=relative) def create(self, path, file_like, metadata=None) -> OInfo: # TODO: store the metadata self._api() file = self._get_by_path(path) if file is not None and file.exists: raise CloudFileExistsError("Cannot create, '%s' already exists" % file.path) self._verify_parent_folder_exists(path) if file is None or not file.exists: file = MockFSObject(path, MockFSObject.FILE, self.oid_is_path) self._store_object(file) file.contents = file_like.read() file.exists = True log.debug("created %s %s", file.oid, file.type) self._register_event(MockEvent.ACTION_CREATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def download(self, oid, file_like): self._api() file = self._fs_by_oid.get(oid, None) if file is None or file.exists is False: raise CloudFileNotFoundError(oid) if file.type == MockFSObject.DIR: raise CloudFileExistsError("is a directory") file_like.write(file.contents) def rename(self, oid, path) -> str: log.debug("renaming %s -> %s", oid, path) self._api() # TODO: folders are implied by the path of the file... # actually check to make sure the folder exists and raise a FileNotFound if not object_to_rename = self._fs_by_oid.get(oid, None) if not (object_to_rename and object_to_rename.exists): raise CloudFileNotFoundError(oid) possible_conflict = self._get_by_path(path) if possible_conflict and possible_conflict.oid == oid: possible_conflict = None self._verify_parent_folder_exists(path) if possible_conflict and possible_conflict.exists: if possible_conflict.type != object_to_rename.type: log.debug("rename %s:%s conflicts with existing object of another type", oid, object_to_rename.path) raise CloudFileExistsError(path) if possible_conflict.type == MockFSObject.DIR: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError(path) except StopIteration: pass # Folder is empty, rename over it no problem else: raise CloudFileExistsError(path) log.debug("secretly deleting empty folder %s", path) self.delete(possible_conflict.oid) if object_to_rename.path == path: return oid prior_oid = None if self.oid_is_path: prior_oid = object_to_rename.oid if object_to_rename.type == MockFSObject.FILE: self._rename_single_object(object_to_rename, path, event=True) else: # object to rename is a directory old_path = object_to_rename.path for obj in set(self._fs_by_oid.values()): if self.is_subpath(old_path, obj.path, strict=True): new_obj_path = self.replace_path(obj.path, old_path, path) self._rename_single_object(obj, new_obj_path, event=False) # only parent generates event self._rename_single_object(object_to_rename, path, event=True) if self.oid_is_path: assert object_to_rename.oid != prior_oid, "rename %s to %s" % (prior_oid, path) else: assert object_to_rename.oid == oid, "rename %s to %s" % (object_to_rename.oid, oid) return object_to_rename.oid def _rename_single_object(self, source_object: MockFSObject, destination_path, *, event): destination_path = destination_path.rstrip("/") # This will assume all validation has already been done, and just rename the thing # without trying to rename contents of folders, just rename the object itself log.debug("renaming %s to %s", source_object.path, destination_path) prior_oid = source_object.oid if self.oid_is_path else None 
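# re-index the object under its new path (and new oid when oid_is_path), then emit a rename event carrying the prior oid so event consumers can link the old and new identities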
self._unstore_object(source_object) source_object.path = destination_path if self.oid_is_path: source_object.oid = destination_path self._store_object(source_object) self._register_event(MockEvent.ACTION_RENAME, source_object, prior_oid) log.debug("rename complete %s", source_object.path) self.log_debug_state() def mkdir(self, path) -> str: self._api() self._verify_parent_folder_exists(path) file = self._get_by_path(path) if file and file.exists: if file.type == MockFSObject.FILE: raise CloudFileExistsError(path) else: log.debug("Skipped creating already existing folder: %s", path) return file.oid new_fs_object = MockFSObject(path, MockFSObject.DIR, self.oid_is_path) self._store_object(new_fs_object) self._register_event(MockEvent.ACTION_CREATE, new_fs_object) return new_fs_object.oid def delete(self, oid): log.debug("delete %s", oid) self._api() file = self._fs_by_oid.get(oid, None) log.debug("got %s", file) if file and file.exists: if file.otype == OType.DIRECTORY: try: next(self.listdir(file.oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, file.path)) except StopIteration: pass # Folder is empty, delete it no problem else: path = file.path if file else "" log.debug("Deleting non-existent oid %s:%s ignored", oid, path) return file.exists = False self._register_event(MockEvent.ACTION_DELETE, file) def exists_oid(self, oid): self._api() file = self._fs_by_oid.get(oid, None) return file is not None and file.exists def exists_path(self, path) -> bool: self._api() file = self._get_by_path(path) return file is not None and file.exists def hash_oid(self, oid) -> Any: file = self._fs_by_oid.get(oid, None) if file and file.exists: return file.hash() else: return None @staticmethod def hash_data(file_like) -> bytes: return md5(file_like.read()).digest() def info_path(self, path: str) -> Optional[OInfo]: self._api() file: MockFSObject = self._get_by_path(path) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: self._api() file: MockFSObject = self._fs_by_oid.get(oid, None) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) # @staticmethod # def _slurp(path): # with open(path, "rb") as x: # return x.read() # # @staticmethod # def _burp(path, contents): # with open(path, "wb") as x: # x.write(contents) def log_debug_state(self, msg=""): log.debug("%s: mock provider state %s", msg, list(self.walk("/"))) ################### def mock_provider_instance(oid_is_path, case_sensitive): prov = MockProvider(oid_is_path, case_sensitive) prov.event_timeout = 1 prov.event_sleep = 0.001 prov.creds = {} return prov @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider(request): return mock_provider_instance(*request.param) @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider_generator(request): return lambda oid_is_path=None, case_sensitive=None: \ mock_provider_instance( request.param[0] if oid_is_path is None else oid_is_path, request.param[1] if case_sensitive is None else case_sensitive) @pytest.fixture def mock_provider_creator(): return mock_provider_instance def test_mock_basic(): """ basic spot-check, more tests are in test_providers with mock as one of the providers """ from io import BytesIO m = MockProvider(False, True) info = m.create("/hi.txt", BytesIO(b'hello')) 
assert info.hash assert info.oid b = BytesIO() m.download(info.oid, b) assert b.getvalue() == b'hello' PKc$O)(cloudsync/tests/fixtures/mock_storage.pyfrom threading import Lock from typing import Dict, Any, Tuple, Optional import logging from cloudsync import Storage, LOCAL, REMOTE log = logging.getLogger(__name__) class MockStorage(Storage): # Does not actually persist the data... but it's just a mock top_lock = Lock() lock_dict = dict() def __init__(self, storage_dict: Dict[str, Dict[int, bytes]]): self.storage_dict = storage_dict self.cursor: int = 0 # the next eid def _get_internal_storage(self, tag: str) -> Tuple[Lock, Dict[int, bytes]]: with self.top_lock: lock: Lock = self.lock_dict.setdefault(tag, Lock()) return lock, self.storage_dict.setdefault(tag, dict()) def create(self, tag: str, serialization: bytes) -> Any: lock, storage = self._get_internal_storage(tag) with lock: current_index = self.cursor self.cursor += 1 storage[current_index] = serialization return current_index def update(self, tag: str, serialization: bytes, eid: Any): lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) storage[eid] = serialization return 1 def delete(self, tag: str, eid: Any): lock, storage = self._get_internal_storage(tag) log.debug("deleting eid%s", eid) with lock: if eid not in storage: log.debug("ignoring delete: id %s doesn't exist" % eid) return del storage[eid] def read_all(self, tag: str) -> Dict[Any, bytes]: lock, storage = self._get_internal_storage(tag) with lock: ret: Dict[Any, bytes] = storage.copy() return ret def read(self, tag: str, eid: Any) -> Optional[bytes]: lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) return storage[eid] PKOoo cloudsync/tests/fixtures/util.pyimport pytest import os import tempfile import shutil from inspect import getframeinfo, stack import logging from cloudsync.provider import Provider log = logging.getLogger(__name__) log.setLevel(logging.INFO) class Util: def __init__(self): self.base = tempfile.mkdtemp(suffix=".cloudsync") log.debug("temp files will be in: %s", self.base) @staticmethod def get_context(level): caller = getframeinfo(stack()[level+1][0]) return caller def temp_file(self, *, fill_bytes=None): # pretty names for temps caller = self.get_context(1) fn = os.path.basename(caller.filename) if not fn: fn = "unk" else: fn = os.path.splitext(fn)[0] func = caller.function name = fn + '-' + func + "." 
+ os.urandom(16).hex() fp = Provider.join(self.base, name) if fill_bytes is not None: with open(fp, "wb") as f: f.write(os.urandom(fill_bytes)) log.debug("temp file %s", fp) return fp def do_cleanup(self): shutil.rmtree(self.base) @pytest.fixture(scope="module") def util(request): # user can override at the module level or class level # if they want to look at the temp files made cleanup = getattr(getattr(request, "cls", None), "util_cleanup", True) if cleanup: cleanup = getattr(request.module, "util_cleanup", True) u = Util() yield u if cleanup: u.do_cleanup() def test_util(util): log.setLevel(logging.DEBUG) f = util.temp_file(fill_bytes=32) assert len(open(f, "rb").read()) == 32 PKO%cloudsync/tests/providers/__init__.pyPKO?~%%$cloudsync/tests/providers/dropbox.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError from cloudsync.providers.dropbox import DropboxProvider def dropbox_creds(): token_set = os.environ.get("DROPBOX_TOKEN") if not token_set: return None tokens = token_set.split(",") creds = { "key": tokens[random.randrange(0, len(tokens))], } return creds def dropbox_provider(): cls = DropboxProvider cls.event_timeout = 20 cls.event_sleep = 2 cls.creds = dropbox_creds() return cls() @pytest.fixture def cloudsync_provider(): return dropbox_provider() def test_connect(): creds = dropbox_creds() if not creds: pytest.skip('requires dropbox token and client secret') sync_root = "/" + os.urandom(16).hex() gd = DropboxProvider(sync_root) gd.connect(creds) assert gd.client quota = gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass PKc$Oʮ#cloudsync/tests/providers/gdrive.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError from cloudsync.providers.gdrive import GDriveProvider # move this to provider ci_creds() function? def gdrive_creds(): token_set = os.environ.get("GDRIVE_TOKEN") cli_sec = os.environ.get("GDRIVE_CLI_SECRET") if not token_set or not cli_sec: return None tokens = token_set.split(",") creds = { "refresh_token": tokens[random.randrange(0, len(tokens))], "client_secret": cli_sec, "client_id": '433538542924-ehhkb8jn358qbreg865pejbdpjnm31c0.apps.googleusercontent.com', } return creds def gdrive_provider(): cls = GDriveProvider cls.event_timeout = 60 cls.event_sleep = 2 cls.creds = gdrive_creds() return cls() @pytest.fixture def cloudsync_provider(): return gdrive_provider() def connect_test(want_oauth: bool): creds = gdrive_creds() if not creds: pytest.skip('requires gdrive token and client secret') if want_oauth: creds.pop("refresh_token", None) # triggers oauth to get a new refresh token sync_root = "/" + os.urandom(16).hex() gd = GDriveProvider() gd.connect(creds) assert gd.client gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass def test_connect(): connect_test(False) @pytest.mark.manual def test_oauth_connect(): connect_test(True) PKc$O2!cloudsync/tests/providers/mock.pyimport pytest from ..fixtures.mock_provider import MockProvider @pytest.fixture def cloudsync_provider(): cls = MockProvider cls.event_timeout = 20 cls.event_sleep = 2 cls.creds = {} return cls PK!H!J4P+cloudsync-0.1.17.dist-info/entry_points.txt cloudsync-0.1.17.dist-info/LICENSE Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below. 0. Additional Definitions. As used herein, “this License” refers to version 3 of the GNU Lesser General Public License, and the “GNU GPL” refers to version 3 of the GNU General Public License. “The Library” refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An “Application” is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A “Combined Work” is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the “Linked Version”. The “Minimal Corresponding Source” for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The “Corresponding Application Code” for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. 
b) Accompany the Combined Work with a copy of the GNU GPL and this license document. c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License “or any later version” applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. 
If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library.
cloudsync-0.1.17.dist-info/WHEEL cloudsync-0.1.17.dist-info/METADATA cloudsync-0.1.17.dist-info/RECORD