# ===== cloudsync/__init__.py =====
"""
cloudsync enables simple cloud file-level sync with a variety of cloud providers.

External modules:

    cloudsync.Event
    cloudsync.Provider
    cloudsync.Sync

Example:

    import cloudsync

    prov = cloudsync.Provider('GDrive', token="237846234236784283")
    info = prov.upload(file, "/dest")

    print("id of /dest is %s, hash of /dest is %s" % (info.id, info.hash))

Command-line example:

    cloudsync -p gdrive --token "236723782347823642786" -f ~/gdrive-folder --daemon
"""

__version__ = "0.1.9"

# must be imported first
from .log import logger

# import modules into top level for convenience
from .provider import *
from .event import *
from .sync import *
from .exceptions import *
from .types import *
from .cs import *

from .command import main

if __name__ == "__main__":
    main()


# ===== cloudsync/apiserver.py =====
import re
import json
import traceback
import time
import urllib.parse as urlparse
import threading
import logging
from enum import Enum
import unittest

# TODO: this is an inappropriate default server; the default should be the wsgiref builtin
import waitress
# TODO: the caller should specify the mechanism for channel-empty detection
from waitress.channel import HTTPChannel

log = logging.getLogger(__name__)


class ApiServerLogLevel(Enum):
    NONE = 0    # do not log calls
    CALLS = 1   # log calls but not their args
    ARGS = 2    # log calls with args


class ApiError(Exception):
    def __init__(self, code, msg=None, desc=None):
        super().__init__()
        self.code = code
        self.msg = str(msg) if msg else "UNKNOWN"
        self.desc = desc

    def __str__(self):
        return f"{self.code}, {self.msg}"

    @classmethod
    def from_json(cls, error_json):
        return cls(error_json.get('code', 500), msg=error_json.get('msg', None), desc=error_json.get('desc', None))


def api_route(path):
    def outer(func):
        if not hasattr(func, "_routes"):
            setattr(func, "_routes", [])
        func._routes += [path]
        return func
    return outer


def sanitize_for_status(e):
    e = e.replace("\r\n", " ")
    e = e.replace("\n", " ")
    e = e.replace("\r", " ")
    e = e[0:100]
    return e


class ApiServer:
    def __init__(self, addr, port, headers=None, log_level=ApiServerLogLevel.ARGS):
        """
        Create a new server on address, port.  Port can be zero.

            from apiserver import ApiServer, ApiError, api_route

        Create your handlers by inheriting from ApiServer and tagging them with @api_route("/path").
Alternately you can use the ApiServer() directly, and call add_handler("path", function) Raise errors by raising ApiError(code, message, description=None) Return responses by simply returning a dict() or str() object Parameter to handlers is a dict() Query arguments are shoved into the dict via urllib.parse_qs """ self.__addr = addr self.__port = port self.__headers = headers if headers else [] self.__log_level = log_level self.__server = waitress.server.create_server(self, host=self.__addr, port=self.__port, clear_untrusted_proxy_headers=False) self.__started = False self.__routes = {} # routed methods map into handler for meth in type(self).__dict__.values(): if hasattr(meth, "_routes"): for route in meth._routes: # pylint: disable=protected-access self.add_route(route, meth) def add_route(self, path, meth, content_type='application/json'): self.__routes[path] = (meth, content_type) def port(self): """Get my port""" sa = self.__server.socket.getsockname() return sa[1] def address(self): """Get my ip address""" sa = self.__server.socket.getsockname() return sa[0] def uri(self, path): """Make a URI pointing at myself""" if path[0] == "/": path = path[1:] uri = "http://" + self.__addr + ":" + str(self.port()) + "/" + path return uri def serve_forever(self): self.__started = True try: self.__server.run() except OSError: pass def __del__(self): self.shutdown() def shutdown(self): try: if self.__started: timeout = time.time() + 2 for channel in self.__server.active_channels.values(): channel: HTTPChannel while channel.total_outbufs_len > 0 and time.time() < timeout: time.sleep(.01) # give any connections with a non-empty output buffer a chance to drain self.__server.socket.close() self.__server.asyncore.close_all() except Exception: log.exception("exception during shutdown") def __call__(self, env, start_response): # pylint: disable=too-many-locals, too-many-branches, too-many-statements content = b"{}" length = env.get("CONTENT_LENGTH", 0) if length: content = env['wsgi.input'].read(int(length)) if content: try: info = json.loads(content) except Exception: raise ApiError(400, "Invalid JSON " + str(content, "utf-8")) else: info = {} url = '' try: url = env.get('PATH_INFO', '/') if self.__log_level == ApiServerLogLevel.CALLS or self.__log_level == ApiServerLogLevel.ARGS: log.debug('Processing URL %s', url) handler_tmp = self.__routes.get(url) if not handler_tmp: if url[-1] == "/": tmp = url[0:-1] handler_tmp = self.__routes.get(tmp) if not handler_tmp: m = re.match(r"(.*?/)[^/]+$", url) if m: # adding a route "/" handles /foo # adding a route "/foo/bar/" handles /foo/bar/baz handler_tmp = self.__routes.get(m[1]) query = env.get('QUERY_STRING') if query: params = urlparse.parse_qs(query) else: params = {} info.update(params) if handler_tmp: handler, content_type = handler_tmp try: response = handler(env, info) if response is None: response = "" if isinstance(response, dict): response = json.dumps(response) response = bytes(str(response), "utf-8") headers = self.__headers + [('Content-Type', content_type), ("Content-Length", str(len(response)))] start_response('200 OK', headers) yield response except ApiError: raise except ConnectionAbortedError as e: log.error("GET %s : ERROR : %s", url, e) except Exception as e: raise ApiError(500, type(e).__name__ + " : " + str(e), traceback.format_exc()) else: raise ApiError(404, f"No handler for {url}") except ApiError as e: try: log.error("GET %s : ERROR : %s", url, e) response = json.dumps({"code": e.code, "msg": e.msg, "desc": e.desc}) 
start_response(str(e.code) + ' ' + sanitize_for_status(e.msg), [('Content-Type', 'application/json'), ("Content-Length", str(len(response)))]) yield bytes(response, "utf-8") except ConnectionAbortedError as e: log.error("GET %s : ERROR : %s", url, e) class TestApiServer(unittest.TestCase): @staticmethod def test_nostart(): httpd = ApiServer('127.0.0.1', 0) httpd.shutdown() def test_basic(self): class MyServer(ApiServer): @api_route("/popup") def popup(ctx, req): # pylint: disable=no-self-argument,no-self-use return "HERE" + str(req) @api_route("/json") def json(ctx, req): # pylint: disable=no-self-argument,no-self-use _ = req return {"obj": 1} httpd = MyServer('127.0.0.1', 0) httpd.add_route("/foo", lambda ctx, x: "FOO" + x["x"][0]) try: print("serving on ", httpd.address(), httpd.port()) threading.Thread(target=httpd.serve_forever, daemon=True).start() import requests response = requests.post(httpd.uri("/popup"), data='{}') self.assertEqual(response.text, "HERE{}") response = requests.post(httpd.uri("/notfound"), data='{}') self.assertEqual(response.status_code, 404) response = requests.get(httpd.uri("/foo?x=4")) self.assertEqual(response.text, "FOO4") finally: httpd.shutdown() def test_error(self): class MyServer(ApiServer): @api_route("/popup") def popup(ctx, unused_req): # pylint: disable=no-self-argument,no-self-use raise ApiError(501, "BLAH") httpd = MyServer('127.0.0.1', 0) try: print("serving on ", httpd.address(), httpd.port()) thread = threading.Thread(target=httpd.serve_forever, daemon=True) thread.start() import requests response = requests.post(httpd.uri("/popup"), data='{}') self.assertEqual(response.status_code, 501) finally: httpd.shutdown() if thread: thread.join(timeout=2) assert not thread.is_alive() if __name__ == "__main__": unittest.main() PKOs+cloudsync/command.py#import argparse import os def main(): # parser = argparse.ArgumentParser(description='Process some integers.') # parser.add_argument('command', type=str, help='Command to run') # parser.add_argument('args', nargs=argparse.REMAINDER) # args = parser.parse_args() raise NotImplementedError() if __name__ == "__main__": import sys sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) main() PKPOcloudsync/cs.pyimport threading import logging from typing import Optional, Tuple from .sync import SyncManager, SyncState, Storage from .runnable import Runnable from .event import EventManager from .provider import Provider from .log import TRACE log = logging.getLogger(__name__) class CloudSync(Runnable): def __init__(self, providers: Tuple[Provider, Provider], roots: Tuple[str, str] = None, storage: Optional[Storage] = None, sleep: Optional[Tuple[int, int]] = None, ): if not roots and self.translate == CloudSync.translate: # pylint: disable=comparison-with-callable raise ValueError("Either override the translate() function, or pass in a pair of roots") self.providers = providers self.roots = roots if sleep is None: sleep = (providers[0].default_sleep, providers[1].default_sleep) # The tag for the SyncState will isolate the state of a pair of providers along with the sync roots state = SyncState(storage, tag=self.storage_label()) smgr = SyncManager(state, providers, self.translate, self.resolve_conflict, sleep=sleep) # for tests, make these accessible self.state = state self.smgr = smgr # the label for each event manager will isolate the cursor to the provider/login combo for that side self.emgrs: Tuple[EventManager, EventManager] = ( EventManager(smgr.providers[0], state, 0, roots[0]), 
EventManager(smgr.providers[1], state, 1, roots[1]) ) self.sthread = threading.Thread(target=smgr.run, kwargs={'sleep': 0.1}) self.ethreads = ( threading.Thread(target=self.emgrs[0].run, kwargs={'sleep': sleep[0]}), threading.Thread(target=self.emgrs[1].run, kwargs={'sleep': sleep[1]}) ) log.info("initialized sync: %s", self.storage_label()) def lockattr(k, _v): if k not in self.__dict__: raise AttributeError("%s not an attribute" % k) self.__setattr__ = lockattr @property def aging(self): return self.smgr.aging @aging.setter def aging(self, val): self.smgr.aging = val def storage_label(self): # if you're using a pure translate, and not roots, you don't have to override the storage label # just don't resuse storage for the same pair of providers roots = self.roots or ('?', '?') assert self.providers[0].connection_id is not None assert self.providers[1].connection_id is not None return f"{self.providers[0].name}:{self.providers[0].connection_id}:{roots[0]}."\ f"{self.providers[1].name}:{self.providers[1].connection_id}:{roots[1]}" def walk(self): roots = self.roots or ('/', '/') for index, provider in enumerate(self.providers): for event in provider.walk(roots[index]): self.emgrs[index].process_event(event) def translate(self, index, path): if not self.roots: raise ValueError("Override translate function or provide root paths") relative = self.providers[1-index].is_subpath(self.roots[1-index], path) if not relative: log.log(TRACE, "%s is not subpath of %s", path, self.roots[1-index]) return None return self.providers[index].join(self.roots[index], relative) @staticmethod def resolve_conflict(_f1, _f2): # Input: # - f1 and f2 are file-likes that will block on read, and can possibly pull data from the network, internet, etc # - f1 and f2 also support the .path property to get a relative path to the file # - f1 and f2 also support the .side property # # Return Values: # # - A "merged" file-like which should be used as the data to replace both f1/f2 with # - One of f1 or f2, which is selected as the correct version # - "None", meaning there is no good resolution return None def start(self): self.sthread.start() self.ethreads[0].start() self.ethreads[1].start() def stop(self): log.info("stopping sync: %s", self.storage_label()) self.smgr.stop() self.emgrs[0].stop() self.emgrs[1].stop() # for tests, make this manually runnable def do(self): self.smgr.do() self.emgrs[0].do() self.emgrs[1].do() def done(self): self.smgr.done() self.emgrs[0].done() self.emgrs[1].done() PKPO1 cloudsync/event.pyimport logging from typing import TYPE_CHECKING, Optional from dataclasses import dataclass from .runnable import Runnable from .muxer import Muxer from .types import OType if TYPE_CHECKING: from cloudsync.sync import SyncState from cloudsync import Provider log = logging.getLogger(__name__) @dataclass class Event: otype: OType # fsobject type (DIRECTORY or FILE) oid: str # fsobject id path: Optional[str] # path hash: Optional[bytes] # fsobject hash (better name: ohash) exists: Optional[bool] mtime: Optional[float] = None prior_oid: Optional[str] = None # path basesd systems use this on renames new_cursor: Optional[str] = None class EventManager(Runnable): def __init__(self, provider: "Provider", state: "SyncState", side, walk_root=None): log.debug("provider %s, root %s", provider.name, walk_root) self.provider = provider assert self.provider.connection_id self.label = f"{self.provider.name}:{self.provider.connection_id}" self.events = Muxer(provider.events, restart=True) self.state = state self.side = side 
        self.shutdown = False
        self._cursor_tag = self.label + "_cursor"
        self.cursor = self.state.storage_get_cursor(self._cursor_tag)

        self.walk_root = None
        if not self.cursor:
            self.walk_root = walk_root
            self.cursor = provider.current_cursor
            if self.cursor:
                self.state.storage_update_cursor(self._cursor_tag, self.cursor)
        else:
            log.debug("retrieved existing cursor %s for %s", self.cursor, self.provider.name)
            # TODO!!!!
            provider.current_cursor = self.cursor

    def do(self):
        if self.walk_root:
            log.debug("walking all %s/%s files as events, because no working cursor on startup",
                      self.provider.name, self.walk_root)
            for event in self.provider.walk(self.walk_root):
                self.process_event(event)
            self.walk_root = None

        for event in self.events:
            self.process_event(event)

        current_cursor = self.provider.current_cursor
        if current_cursor != self.cursor:
            self.state.storage_update_cursor(self._cursor_tag, current_cursor)
            self.cursor = current_cursor

    def _drain(self):
        # for tests: discard pending events
        for _ in self.events:
            pass

    def process_event(self, event: Event):
        log.debug("%s got event %s", self.label, event)
        path = event.path
        exists = event.exists
        otype = event.otype

        if not event.path and not self.state.lookup_oid(self.side, event.oid):
            info = self.provider.info_oid(event.oid)

            if info and info.otype != event.otype:
                log.warning("provider %s gave a bad event: %s != %s, using %s",
                            self.provider.name, info.path, event.otype, info.otype)

            if info:
                path = info.path
                otype = info.otype
            else:
                log.debug("ignoring delete of something that can't exist")
                return

        self.state.update(self.side, otype, event.oid, path=path, hash=event.hash,
                          exists=exists, prior_oid=event.prior_oid, provider=self.provider)

    def stop(self):
        self.events.shutdown = True
        self.shutdown = True
        super().stop()


# ===== cloudsync/exceptions.py =====
class CloudException(Exception):
    pass


class CloudFileNotFoundError(CloudException):
    pass


class CloudTemporaryError(CloudException):
    pass


class CloudFileExistsError(CloudException):
    pass


class CloudTokenError(CloudException):
    pass


class CloudDisconnectedError(CloudException):
    pass


# ===== cloudsync/log.py =====
# add a named TRACE level, because libraries need it
import logging

logger = logging.getLogger(__package__)

if isinstance(logging.getLevelName('TRACE'), str):
    logging.addLevelName(5, 'TRACE')

# see docs: getLevelName() returns the numeric level for a name registered via addLevelName
TRACE = logging.getLevelName('TRACE')


# ===== cloudsync/muxer.py =====
import queue
from threading import Lock
from collections import namedtuple


class Muxer:
    Entry = namedtuple('Entry', 'genref listeners lock')

    already = {}
    top_lock = Lock()

    def __init__(self, func, key=None, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()
        self.shutdown = False
        self.key = key or func

        with self.top_lock:
            if self.key not in self.already:
                self.already[self.key] = self.Entry([func()], [], Lock())
            ent = self.already[self.key]
            self.genref = ent.genref
            self.lock = ent.lock
            self.listeners = ent.listeners
            self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            e = self.queue.get_nowait()
        except queue.Empty:
            with self.lock:
                try:
                    e = self.queue.get_nowait()
                except queue.Empty:
                    try:
                        e = next(self.genref[0])
                        for other in self.listeners:
                            if other is not self:
                                other.queue.put(e)
                    except StopIteration:
                        if self.restart and not self.shutdown:
                            self.genref[0] = self.func()
                        raise
        return e

    def __del__(self):
        with self.top_lock:
            try:
                self.listeners.remove(self)
            except ValueError:
                pass
            if not self.listeners and self.key in self.already:
                del self.already[self.key]
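
# --- illustrative usage sketch (added; not part of the original archive) ---
# Muxer fans a single generator out to multiple consumers that share a key:
# whichever consumer drains its queue first advances the shared generator and
# copies each new item into every other listener's queue.  A minimal example,
# assuming only the Muxer class defined above (names below are illustrative):

def _muxer_usage_example():
    def numbers():
        yield from range(3)

    a = Muxer(numbers, key="demo")
    b = Muxer(numbers, key="demo")   # same key -> shares the generator with `a`

    assert list(a) == [0, 1, 2]      # `a` pulls items from the shared generator
    assert list(b) == [0, 1, 2]      # `b` still sees every item via its own queue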
PKOPLLcloudsync/oauth_redir_server.pyimport logging import sys import socket import threading import errno # from src import config from .apiserver import ApiServer # from src.osutil import is_windows # from src.vapiserver import VApiClient log = logging.getLogger(__name__) def is_windows(): return sys.platform in ("win32", "cygwin") class OAuthFlowException(Exception): pass class OAuthBadTokenException(Exception): pass class OAuthRedirServer: PORT_MIN = 52400 PORT_MAX = 52450 GUI_TIMEOUT = 15 def __init__(self, on_success, display_name, use_predefined_ports=False, on_failure=None): self.on_success = on_success self.on_failure = on_failure self.display_name = display_name # self.template = os.path.join(config.get().assetdir, 'oauth_result_template.html') log.debug('Creating oauth redir server') if use_predefined_ports: # Some providers (Dropbox) don't allow us to just use localhost # redirect. For these providers, we define a range of # 127.0.0.1:(PORT_MIN..PORT_MAX) as valid redir URLs for port in range(self.PORT_MIN, self.PORT_MAX): try: if is_windows(): # Windows is dumb and will be weird if we just try to # connect to the port directly. Check to see if the # port is responsive before claiming it as our own try: socket.create_connection(('127.0.0.1', port), 0.001) continue except socket.timeout: pass log.debug('Attempting to start api server on port %d', port) self.api_server = ApiServer('127.0.0.1', port) break except OSError: pass else: self.api_server = ApiServer('127.0.0.1', 0) if not self.api_server: raise OSError(errno.EADDRINUSE, "Unable to open any port in range 52400-52405") self.api_server.add_route('/auth/', self.auth_redir_success, content_type='text/html') self.api_server.add_route('/favicon.ico', lambda x, y: "", content_type='text/html') self.__thread = threading.Thread(target=self.api_server.serve_forever, daemon=True) self.__thread.start() log.info('Listening on %s', self.api_server.uri('/auth/')) def auth_redir_success(self, env, info): log.debug('In auth_redir_success with env=%s, info=%s', env, info) err = "" if info and ('error' in info or 'error_description' in info): err = info['error'] if 'error' in info else \ info['error_description'][0] if isinstance(err, list): err = err[0] if self.on_failure: self.on_failure(err) return self.auth_failure(err) try: self.on_success(auth_dict=info) except OAuthFlowException: log.warning('Got a page request when not in flow', exc_info=True) err = "No pending OAuth. This can happen if you refreshed this tab. 
" except Exception as e: log.exception('Failed to authenticate') err = 'Unknown error: %s' % e return self.auth_failure(err) if err else self.auth_success() def auth_success(self): return self.display_name + ": OAuth Success" def auth_failure(self, msg): return self.display_name + ": OAuth Failure:" + msg def shutdown(self): if self.api_server: self.api_server.shutdown() self.__thread = None def uri(self, *args, **kwargs): return self.api_server.uri(*args, **kwargs) __all__ = ['OAuthFlowException', 'OAuthBadTokenException', 'OAuthRedirServer'] PKPOw cloudsync/provider.pyfrom abc import ABC, abstractmethod import os import re import logging from typing import TYPE_CHECKING, Generator, Optional from cloudsync.types import OInfo, DIRECTORY, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError if TYPE_CHECKING: from cloudsync.event import Event log = logging.getLogger(__name__) class Provider(ABC): # pylint: disable=too-many-public-methods sep: str = '/' # path delimiter alt_sep: str = '\\' # alternate path delimiter oid_is_path: bool = False case_sensitive: bool = True win_paths: bool = False connection_id: Optional[str] = None default_sleep: int = 0.01 @abstractmethod def _api(self, *args, **kwargs): ... def connect(self, creds): # pylint: disable=unused-argument # some providers don't need connections, so just don't implement/overload this method # providers who implement connections need to set the connection_id to a value # that is unique to each connection, so that connecting to this provider # under multiple userid's will produce different connection_id's. One # suggestion is to just set the connection_id to the user's login_id self.connection_id = os.urandom(16).hex() @property @abstractmethod def name(self): ... @property @abstractmethod def latest_cursor(self): ... @property @abstractmethod def current_cursor(self): ... @abstractmethod def events(self) -> Generator["Event", None, None]: ... @abstractmethod def walk(self, path, since=None): # Test that the root path does not show up in the walk ... @abstractmethod def upload(self, oid, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def create(self, path, file_like, metadata=None) -> 'OInfo': ... @abstractmethod def download(self, oid, file_like): ... @abstractmethod def rename(self, oid, path) -> str: # TODO: test that a renamed file can be renamed again # TODO: test that renaming a folder renames the children in the state file ... @abstractmethod def mkdir(self, path) -> str: ... @abstractmethod def delete(self, oid): ... @abstractmethod def exists_oid(self, oid): ... @abstractmethod def exists_path(self, path) -> bool: ... @abstractmethod def listdir(self, oid) -> Generator[DirInfo, None, None]: ... def hash_oid(self, oid) -> Optional[bytes]: # TODO add a test to FNFE info = self.info_oid(oid) return info.hash if info else None @staticmethod @abstractmethod def hash_data(file_like) -> bytes: ... @abstractmethod def info_path(self, path: str) -> Optional[OInfo]: ... @abstractmethod def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: ... 
# CONVENIENCE def download_path(self, path, io): info = self.info_path(path) if not info or not info.oid: raise CloudFileNotFoundError() self.download(info.oid, io) # HELPER @classmethod def join(cls, *paths): res = "" rl = [] for path in paths: if not path or path == cls.sep: continue if isinstance(path, str): rl = rl + [path.strip(cls.sep).strip(cls.alt_sep)] continue for sub_path in path: if sub_path is None or sub_path == cls.sep or sub_path == cls.alt_sep: continue rl = rl + [sub_path.strip(cls.sep)] if not rl: return cls.sep res = cls.sep.join(rl) if not cls.win_paths or res[1] != ':': res = cls.sep + res return res def split(self, path): # todo cache regex index = path.rfind(self.sep) if index == -1 and self.alt_sep: index = path.rfind(self.alt_sep) if index == -1: return path, "" if index == 0: return self.sep, path[index+1:] return path[:index], path[index+1:] def normalize_path(self, path: str): norm_path = path.rstrip(self.sep) if self.sep in ["\\", "/"]: parts = re.split(r'[\\/]+', norm_path) else: parts = re.split(r'[%s]+' % self.sep, norm_path) norm_path = self.join(*parts) if not self.case_sensitive: norm_path = norm_path.lower() return norm_path def is_subpath(self, folder, target, sep=None, alt_sep=None, strict=False): sep = sep or self.sep alt_sep = alt_sep or self.alt_sep if alt_sep: folder = folder.replace(alt_sep, sep) target = target.replace(alt_sep, sep) # Will return True for is-same-path in addition to target folder_full = str(folder) folder_full = folder_full.rstrip(sep) target_full = str(target) target_full = target_full.rstrip(sep) # .lower() instead of normcase because normcase will also mess with separators if not self.case_sensitive: folder_full_case = folder_full.lower() target_full_case = target_full.lower() else: folder_full_case = folder_full target_full_case = target_full # target is same as folder, or target is a subpath (ensuring separator is there for base) if folder_full_case == target_full_case: return False if strict else sep elif len(target_full) > len(folder_full) and target_full[len(folder_full)] == sep: if target_full_case.startswith(folder_full_case): return target_full[len(folder_full):] else: return False return False def replace_path(self, path, from_dir, to_dir): relative = self.is_subpath(from_dir, path) if relative: retval = to_dir + (relative if relative != self.sep else "") return retval if relative != "" else self.sep raise ValueError("replace_path used without subpath") def paths_match(self, patha, pathb): if patha is None and pathb is None: return True elif patha is None or pathb is None: return False return self.normalize_path(patha) == self.normalize_path(pathb) def dirname(self, path: str): ret, _ = self.split(path) return ret def basename(self, path: str): _, ret = self.split(path) return ret def _verify_parent_folder_exists(self, path): parent_path = self.dirname(path) if parent_path != self.sep: parent_obj = self.info_path(parent_path) if parent_obj is None: # perhaps this should separate "FileNotFound" and "non-folder parent exists" # and raise different exceptions raise CloudFileNotFoundError(parent_path) if parent_obj.otype != DIRECTORY: raise CloudFileExistsError(parent_path) def mkdirs(self, path): log.debug("mkdirs %s", path) try: oid = self.mkdir(path) # todo update state except CloudFileExistsError: # todo: mabye CloudFileExistsError needs to have an oid and/or path in it # at least optionally info = self.info_path(path) if info and info.otype == DIRECTORY: oid = info.oid else: raise except CloudFileNotFoundError: 
            ppath, _ = self.split(path)
            if ppath == path:
                raise
            log.debug("mkdirs parent, %s", ppath)
            unused_oid = self.mkdirs(ppath)
            try:
                oid = self.mkdir(path)
                # todo: update state
            except CloudFileNotFoundError:
                # when syncing, "file exists" seems to hit better conflict-handling code for
                # these sorts of situations, but this is a guess.
                # todo: find scenarios that make this happen
                raise CloudFileExistsError("unexpected mkdir failure after creating parents")
        return oid


# ===== cloudsync/runnable.py =====
import time
from abc import ABC, abstractmethod
import threading
import logging

log = logging.getLogger(__name__)


def time_helper(timeout, sleep=None, multiply=1):
    forever = not timeout
    end = forever or time.monotonic() + timeout
    while forever or end >= time.monotonic():
        yield True
        if sleep is not None:
            time.sleep(sleep)
            sleep = sleep * multiply


class Runnable(ABC):
    stopped = False
    wakeup = False

    def run(self, *, timeout=None, until=None, sleep=0.01):
        self.stopped = False
        self.wakeup = False
        endtime = sleep + time.monotonic()
        for _ in time_helper(timeout, sleep=.01):
            while time.monotonic() < endtime and not self.stopped and not self.wakeup:
                time.sleep(max(0, min(.01, endtime - time.monotonic())))
            self.wakeup = False

            if self.stopped:
                break

            try:
                self.do()
            except Exception:
                log.exception("unhandled exception in %s", self.__class__)

            if self.stopped or (until is not None and until()):
                break

            endtime = sleep + time.monotonic()

        if self.stopped:
            self.done()

    def wake(self):
        self.wakeup = True

    @abstractmethod
    def do(self):
        ...

    def stop(self):
        self.stopped = True  # pylint: disable=attribute-defined-outside-init

    def done(self):
        pass


def test_runnable():
    class TestRun(Runnable):
        def __init__(self):
            self.cleaned = False
            self.called = 0

        def do(self):
            self.called += 1

        def done(self):
            self.cleaned = True

    testrun = TestRun()
    testrun.run(timeout=0.02, sleep=0.001)
    assert testrun.called

    testrun.called = 0
    testrun.run(until=lambda: testrun.called == 1)
    assert testrun.called == 1

    thread = threading.Thread(target=testrun.run)
    thread.start()
    testrun.stop()
    thread.join(timeout=1)
    assert testrun.stopped == 1


# ===== cloudsync/scramble.py =====
import random


def scramble(gen, buffer_size):
    buf = []
    i = iter(gen)
    while True:
        try:
            e = next(i)
            buf.append(e)
            if len(buf) >= buffer_size:
                choice = random.randint(0, len(buf) - 1)
                buf[-1], buf[choice] = buf[choice], buf[-1]
                yield buf.pop()
        except StopIteration:
            random.shuffle(buf)
            yield from buf
            return


# ===== cloudsync/types.py =====
from typing import Optional
from enum import Enum
from dataclasses import dataclass


class OType(Enum):
    DIRECTORY = "dir"
    FILE = "file"
    NOTKNOWN = "trashed"


DIRECTORY = OType.DIRECTORY
FILE = OType.FILE
NOTKNOWN = OType.NOTKNOWN  # only allowed for deleted files!
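# Note (added for clarity, not in the original source): the module-level aliases above
# let callers write DIRECTORY / FILE / NOTKNOWN directly instead of OType.DIRECTORY etc.
# NOTKNOWN is reserved for objects that no longer exist (e.g. delete events), where the
# real type can no longer be determined.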
@dataclass class OInfo: otype: OType # fsobject type (DIRECTORY or FILE) oid: str # fsobject id hash: Optional[bytes] # fsobject hash (better name: ohash) path: Optional[str] # path @dataclass class DirInfo(OInfo): name: Optional[str] = None mtime: Optional[float] = None PKO 牀cloudsync/providers/__init__.pyfrom .gdrive import GDriveProvider from ..tests.fixtures.mock_provider import MockProvider from .dropbox import DropboxProvider PKATO۾_wKwKcloudsync/providers/dropbox.pyimport io import os import time import logging import threading from hashlib import sha256 from typing import Generator, Optional import requests import arrow import dropbox from dropbox import Dropbox, exceptions, files from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, \ CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError log = logging.getLogger(__name__) class _FolderIterator: def __init__(self, api, path, *, recursive, cursor=None): self.api = api self.path = path self.ls_res = None if not cursor: self.ls_res = self.api('files_list_folder', path=self.path, recursive=recursive, limit=200) else: self.ls_res = self.api('files_list_folder_continue', cursor) def __iter__(self): return self def __next__(self): if self.ls_res: if not self.ls_res.entries and self.ls_res.has_more: self.ls_res = self.api( 'files_list_folder_continue', self.ls_res.cursor) if self.ls_res and self.ls_res.entries: ret = self.ls_res.entries.pop() ret.cursor = self.ls_res.cursor return ret raise StopIteration() @property def cursor(self): return self.ls_res and self.ls_res.cursor class DropboxProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 _max_simple_upload_size = 15 * 1024 * 1024 _upload_block_size = 10 * 1024 * 1024 name = "Dropbox" def __init__(self): super().__init__() self.__root_id = None self.__cursor = None self.client = None self.api_key = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() @property def connected(self): return self.client is not None def get_quota(self): space_usage = self._api('users_get_space_usage') account = self._api('users_get_current_account') if space_usage.allocation.is_individual(): used = space_usage.used allocated = space_usage.allocation.get_individual().allocated else: team_allocation = space_usage.allocation.get_team() used, allocated = team_allocation.used, team_allocation.allocated res = { 'used': used, 'total': allocated, 'login': account.email, 'uid': account.account_id[len('dbid:'):] } return res def connect(self, creds): log.debug('Connecting to dropbox') if not self.client: self.api_key = creds.get('key', self.api_key) if not self.api_key: raise CloudTokenError() self.client = Dropbox(self.api_key) try: quota = self.get_quota() self.connection_id = quota['login'] except exceptions.AuthError: self.disconnect() raise CloudTokenError() except Exception as e: log.debug("error connecting %s", e) self.disconnect() raise CloudDisconnectedError() def _api(self, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") log.debug("_api: %s (%s %s)", method, args, kwargs) with self.mutex: try: return getattr(self.client, method)(*args, **kwargs) except exceptions.ApiError as e: if isinstance(e.error, (files.ListFolderError, files.GetMetadataError, files.ListRevisionsError)): if 
e.error.is_path() and isinstance(e.error.get_path(), files.LookupError): inside_error: files.LookupError = e.error.get_path() if inside_error.is_malformed_path(): log.debug('Malformed path when executing %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'Malformed path when executing %s(%s)' % (method, kwargs)) if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.DeleteError): if e.error.is_path_lookup(): inside_error: files.LookupError = e.error.get_path_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s)' % (method, kwargs)) if isinstance(e.error, files.RelocationError): if e.error.is_from_lookup(): inside_error: files.LookupError = e.error.get_from_lookup() if inside_error.is_not_found(): log.debug('file not found %s(%s %s) : %s', method, args, kwargs, e) raise CloudFileNotFoundError( 'File not found when executing %s(%s,%s)' % (method, args, kwargs)) if e.error.is_to(): inside_error: files.WriteError = e.error.get_to() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) log.debug("here") if isinstance(e.error, files.CreateFolderError): if e.error.is_path() and isinstance(e.error.get_path(), files.WriteError): inside_error: files.WriteError = e.error.get_path() if inside_error.is_conflict(): raise CloudFileExistsError( 'File already exists when executing %s(%s)' % (method, kwargs)) except (exceptions.InternalServerError, exceptions.RateLimitError, requests.exceptions.ReadTimeout): raise CloudTemporaryError() except dropbox.stone_validators.ValidationError as e: log.debug("f*ed up api error: %s", e) if "never created" in str(e): raise CloudFileNotFoundError() if "did not match" in str(e): log.warning("oid error %s", e) raise CloudFileNotFoundError() raise except requests.exceptions.ConnectionError as e: log.exception('api error handled exception %s:%s', "dropbox", e.__class__.__name__) self.disconnect() raise CloudDisconnectedError() @property def root_id(self): return "" def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('files_list_folder_get_latest_cursor', self.root_id, recursive=True, include_deleted=True, limit=200) if res: return res.cursor else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor def _events(self, cursor, path=None): # pylint: disable=too-many-branches if path and path != "/": info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) oid = info.oid else: oid = self.root_id for res in _FolderIterator(self._api, oid, recursive=True, cursor=cursor): exists = True log.debug("event %s", res) if isinstance(res, files.DeletedMetadata): # dropbox doesn't give you the id that was deleted # we need to get the ids of every revision # then find out which one was the latest before the deletion time # then get the oid for that revs = self._api('files_list_revisions', res.path_lower, limit=10) if revs is None: # dropbox will give a 409 conflict if the revision history was deleted # instead of raising an error, this gets converted to revs==None log.info("revs is none for %s %s", oid, path) continue log.debug("revs %s", revs) deleted_time = revs.server_deleted if 
deleted_time is None: # not really sure why this happens, but this event isn't useful without it log.error("revs %s has no deleted time?", revs) continue latest_time = None for ent in revs.entries: assert ent.server_modified is not None if ent.server_modified <= deleted_time and \ (latest_time is None or ent.server_modified >= latest_time): oid = ent.id latest_time = ent.server_modified if not oid: log.error( "skipping deletion %s, because we don't know the oid", res) continue exists = False otype = None ohash = None elif isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None oid = res.id else: otype = FILE ohash = res.content_hash oid = res.id path = res.path_display event = Event(otype, oid, path, ohash, exists, time.time()) yield event if getattr(res, "cursor", False): self.__cursor = res.cursor def events(self) -> Generator[Event, None, None]: yield from self._events(self.current_cursor) def walk(self, path, since=None): yield from self._events(None, path=path) def listdir(self, oid) -> Generator[DirInfo, None, None]: yield from self._listdir(oid, recursive=False) def _listdir(self, oid, *, recursive) -> Generator[DirInfo, None, None]: info = self.info_oid(oid) for res in _FolderIterator(self._api, oid, recursive=recursive): if isinstance(res, files.DeletedMetadata): continue if isinstance(res, files.FolderMetadata): otype = DIRECTORY ohash = None else: otype = FILE ohash = res.content_hash path = res.path_display oid = res.id relative = self.is_subpath(info.path, path).lstrip("/") if relative: yield DirInfo(otype, oid, ohash, path, name=relative) def create(self, path: str, file_like, metadata=None) -> OInfo: self._verify_parent_folder_exists(path) if self.exists_path(path): raise CloudFileExistsError(path) return self._upload(path, file_like, metadata) def upload(self, oid: str, file_like, metadata=None) -> OInfo: if oid.startswith(self.sep): raise CloudFileNotFoundError("Called upload with a path instead of an OID: %s" % oid) if not self.exists_oid(oid): raise CloudFileNotFoundError(oid) return self._upload(oid, file_like, metadata) def _upload(self, oid, file_like, metadata=None) -> OInfo: res = None metadata = metadata or {} file_like.seek(0, io.SEEK_END) size = file_like.tell() file_like.seek(0) if size < self._max_simple_upload_size: res = self._api('files_upload', file_like.read(), oid, mode=files.WriteMode('overwrite')) else: cursor = None while True: data = file_like.read(self._upload_block_size) if not data: if cursor: local_mtime = arrow.get(metadata.get('mtime', time.time())).datetime commit = files.CommitInfo(path=oid, mode=files.WriteMode.overwrite, autorename=False, client_modified=local_mtime, mute=True) res = self._api( 'files_upload_session_finish', data, cursor, commit ) break if not cursor: self._api('files_upload_session_start', data) cursor = files.UploadSessionCursor( res.session_id, len(data)) else: self._api('files_upload_session_append_v2', data, cursor) cursor.offset += len(data) if res is None: raise CloudFileExistsError() ret = OInfo(otype=FILE, oid=res.id, hash=res.content_hash, path=res.path_display) log.debug('upload result is %s', ret) return ret def download(self, oid, file_like): ok = self._api('files_download', oid) if not ok: raise CloudFileNotFoundError() res, content = ok for data in content.iter_content(self._upload_block_size): file_like.write(data) return OInfo(otype=FILE, oid=oid, hash=res.content_hash, path=res.path_display) def _attempt_rename_folder_over_empty_folder(self, info: OInfo, path: str) -> None: if info.otype != 
DIRECTORY: raise CloudFileExistsError(path) possible_conflict = self.info_path(path) if possible_conflict.otype == DIRECTORY: try: next(self._listdir(possible_conflict.oid, recursive=False)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: pass # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) self._api('files_move_v2', info.oid, path) return else: # conflict is a file, and we already know that the rename is on a folder raise CloudFileExistsError(path) def rename(self, oid, path): try: self._api('files_move_v2', oid, path) except CloudFileExistsError: old_info = self.info_oid(oid) if self.paths_match(old_info.path, path): new_info = self.info_path(path) if oid == new_info.oid and old_info.path != path: temp_path = path + "." + os.urandom(16).hex() self._api('files_move_v2', oid, temp_path) self.rename(oid, path) return oid if old_info.otype == DIRECTORY: self._attempt_rename_folder_over_empty_folder(old_info, path) else: raise return oid @staticmethod def hash_data(file_like) -> bytes: # get a hash from a filelike that's the same as the hash i natively use binstr = b'' while True: data = file_like.read(4 * 1024 * 1024) if not data: break binstr += sha256(data).digest() return sha256(binstr).hexdigest() def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ, unused-argument # TODO: check if a regular filesystem lets you mkdir over a non-empty folder... self._verify_parent_folder_exists(path) if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError() log.debug("Skipped creating already existing folder: %s", path) return info.oid res = self._api('files_create_folder_v2', path) log.debug("dbx mkdir %s", res) res = res.metadata return res.id def delete(self, oid): info = self.info_oid(oid) if not info: return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self._listdir(oid, recursive=False)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files_delete_v2', oid) except CloudFileNotFoundError: # shouldn't happen because we are checking above... 
return def exists_oid(self, oid) -> bool: return bool(self.info_oid(oid)) def info_path(self, path: str) -> Optional[OInfo]: if path == "/": return OInfo(DIRECTORY, "", None, "/") try: log.debug("res info path %s", path) res = self._api('files_get_metadata', path) log.debug("res info path %s", res) oid = res.id if oid[0:3] != 'id:': log.warning("invalid oid %s from path %s", oid, path) if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash return OInfo(otype, oid, fhash, path) except CloudFileNotFoundError: return None def exists_path(self, path) -> bool: return self.info_path(path) is not None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: if oid == "": otype = DIRECTORY fhash = None path = "/" else: try: res = self._api('files_get_metadata', oid) log.debug("res info oid %s", res) path = res.path_display if isinstance(res, files.FolderMetadata): otype = DIRECTORY fhash = None else: otype = FILE fhash = res.content_hash except CloudFileNotFoundError: return None return OInfo(otype, oid, fhash, path) PKQOGLllcloudsync/providers/gdrive.pyimport time import logging import threading import webbrowser import hashlib from ssl import SSLError import json from typing import Generator, Optional import arrow from googleapiclient.discovery import build # pylint: disable=import-error from googleapiclient.errors import HttpError # pylint: disable=import-error from httplib2 import Http, HttpLib2Error from oauth2client import client # pylint: disable=import-error from oauth2client.client import OAuth2WebServerFlow, HttpAccessTokenRefreshError, OAuth2Credentials # pylint: disable=import-error from googleapiclient.http import _should_retry_response # This is necessary because google masks errors from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload # pylint: disable=import-error from cloudsync.oauth_redir_server import OAuthRedirServer from cloudsync import Provider, OInfo, DIRECTORY, FILE, Event, DirInfo from cloudsync.exceptions import CloudTokenError, CloudDisconnectedError, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError class GDriveFileDoneError(Exception): pass log = logging.getLogger(__name__) logging.getLogger('googleapiclient.discovery').setLevel(logging.WARN) class GDriveInfo(DirInfo): # pylint: disable=too-few-public-methods pids = [] def __init__(self, *a, pids=None, **kws): super().__init__(*a, **kws) if pids is None: pids = [] self.pids = pids class GDriveProvider(Provider): # pylint: disable=too-many-public-methods, too-many-instance-attributes case_sensitive = False default_sleep = 15 provider = 'googledrive' name = 'Google Drive' _scope = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.activity.readonly' ] _client_id = '433538542924-ehhkb8jn358qbreg865pejbdpjnm31c0.apps.googleusercontent.com' _client_secret = 'Y-GBVYGO2v5V9cV4WsjMSXDV' _redir = 'urn:ietf:wg:oauth:2.0:oob' _token_uri = 'https://accounts.google.com/o/oauth2/token' _folder_mime_type = 'application/vnd.google-apps.folder' _io_mime_type = 'application/octet-stream' def __init__(self): super().__init__() self.__root_id = None self.__cursor = None self.client = None self.api_key = None self.refresh_token = None self.user_agent = 'cloudsync/1.0' self.mutex = threading.Lock() self._ids = {"/": "root"} self._trashed_ids = {} self._flow = None self._redir_server: Optional[OAuthRedirServer] = None self._oauth_done = threading.Event() @property def connected(self): return self.client is not 
None def get_display_name(self): return self.name def initialize(self, localhost_redir=True): if localhost_redir: try: if not self._redir_server: self._redir_server = OAuthRedirServer(self._on_oauth_success, self.get_display_name(), use_predefined_ports=False, on_failure=self._on_oauth_failure) self._flow = OAuth2WebServerFlow(client_id=self._client_id, client_secret=self._client_secret, scope=self._scope, redirect_uri=self._redir_server.uri('/auth/')) except OSError: log.exception('Unable to use redir server. Falling back to manual mode') localhost_redir = False if not localhost_redir: self._flow = OAuth2WebServerFlow(client_id=self._client_id, client_secret=self._client_secret, scope=self._scope, redirect_uri=self._redir) url = self._flow.step1_get_authorize_url() self._oauth_done.clear() webbrowser.open(url) return localhost_redir, url def _on_oauth_success(self, auth_dict): auth_string = auth_dict['code'][0] try: res: OAuth2Credentials = self._flow.step2_exchange(auth_string) self.refresh_token = res.refresh_token self.api_key = res.access_token self._oauth_done.set() except Exception: log.exception('Authentication failed') raise def _on_oauth_failure(self, err): log.error("oauth failure: %s", err) self._oauth_done.set() def authenticate(self): try: self.initialize() self._oauth_done.wait() return {"refresh_token": self.refresh_token, "api_key": self.api_key, "client_secret": self._client_secret, "client_id": self._client_id, } finally: if self._redir_server: self._redir_server.shutdown() self._redir_server = None def get_quota(self): # https://developers.google.com/drive/api/v3/reference/about res = self._api('about', 'get', fields='storageQuota,user') quota = res['storageQuota'] user = res['user'] usage = int(quota['usage']) if 'limit' in quota and quota['limit']: limit = int(quota['limit']) else: # It is possible for an account to have unlimited space - pretend it's 1TB limit = 1024 * 1024 * 1024 * 1024 res = { 'used': usage, 'total': limit, 'login': user['emailAddress'], 'uid': user['permissionId'] } return res def connect(self, creds): log.debug('Connecting to googledrive') if not self.client: api_key = creds.get('api_key', self.api_key) refresh_token = creds.get('refresh_token', self.refresh_token) if not refresh_token: new_creds = self.authenticate() api_key = new_creds.get('api_key', None) refresh_token = new_creds.get('refresh_token', None) kwargs = {} try: with self.mutex: creds = client.GoogleCredentials(access_token=api_key, client_id=creds.get( 'client_id', self._client_id), client_secret=creds.get( 'client_secret', self._client_secret), refresh_token=refresh_token, token_expiry=None, token_uri=self._token_uri, user_agent=self.user_agent) creds.refresh(Http()) self.client = build( 'drive', 'v3', http=creds.authorize(Http())) kwargs['api_key'] = creds.access_token if getattr(creds, 'refresh_token', None): refresh_token = creds.refresh_token self.refresh_token = refresh_token self.api_key = api_key quota = None try: quota = self.get_quota() except SSLError: # pragma: no cover # Seeing some intermittent SSL failures that resolve on retry log.warning('Retrying intermittent SSLError') quota = self.get_quota() self.connection_id = quota['login'] except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() return self.client @staticmethod def _get_reason_from_http_error(e): # gets a default something (actually the message, not the reason) using their secret interface reason = e._get_reason() # pylint: disable=protected-access # parses the JSON of the content to 
get the reason from where it really lives in the content try: # this code was copied from googleapiclient/http.py:_should_retry_response() data = json.loads(e.content.decode('utf-8')) if isinstance(data, dict): reason = data['error']['errors'][0]['reason'] else: reason = data[0]['error']['errors']['reason'] except (UnicodeDecodeError, ValueError, KeyError): log.warning('Invalid JSON content from response: %s', e.content) return reason def _api(self, resource, method, *args, **kwargs): # pylint: disable=arguments-differ, too-many-branches, too-many-statements if not self.client: raise CloudDisconnectedError("currently disconnected") with self.mutex: try: if resource == 'media': res = args[0] args = args[1:] else: res = getattr(self.client, resource)() meth = getattr(res, method)(*args, **kwargs) if resource == 'media' or (resource == 'files' and method == 'get_media'): ret = meth else: ret = meth.execute() log.debug("api: %s %s (%s) -> %s", method, args, kwargs, ret) return ret except HttpAccessTokenRefreshError: self.disconnect() raise CloudTokenError() except SSLError as e: if "WRONG_VERSION" in str(e): # httplib2 used by google's api gives this weird error for no discernable reason raise CloudTemporaryError(str(e)) raise except HttpError as e: if str(e.resp.status) == '416': raise GDriveFileDoneError() if str(e.resp.status) == '404': raise CloudFileNotFoundError('File not found when executing %s.%s(%s)' % ( resource, method, kwargs )) reason = self._get_reason_from_http_error(e) if str(e.resp.status) == '403' and str(reason) == 'parentNotAFolder': raise CloudFileExistsError("Parent Not A Folder") if (str(e.resp.status) == '403' and reason in ('userRateLimitExceeded', 'rateLimitExceeded', )) \ or str(e.resp.status) == '429': raise CloudTemporaryError("rate limit hit") # At this point, _should_retry_response() returns true for error codes >=500, 429, and 403 with # the reason 'userRateLimitExceeded' or 'rateLimitExceeded'. 403 without content, or any other # response is not retried. We have already taken care of some of those cases above, but we call this # below to catch the rest, and in case they improve their library with more conditions. If we called # meth.execute() above with a num_retries argument, all this retrying would happen in the google api # library, and we wouldn't have to think about retries. 
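                # For illustration (an alternative not taken here, per the comment above):
                #     ret = meth.execute(num_retries=5)
                # would delegate retry/backoff for retryable responses to googleapiclient itself.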
should_retry = _should_retry_response(e.resp.status, e.content) if should_retry: raise CloudTemporaryError("unknown error %s" % e) raise except (TimeoutError, HttpLib2Error): self.disconnect() raise CloudDisconnectedError("disconnected on timeout") @property def root_id(self): if not self.__root_id: res = self._api('files', 'get', fileId='root', fields='id', ) self.__root_id = res['id'] self._ids['/'] = self.__root_id return self.__root_id def disconnect(self): self.client = None @property def latest_cursor(self): res = self._api('changes', 'getStartPageToken') if res: return res.get('startPageToken') else: return None @property def current_cursor(self): if not self.__cursor: self.__cursor = self.latest_cursor return self.__cursor def events(self) -> Generator[Event, None, None]: # pylint: disable=too-many-locals, too-many-branches page_token = self.current_cursor while page_token is not None: # log.debug("looking for events, timeout: %s", timeout) response = self._api('changes', 'list', pageToken=page_token, spaces='drive', includeRemoved=True, includeItemsFromAllDrives=True, supportsAllDrives=True) new_cursor = response.get('newStartPageToken', None) for change in response.get('changes'): log.debug("got event %s", change) # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T16:57:06.779Z', # 'removed': False, 'fileId': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'file': {'kind': 'drive#file', # 'id': '1NCi2j1SjsPUTQTtaD2dFNsrt49J8TPDd', 'name': 'dest', 'mimeType': 'application/octet-stream'}} # {'kind': 'drive#change', 'type': 'file', 'changeType': 'file', 'time': '2019-07-23T20:02:14.156Z', # 'removed': True, 'fileId': '1lhRe0nDplA6I5JS18642rg0KIbYN66lR'} ts = arrow.get(change.get('time')).float_timestamp oid = change.get('fileId') exists = not change.get('removed') fil = change.get('file') if fil: if fil.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE else: otype = None ohash = None path = self._path_oid(oid, use_cache=False) event = Event(otype, oid, path, ohash, exists, ts, new_cursor=new_cursor) remove = [] for cpath, coid in self._ids.items(): if coid == oid: if cpath != path: remove.append(cpath) if path and otype == DIRECTORY and self.is_subpath(path, cpath): remove.append(cpath) for r in remove: self._ids.pop(r, None) if path: self._ids[path] = oid log.debug("converted event %s as %s", change, event) yield event if new_cursor and page_token and new_cursor != page_token: self.__cursor = new_cursor page_token = response.get('nextPageToken') def _walk(self, path, oid): for ent in self.listdir(oid): current_path = self.join(path, ent.name) event = Event(otype=ent.otype, oid=ent.oid, path=current_path, hash=ent.hash, exists=True, mtime=time.time()) log.debug("walk %s", event) yield event if ent.otype == DIRECTORY: if self.exists_oid(ent.oid): yield from self._walk(current_path, ent.oid) def walk(self, path, since=None): info = self.info_path(path) if not info: raise CloudFileNotFoundError(path) yield from self._walk(path, info.oid) def __prep_upload(self, path, metadata): # modification time mtime = metadata.get("modifiedTime", time.time()) mtime = arrow.get(mtime).isoformat() gdrive_info = { 'modifiedTime': mtime } # mime type, if provided mime_type = metadata.get("mimeType", None) if mime_type: gdrive_info['mimeType'] = mime_type # path, if provided if path: _, name = self.split(path) gdrive_info['name'] = name # misc properties, if provided app_props = metadata.get("appProperties", None) if app_props: # caller can specify 
google-specific stuff, if desired gdrive_info['appProperties'] = app_props # misc properties, if provided app_props = metadata.get("properties", None) if app_props: # caller can specify google-specific stuff, if desired gdrive_info['properties'] = app_props log.debug("info %s", gdrive_info) return gdrive_info def upload(self, oid, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(None, metadata) ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' res = self._api('files', 'update', body=gdrive_info, fileId=oid, media_body=ul, fields=fields) log.debug("response from upload %s", res) if not res: raise CloudTemporaryError("unknown response from drive on upload") md5 = res.get('md5Checksum', None) # can be none if the user tries to upload to a folder if md5 is None: possible_conflict = self._info_oid(oid) if possible_conflict and possible_conflict.otype == DIRECTORY: raise CloudFileExistsError("Can only upload to a file: %s" % possible_conflict.path) return OInfo(otype=FILE, oid=res['id'], hash=md5, path=None) def create(self, path, file_like, metadata=None) -> 'OInfo': if not metadata: metadata = {} gdrive_info = self.__prep_upload(path, metadata) if self.exists_path(path): raise CloudFileExistsError() ul = MediaIoBaseUpload(file_like, mimetype=self._io_mime_type, chunksize=4 * 1024 * 1024) fields = 'id, md5Checksum' parent_oid = self.get_parent_id(path) gdrive_info['parents'] = [parent_oid] res = self._api('files', 'create', body=gdrive_info, media_body=ul, fields=fields) log.debug("response from create %s : %s", path, res) if not res: raise CloudTemporaryError("unknown response from drive on upload") self._ids[path] = res['id'] log.debug("path cache %s", self._ids) return OInfo(otype=FILE, oid=res['id'], hash=res['md5Checksum'], path=path) def download(self, oid, file_like): req = self._api('files', 'get_media', fileId=oid) dl = MediaIoBaseDownload(file_like, req, chunksize=4 * 1024 * 1024) done = False while not done: try: _, done = self._api('media', 'next_chunk', dl) except GDriveFileDoneError: done = True def rename(self, oid, path): # pylint: disable=too-many-locals, too-many-branches pid = self.get_parent_id(path) add_pids = [pid] if pid == 'root': add_pids = [self.root_id] info = self._info_oid(oid) if info is None: raise CloudFileNotFoundError(oid) remove_pids = info.pids old_path = info.path _, name = self.split(path) body = {'name': name} if self.exists_path(path): possible_conflict = self.info_path(path) if FILE in (info.otype, possible_conflict.otype): if possible_conflict.oid != oid: # it's OK to rename a file over itself, frex, to change case raise CloudFileExistsError(path) else: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError("Cannot rename over non-empty folder %s" % path) except StopIteration: # Folder is empty, rename over it no problem self.delete(possible_conflict.oid) if not old_path: for cpath, coid in list(self._ids.items()): if coid == oid: old_path = cpath self._api('files', 'update', body=body, fileId=oid, addParents=add_pids, removeParents=remove_pids, fields='id') for cpath, coid in list(self._ids.items()): relative = self.is_subpath(old_path, cpath) if relative: new_cpath = self.join(path, relative) self._ids.pop(cpath) self._ids[new_cpath] = coid log.debug("renamed %s -> %s", oid, body) return oid def listdir(self, oid) -> Generator[GDriveInfo, None, None]: query = f"'{oid}' in parents" try: res = self._api('files', 
'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, name, mimeType, trashed)', pageToken=None) except CloudFileNotFoundError: if self._info_oid(oid): return log.debug("listdir oid gone %s", oid) raise if not res or not res['files']: if self.exists_oid(oid): return raise CloudFileNotFoundError(oid) log.debug("listdir got res %s", res) for ent in res['files']: fid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent['name'] trashed = ent.get('trashed', False) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE if not trashed: yield GDriveInfo(otype, fid, fhash, None, pids=pids, name=name) def mkdir(self, path, metadata=None) -> str: # pylint: disable=arguments-differ if self.exists_path(path): info = self.info_path(path) if info.otype == FILE: raise CloudFileExistsError(path) log.debug("Skipped creating already existing folder: %s", path) return info.oid pid = self.get_parent_id(path) _, name = self.split(path) file_metadata = { 'name': name, 'parents': [pid], 'mimeType': self._folder_mime_type, } if metadata: file_metadata.update(metadata) res = self._api('files', 'create', body=file_metadata, fields='id') fileid = res.get('id') self._ids[path] = fileid return fileid def delete(self, oid): info = self.info_oid(oid) if not info: log.debug("deleted non-existing oid %s", oid) return # file doesn't exist already... if info.otype == DIRECTORY: try: next(self.listdir(oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, info.path)) except StopIteration: pass # Folder is empty, delete it no problem try: self._api('files', 'delete', fileId=oid) except CloudFileNotFoundError: log.debug("deleted non-existing oid %s", oid) for currpath, curroid in list(self._ids.items()): if curroid == oid: self._trashed_ids[currpath] = self._ids[currpath] del self._ids[currpath] def exists_oid(self, oid): return self._info_oid(oid) is not None def info_path(self, path: str) -> Optional[OInfo]: if path == "/": return self.info_oid(self.root_id) try: parent_id = self.get_parent_id(path) _, name = self.split(path) query = f"'{parent_id}' in parents and name='{name}'" res = self._api('files', 'list', q=query, spaces='drive', fields='files(id, md5Checksum, parents, mimeType, trashed, name)', pageToken=None) except CloudFileNotFoundError: return None if not res['files']: return None if res.get('trashed'): return None ent = res['files'][0] log.debug("res is %s", res) oid = ent['id'] pids = ent['parents'] fhash = ent.get('md5Checksum') name = ent.get('name') # query is insensitive to certain features of the name # ....cache correct basename path = self.join(self.dirname(path), name) if ent.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE self._ids[path] = oid return GDriveInfo(otype, oid, fhash, path, pids=pids) def exists_path(self, path) -> bool: if path in self._ids: return True return self.info_path(path) is not None def get_parent_id(self, path): if not path: return None parent, _ = self.split(path) if parent == path: return self._ids.get(parent) if not self.exists_path(parent): raise CloudFileNotFoundError("parent %s must exist" % parent) return self._ids[parent] def _path_oid(self, oid, info=None, use_cache=True) -> Optional[str]: """convert oid to path""" if use_cache: for p, pid in self._ids.items(): if pid == oid: return p for p, pid in self._trashed_ids.items(): if pid == oid: return p # todo, better cache, keep up to date, etc. 
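# _path_oid below resolves an oid to a path by looking up the entry's first
# parent id, resolving that parent's path recursively, joining it with the
# entry's name, and caching the result in self._ids. A minimal standalone
# sketch of the same idea, assuming a hypothetical lookup(oid) callable that
# returns (name, parent_oid_or_None) -- names here are illustrative, not part
# of this module:
def _sketch_path_of(oid, lookup, cache=None):
    cache = {} if cache is None else cache
    if oid in cache:
        return cache[oid]
    name, parent = lookup(oid)
    if parent is None:
        path = "/"  # a parentless entry is treated as the root
    else:
        path = _sketch_path_of(parent, lookup, cache).rstrip("/") + "/" + name
    cache[oid] = path
    return path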
if not info: info = self._info_oid(oid) if info and info.pids and info.name: ppath = self._path_oid(info.pids[0]) if ppath: path = self.join(ppath, info.name) self._ids[path] = oid return path return None def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: info = self._info_oid(oid) if info is None: return None # expensive path = self._path_oid(oid, info, use_cache=use_cache) ret = OInfo(info.otype, info.oid, info.hash, path) log.debug("info oid ret: %s", ret) return ret @staticmethod def hash_data(file_like) -> bytes: # get a hash from a filelike that's the same as the hash i natively use md5 = hashlib.md5() for c in iter(lambda: file_like.read(32768), b''): md5.update(c) return md5.hexdigest() def _info_oid(self, oid) -> Optional[GDriveInfo]: try: res = self._api('files', 'get', fileId=oid, fields='name, md5Checksum, parents, mimeType, trashed', ) except CloudFileNotFoundError: return None log.debug("info oid %s", res) if res.get('trashed'): return None pids = res.get('parents') fhash = res.get('md5Checksum') name = res.get('name') if res.get('mimeType') == self._folder_mime_type: otype = DIRECTORY else: otype = FILE return GDriveInfo(otype, oid, fhash, None, pids=pids, name=name) PKO_lcloudsync/sync/__init__.py__all__ = ['SyncManager', 'SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] from .manager import * from .state import * PKPOg$3~~cloudsync/sync/manager.pyimport os import logging import tempfile import shutil import time from typing import Tuple, Optional from cloudsync.provider import Provider __all__ = ['SyncManager'] from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError, CloudTemporaryError from cloudsync.types import DIRECTORY, FILE from cloudsync.runnable import Runnable from cloudsync.log import TRACE from .state import SyncState, SyncEntry, SideState, TRASHED, EXISTS, LOCAL, REMOTE from .util import debug_sig log = logging.getLogger(__name__) # useful for converting oids and pointer nubmers into digestible nonces FINISHED = 1 REQUEUE = 0 def other_side(index): return 1-index class ResolveFile(): def __init__(self, info, provider): self.info = info self.provider = provider self.path = info.path self.side = info.side self.otype = info.otype self.temp_file = info.temp_file if self.otype == FILE: assert info.temp_file self.__fh = None @property def fh(self): if not self.__fh: if not os.path.exists(self.info.temp_file): try: with open(self.info.temp_file, "wb") as f: self.provider.download(self.info.oid, f) except Exception as e: log.debug("error downloading %s", e) try: os.unlink(self.info.temp_file) except FileNotFoundError: pass raise self.__fh = open(self.info.temp_file, "rb") return self.__fh def read(self, *a): return self.fh.read(*a) def write(self, buf): return self.fh.write(buf) def close(self): return self.fh.close() def seek(self, *a): return self.fh.seek(*a) class SyncManager(Runnable): # pylint: disable=too-many-public-methods, too-many-instance-attributes def __init__(self, state, providers: Tuple[Provider, Provider], translate, resolve_conflict, sleep=None): self.state: SyncState = state self.providers: Tuple[Provider, Provider] = providers self.translate = translate self.__resolve_conflict = resolve_conflict self.tempdir = tempfile.mkdtemp(suffix=".cloudsync") if not sleep: # these are the event sleeps, but really we need more info than this sleep = (self.providers[LOCAL].default_sleep, self.providers[REMOTE].default_sleep) self.sleep = sleep # TODO: we need sync_aging, backoff_min, backoff_max, 
backoff_mult documented with an interface and tests! #### self.min_backoff = 0 self.max_backoff = 0 self.backoff = 0 max_sleep = max(sleep) # on sync fail, use the worst time for backoff self.aging = max_sleep / 5 # how long before even trying to sync self.min_backoff = max_sleep / 10.0 # event sleep of 15 seconds == 1.5 second backoff on failures self.max_backoff = max_sleep * 4.0 # escalating up to a 1 minute wait time self.backoff = self.min_backoff self.mult_backoff = 2 assert len(self.providers) == 2 def set_resolver(self, resolver): self.__resolve_conflict = resolver def do(self): sync: SyncEntry = self.state.change(self.aging) if sync: try: self.sync(sync) self.state.storage_update(sync) self.backoff = self.min_backoff except CloudTemporaryError as e: log.error( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted) sync.punt() time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) except Exception as e: log.exception( "exception %s[%s] while processing %s, %i", type(e), e, sync, sync.punted, stack_info=True) sync.punt() time.sleep(self.backoff) self.backoff = min(self.backoff * self.mult_backoff, self.max_backoff) def done(self): log.info("cleanup %s", self.tempdir) shutil.rmtree(self.tempdir) def get_latest_state(self, ent): log.log(TRACE, "before update state %s", ent) for i in (LOCAL, REMOTE): if not ent[i].changed: continue info = self.providers[i].info_oid(ent[i].oid, use_cache=False) if not info: continue ent[i].exists = EXISTS ent[i].hash = info.hash ent[i].otype = info.otype if ent[i].otype == FILE: if ent[i].hash is None: ent[i].hash = self.providers[i].hash_oid(ent[i].oid) if ent[i].exists == EXISTS: assert ent[i].hash is not None, "Cannot sync if hash is None" if ent[i].path != info.path: self.update_entry(ent, oid=ent[i].oid, side=i, path=info.path) log.log(TRACE, "after update state %s", ent) def path_conflict(self, ent): if ent[0].path and ent[1].path: return not self.providers[0].paths_match(ent[0].path, ent[0].sync_path) and \ not self.providers[1].paths_match(ent[1].path, ent[1].sync_path) return False def sync(self, sync): self.get_latest_state(sync) if sync.hash_conflict(): self.handle_hash_conflict(sync) return if self.path_conflict(sync): self.handle_path_conflict(sync) return log.log(TRACE, "table\r\n%s", self.state.pretty_print()) for i in (LOCAL, REMOTE): if sync[i].changed: response = self.embrace_change(sync, i, other_side(i)) if response == FINISHED: self.finished(i, sync) break def temp_file(self): # prefer big random name over NamedTemp which can infinite loop in odd situations! 
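# do() above retries failed syncs with exponential backoff: the delay starts at
# min_backoff, is multiplied by mult_backoff after each CloudTemporaryError or
# unexpected exception, is capped at max_backoff, and resets to min_backoff once
# a sync succeeds. The same policy in isolation (a sketch; the helper name is
# illustrative, not part of this module):
def _sketch_next_backoff(current, maximum, multiplier=2):
    """Delay to use after one more failure."""
    return min(current * multiplier, maximum)
# e.g. starting from 1.5s with a 60s cap: 3.0, 6.0, 12.0, 24.0, 48.0, 60.0, 60.0, ...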
ret = os.path.join(self.tempdir, os.urandom(16).hex()) log.debug("tempdir %s -> %s", self.tempdir, ret) return ret def finished(self, side, sync): sync[side].changed = 0 self.state.finished(sync) self.clean_temps(sync) @staticmethod def clean_temps(sync): # todo: move this to the sync obj for side in (LOCAL, REMOTE): if sync[side].temp_file: try: os.unlink(sync[side].temp_file) except FileNotFoundError: pass except OSError as e: log.debug("exception unlinking %s", e) except Exception as e: # any exceptions here are pointless log.warning("exception unlinking %s", e) sync[side].temp_file = None def download_changed(self, changed, sync): sync[changed].temp_file = sync[changed].temp_file or self.temp_file() assert sync[changed].oid if os.path.exists(sync[changed].temp_file): return True try: partial_temp = sync[changed].temp_file + ".tmp" log.debug("%s download %s to %s", self.providers[changed], sync[changed].oid, partial_temp) with open(partial_temp, "wb") as f: self.providers[changed].download(sync[changed].oid, f) os.rename(partial_temp, sync[changed].temp_file) return True except PermissionError as e: raise CloudTemporaryError("download or rename exception %s" % e) except CloudFileNotFoundError: log.debug("download from %s failed fnf, switch to not exists", self.providers[changed].name) sync[changed].exists = TRASHED return False def mkdir_synced(self, changed, sync, translated_path): synced = other_side(changed) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] if ents: for ent in ents: if ent[changed].otype == DIRECTORY: # these we can toss, they are other folders # keep the current one, since it exists for sure log.debug("discard %s", ent) self.discard_entry(ent) ents = [ent for ent in ents if not ent.discarded] ents = [ent for ent in ents if TRASHED not in ( ent[changed].exists, ent[synced].exists)] if ents: if not sync.punted: sync.punt() return REQUEUE raise NotImplementedError( "What to do if we create a folder when there's already a FILE") try: log.debug("translated %s as path %s", sync[changed].path, translated_path) # could have made a dir that already existed on my side or other side chents = list(self.state.lookup_path(changed, sync[changed].path)) syents = list(self.state.lookup_path(synced, translated_path)) notme_chents = [ent for ent in chents if ent != sync] conflicts = [] for ent in notme_chents: # dup dirs on remote side can be ignored if ent[synced].otype == DIRECTORY: log.debug("discard duplicate dir entry, caused by a mkdirs %s", ent) self.discard_entry(ent) else: conflicts.append(ent) # if a file exists with the same name on the sync side conflicts = [ent for ent in syents if ent[synced].exists != TRASHED and ent != sync] # TODO: check for a cycle here. If there is a cycle this will never sync up. see below comment for more info if conflicts: log.info("mkdir conflict %s letting other side handle it", sync) return FINISHED # make the dir oid = self.providers[synced].mkdirs(translated_path) log.debug("mkdir %s as path %s oid %s", self.providers[synced].name, translated_path, debug_sig(oid)) # did i already have that oid? 
if so, chuck it already_dir = self.state.lookup_oid(synced, oid) if already_dir and already_dir != sync and already_dir[synced].otype == DIRECTORY: log.debug("discard %s", already_dir) self.discard_entry(already_dir) sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=oid, path=translated_path) return FINISHED except CloudFileNotFoundError: if not sync.punted: sync.punt() return REQUEUE log.debug("mkdir %s : %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].name, translated_path) raise NotImplementedError("TODO mkdir, and make state etc") def upload_synced(self, changed, sync): assert sync[changed].temp_file synced = other_side(changed) try: info = self.providers[synced].upload( sync[synced].oid, open(sync[changed].temp_file, "rb")) log.debug("upload to %s as path %s", self.providers[synced].name, sync[synced].sync_path) sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = sync[synced].path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry( sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path) except CloudFileNotFoundError: log.debug("upload to %s failed fnf, TODO fix mkdir code and stuff", self.providers[synced].name) raise NotImplementedError("TODO mkdir, and make state etc") except CloudFileExistsError: # this happens if the remote oid is a folder log.debug("split bc upload to folder") defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync, self.providers) self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def _create_synced(self, changed, sync, translated_path): synced = other_side(changed) log.debug("create on %s as path %s", self.providers[synced].name, translated_path) try: info = self.providers[synced].create( translated_path, open(sync[changed].temp_file, "rb")) log.debug("created %s", info) except CloudFileExistsError: info = self.providers[synced].info_path(translated_path) if not info: raise with open(sync[changed].temp_file, "rb") as f: existing_hash = self.providers[synced].hash_data(f) if existing_hash != info.hash: raise log.debug("use existing %s", info) sync[synced].sync_hash = info.hash if info.path: sync[synced].sync_path = info.path else: sync[synced].sync_path = translated_path sync[changed].sync_hash = sync[changed].hash sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, exists=True, oid=info.oid, path=sync[synced].sync_path, hash=info.hash) def update_entry(self, ent, side, oid, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin # updates entry without marking as changed unless explicit # used internally self.state.update_entry(ent, side, oid, path=path, hash=hash, exists=exists, changed=changed, otype=otype, provider=self.providers[side]) def change_state(self, side, otype, oid, *, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin # looks up oid and changes state, marking changed as if it's an event # used only for testing self.state.update(side, otype, oid, path=path, hash=hash, exists=exists, prior_oid=prior_oid, provider=self.providers[side]) def create_synced(self, changed, sync, translated_path): synced = other_side(changed) try: self._create_synced(changed, sync, translated_path) return FINISHED except CloudFileNotFoundError: # parent presumably exists parent = 
self.providers[changed].dirname(sync[changed].path) log.debug("make %s first before %s", parent, sync[changed].path) if not self.state.lookup_path(changed, parent): info = self.providers[changed].info_path(parent) if info: self.state.update(changed, DIRECTORY, info.oid, self.providers[changed], path=parent) sync.punt() return REQUEUE except CloudFileExistsError: # there's a file or folder in the way, let that resolve if possible log.debug("can't create %s, try punting", translated_path) if sync.punted > 0: info = self.providers[synced].info_path(translated_path) if not info: log.debug("got a file exists, and then it didn't exist %s", sync) sync.punt() return REQUEUE sync[synced].oid = info.oid sync[synced].hash = info.hash sync[synced].path = translated_path self.update_entry(sync, synced, info.oid, path=translated_path) # maybe it's a hash conflict sync.punt() return REQUEUE else: # maybe it's a name conflict sync.punt() return REQUEUE def __resolve_file_likes(self, side_states): fhs = [] for ss in side_states: assert type(ss) is SideState if not ss.temp_file and ss.otype == FILE: ss.temp_file = self.temp_file() fhs.append(ResolveFile(ss, self.providers[ss.side])) assert ss.oid return fhs def __safe_call_resolver(self, fhs): if DIRECTORY in (fhs[0].otype, fhs[1].otype): if fhs[0].otype != fhs[1].otype: if fhs[0].otype == DIRECTORY: fh = fhs[0] else: fh = fhs[1] # for simplicity: directory conflicted with file, always favors directory return (fh, True) is_file_like = lambda f: hasattr(f, "read") and hasattr(f, "close") ret = None try: ret = self.__resolve_conflict(*fhs) if ret: if len(ret) != 2: log.error("bad return value for resolve conflict %s", ret) ret = None if not is_file_like(ret[0]): log.error("bad return value for resolve conflict %s", ret) ret = None except Exception as e: log.exception("exception during conflict resolution %s", e) if ret is None: # we defer to the remote... 
since this can prevent loops if fhs[0].side == REMOTE: ret = (fhs[0], True) else: ret = (fhs[1], True) return ret def __resolver_merge_upload(self, side_states, fh, keep): # we modified to both sides, need to merge the entries ent1, ent2 = side_states defer_ent = self.state.lookup_oid(ent1.side, ent1.oid) replace_ent = self.state.lookup_oid(ent2.side, ent2.oid) if keep: # both sides are being kept, so we have to upload since there are no entries fh.seek(0) info1 = self.providers[ent1.side].create(ent1.path, fh) fh.seek(0) info2 = self.providers[ent2.side].create(ent2.path, fh) ent1.oid = info1.oid ent2.oid = info2.oid self.update_entry(defer_ent, ent1.side, ent1.oid, path=ent1.path, hash=ent1.hash) ent1 = defer_ent[ent1.side] ent2 = defer_ent[ent2.side] ent2.sync_hash = info2.hash ent2.sync_path = info2.path ent1.sync_hash = info1.hash ent1.sync_path = info1.path # in case oids have changed self.update_entry(defer_ent, ent2.side, ent2.oid, path=ent2.path, hash=ent2.hash) defer_ent[ent2.side].sync_hash = ent2.sync_hash defer_ent[ent2.side].sync_path = ent2.sync_path self.discard_entry(replace_ent) def resolve_conflict(self, side_states): # pylint: disable=too-many-statements fhs = self.__resolve_file_likes(side_states) fh, keep = self.__safe_call_resolver(fhs) log.debug("got ret side %s", getattr(fh, "side", None)) defer = None for i, rfh in enumerate(fhs): this = side_states[i] that = side_states[1-i] if fh is not rfh: # user didn't opt to keep my rfh log.debug("replacing %s", this.side) if not keep: fh.seek(0) info2 = self.providers[this.side].upload(this.oid, fh) this.hash = info2.hash that.sync_hash = that.hash that.sync_path = that.path this.sync_hash = this.hash this.sync_path = this.path else: try: self._resolve_rename(this) except CloudFileNotFoundError: log.debug("there is no conflict, because the file doesn't exist? %s", this) if defer is None: defer = that.side else: defer = None if defer is not None: # toss the other side that was replaced sorted_states = sorted(side_states, key=lambda e: e.side) replace_side = other_side(defer) replace_ent = self.state.lookup_oid(replace_side, sorted_states[replace_side].oid) self.discard_entry(replace_ent) else: # both sides were modified.... 
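# The conflict resolver handed to SyncManager is called with two file-like
# ResolveFile arguments and is expected to return a (file_like, keep) pair:
# keep=True renames the losing side to a .conflicted name, keep=False replaces
# it with the winner's contents, and a bad return value or an exception makes
# __safe_call_resolver fall back to deferring to the remote side. A minimal
# resolver that keeps whichever file is larger (a sketch only; not the
# library's built-in behavior):
def _sketch_keep_larger(f1, f2):
    data1, data2 = f1.read(), f2.read()
    winner = f1 if len(data1) >= len(data2) else f2
    winner.seek(0)
    return winner, False  # False: overwrite the loser instead of keeping both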
self.__resolver_merge_upload(side_states, fh, keep) log.debug("RESOLVED CONFLICT: %s dide: %s", side_states, defer) log.debug("table\r\n%s", self.state.pretty_print()) def delete_synced(self, sync, changed, synced): log.debug("try sync deleted %s", sync[changed].path) # see if there are other entries for the same path, but other ids ents = list(self.state.lookup_path(changed, sync[changed].path)) ents = [ent for ent in ents if ent != sync] # ents2 = list(self.state.lookup_path(synced, sync[synced].path)) # ents += [ent for ent in ents2 if ent != sync] for ent in ents: if ent.is_creation(synced): log.debug("discard delete, pending create %s:%s", synced, ent) self.discard_entry(sync) return if sync[synced].oid: try: self.providers[synced].delete(sync[synced].oid) except CloudFileNotFoundError: pass else: log.debug("was never synced, ignoring deletion") sync[synced].exists = TRASHED self.discard_entry(sync) def check_disjoint_create(self, sync, changed, synced, translated_path): # check for creation of a new file with another in the table if sync[changed].otype != FILE: return False ents = list(self.state.lookup_path(synced, translated_path)) # filter for exists other_ents = [ent for ent in ents if ent != sync] if not other_ents: return False log.debug("found matching %s, other ents: %s", translated_path, other_ents) # ignoring trashed entries with different oids on the same path if all(TRASHED in (ent[synced].exists, ent[changed].exists) for ent in other_ents): return False other_untrashed_ents = [ent for ent in other_ents if TRASHED not in ( ent[synced].exists, ent[changed].exists)] assert len(other_untrashed_ents) == 1 log.debug("split conflict found : %s", other_untrashed_ents) self.handle_split_conflict( other_untrashed_ents[0], synced, sync, changed) return True def handle_path_change_or_creation(self, sync, changed, synced): # pylint: disable=too-many-branches, too-many-return-statements if not sync[changed].path: self.update_sync_path(sync, changed) log.debug("NEW SYNC %s", sync) if sync[changed].exists == TRASHED or sync.discarded: log.debug("requeue trashed event %s", sync) return REQUEUE translated_path = self.translate(synced, sync[changed].path) if translated_path is None: # ignore these return FINISHED if not sync[changed].path: log.debug("can't sync, no path %s", sync) if sync.is_creation(changed): # never synced this before, maybe there's another local path with # the same name already? 
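# handle_path_change_or_creation relies on the translate(side, path) callback:
# it maps a path from the other provider's namespace into `side`'s namespace
# and returns None for anything outside the synced roots (such entries are
# simply marked FINISHED). A minimal translator for fixed roots, assuming the
# ("/local", "/remote") pair used in the tests -- a sketch, not the actual
# CloudSync implementation:
def _sketch_translate(to_side, path, roots=("/local", "/remote")):
    from_root = roots[1 - to_side]
    to_root = roots[to_side]
    if path != from_root and not path.startswith(from_root + "/"):
        return None  # not under the synced root: ignore this entry
    return to_root + path[len(from_root):]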
if self.check_disjoint_create(sync, changed, synced, translated_path): return REQUEUE if sync.is_creation(changed): assert not sync[changed].sync_hash # looks like a new file if sync[changed].otype == DIRECTORY: return self.mkdir_synced(changed, sync, translated_path) if not self.download_changed(changed, sync): return REQUEUE if sync[synced].oid: self.upload_synced(changed, sync) else: return self.create_synced(changed, sync, translated_path) else: # handle rename if self.providers[synced].paths_match(sync[synced].sync_path, translated_path): return FINISHED log.debug("rename %s %s", sync[synced].sync_path, translated_path) try: new_oid = self.providers[synced].rename(sync[synced].oid, translated_path) except CloudFileNotFoundError: log.debug("ERROR: can't rename for now %s", sync) if sync.punted > 5: log.exception("punted too many times, giving up") return FINISHED else: sync.punt() return REQUEUE except CloudFileExistsError: log.debug("can't rename, file exists") if sync.punted: log.debug("rename for conflict") self.rename_to_fix_conflict(sync, synced, translated_path) sync.punt() return REQUEUE sync[synced].sync_path = translated_path sync[changed].sync_path = sync[changed].path self.update_entry(sync, synced, path=translated_path, oid=new_oid) return FINISHED def _resolve_rename(self, replace): replace_ent = self.state.lookup_oid(replace.side, replace.oid) _old_oid, new_oid, new_name = self.conflict_rename(replace.side, replace.path) if new_name is None: return False self.update_entry(replace_ent, side=replace.side, oid=new_oid) return True def rename_to_fix_conflict(self, sync, side, path): old_oid, new_oid, new_name = self.conflict_rename(side, path) if new_name is None: return False log.debug("rename to fix conflict %s -> %s", sync[side].path, new_name) # file can get renamed back, if there's a cycle if old_oid == sync[side].oid: self.update_entry(sync, side=side, oid=new_oid) else: ent = self.state.lookup_oid(side, old_oid) if ent: self.update_entry(ent, side=side, oid=new_oid) return True def conflict_rename(self, side, path): folder, base = self.providers[side].split(path) if base == "": raise ValueError("bad path %s" % path) index = base.find(".") if index >= 0: ext = base[index:] base = base[:index] else: base = base ext = "" oinfo = self.providers[side].info_path(path) if not oinfo: return None, None, None i = 1 new_oid = None conflict_name = base + ".conflicted" + ext while new_oid is None: try: conflict_path = self.providers[side].join(folder, conflict_name) new_oid = self.providers[side].rename(oinfo.oid, conflict_path) except CloudFileExistsError: i = i + 1 conflict_name = base + ".conflicted" + str(i) + ext log.debug("conflict renamed: %s -> %s", path, conflict_path) return oinfo.oid, new_oid, conflict_path def discard_entry(self, sync): sync.discard() self.state.storage_update(sync) def embrace_change(self, sync, changed, synced): log.debug("embrace %s", sync) if sync[changed].path: translated_path = self.translate(synced, sync[changed].path) if not translated_path: log.log(TRACE, ">>>Not a cloud path %s, discard", sync[changed].path) self.discard_entry(sync) return FINISHED # parent_conflict code parent = self.providers[changed].dirname(sync[changed].path) if any(e[changed].changed for e in self.state.lookup_path(changed, parent)): log.log(TRACE, "parent modify should happen first %s", sync[changed].path) sync.punt() return REQUEUE if sync[changed].exists == TRASHED: self.delete_synced(sync, changed, synced) return FINISHED if sync.is_path_change(changed) or 
sync.is_creation(changed): ret = self.handle_path_change_or_creation(sync, changed, synced) if ret == REQUEUE: return ret if sync[changed].hash != sync[changed].sync_hash: # not a new file, which means we must have last sync info log.debug("needs upload: %s index: %s", sync, synced) assert sync[synced].oid self.download_changed(changed, sync) self.upload_synced(changed, sync) return FINISHED log.debug("nothing changed %s", sync) return FINISHED def update_sync_path(self, sync, changed): assert sync[changed].oid info = self.providers[changed].info_oid(sync[changed].oid) if not info: sync[changed].exists = TRASHED return if not info.path: log.warning("impossible sync, no path. " "Probably a file that was shared, but not placed into a folder. discarding. %s", sync[changed]) self.discard_entry(sync) return log.debug("UPDATE PATH %s->%s", sync, info.path) self.update_entry( sync, changed, sync[changed].oid, path=info.path, exists=True) def handle_hash_conflict(self, sync): log.debug("splitting hash conflict %s", sync) # split the sync in two defer_ent, defer_side, replace_ent, replace_side \ = self.state.split(sync, self.providers) self.handle_split_conflict( defer_ent, defer_side, replace_ent, replace_side) def handle_split_conflict(self, defer_ent, defer_side, replace_ent, replace_side): if defer_ent[defer_side].otype == FILE: self.download_changed(defer_side, defer_ent) with open(defer_ent[defer_side].temp_file, "rb") as f: dhash = self.providers[replace_side].hash_data(f) if dhash == replace_ent[replace_side].hash: log.debug("same hash as remote, discard entry") self.discard_entry(replace_ent) return self.resolve_conflict((defer_ent[defer_side], replace_ent[replace_side])) def handle_path_conflict(self, sync): # consistent handling path1 = sync[0].path path2 = sync[1].path if path1 > path2: pick = 0 else: pick = 1 picked = sync[pick] other = sync[other_side(pick)] other_path = self.translate(other.side, picked.path) if other_path is None: return log.debug("renaming to handle path conflict: %s -> %s", other.oid, other_path) def _update_syncs(new_oid): self.update_entry(sync, other.side, new_oid, path=other_path) sync[other.side].sync_path = sync[other.side].path sync[picked.side].sync_path = sync[picked.side].path try: new_oid = self.providers[other.side].rename(other.oid, other_path) _update_syncs(new_oid) except CloudFileExistsError: # other side already agrees _update_syncs(other.oid) def detect_parent_conflict(self, sync: SyncEntry, changed) -> Optional[str]: provider = self.providers[changed] path = sync[changed].sync_path parent = provider.dirname(path) while path != parent: ents = list(self.state.lookup_path(changed, parent)) for ent in ents: ent: SyncEntry if ent[changed].changed: return ent[changed].path path = parent parent = provider.dirname(path) return None PKPO~* * cloudsync/sync/sqlite_storage.pyfrom typing import Dict, Any, Optional import logging import sqlite3 from cloudsync import Storage from cloudsync.log import TRACE log = logging.getLogger(__name__) class SqliteStorage(Storage): def __init__(self, filename: str): self._filename = filename self._ensure_table_exists() def _ensure_table_exists(self): self.db = sqlite3.connect(self._filename, uri=self._filename.startswith('file:'), check_same_thread=self._filename == ":memory:", timeout=5, isolation_level=None, ) self.db.execute("PRAGMA journal_mode=WAL;") self.db.execute("PRAGMA busy_timeout=5000;") # Not using AUTOINCREMENT: http://www.sqlitetutorial.net/sqlite-autoincrement/ self.db.execute('CREATE TABLE IF NOT EXISTS 
cloud (id INTEGER PRIMARY KEY, ' 'tag TEXT NOT NULL, serialization BLOB)') def create(self, tag: str, serialization: bytes) -> Any: db_cursor = self.db.execute('INSERT INTO cloud (tag, serialization) VALUES (?, ?)', [tag, serialization]) eid = db_cursor.lastrowid return eid def update(self, tag: str, serialization: bytes, eid: Any) -> int: log.log(TRACE, "updating eid%s", eid) db_cursor = self.db.execute('UPDATE cloud SET serialization = ? WHERE id = ? AND tag = ?', [serialization, eid, tag]) ret = db_cursor.rowcount if ret == 0: raise ValueError("id %s doesn't exist" % eid) return ret def delete(self, tag: str, eid: Any): log.log(TRACE, "deleting eid%s", eid) db_cursor = self.db.execute('DELETE FROM cloud WHERE id = ? AND tag = ?', [eid, tag]) if db_cursor.rowcount == 0: log.debug("ignoring delete: id %s doesn't exist", eid) return def read_all(self, tag: str) -> Dict[Any, bytes]: ret = {} db_cursor = self.db.execute('SELECT id, serialization FROM cloud WHERE tag = ?', [tag]) for row in db_cursor.fetchall(): eid, serialization = row ret[eid] = serialization return ret def read(self, tag: str, eid: Any) -> Optional[bytes]: db_cursor = self.db.execute('SELECT serialization FROM cloud WHERE id = ? and tag = ?', [eid, tag]) for row in db_cursor.fetchall(): return row return None PKPOWggcloudsync/sync/state.pyimport copy import json import logging import time import traceback from abc import ABC, abstractmethod from enum import Enum from typing import Optional, Tuple, Any, List, Dict, Set from typing import Union from cloudsync.types import DIRECTORY, FILE, NOTKNOWN from cloudsync.types import OType from cloudsync.scramble import scramble from cloudsync.log import TRACE from .util import debug_sig log = logging.getLogger(__name__) __all__ = ['SyncState', 'SyncEntry', 'Storage', 'LOCAL', 'REMOTE', 'FILE', 'DIRECTORY'] # adds a repr to some classes class Reprable: # pylint: disable=too-few-public-methods def __repr__(self): return self.__class__.__name__ + ":" + debug_sig(id(self)) + str(self.__dict__) # safe ternary, don't allow traditional comparisons class Exists(Enum): UNKNOWN = None EXISTS = True TRASHED = False def __bool__(self): raise ValueError("never bool enums") UNKNOWN = Exists.UNKNOWN EXISTS = Exists.EXISTS TRASHED = Exists.TRASHED # state of a single object class SideState(Reprable): # pylint: disable=too-few-public-methods, too-many-instance-attributes def __init__(self, side: int, otype: OType): self.side: int = side # just for assertions self.otype: OType = otype self.hash: Optional[bytes] = None # hash at provider # time of last change (we maintain this) self.changed: Optional[float] = None self.sync_hash: Optional[bytes] = None # hash at last sync self.sync_path: Optional[str] = None # path at last sync self.path: Optional[str] = None # path at provider self.oid: Optional[str] = None # oid at provider self._exists: Exists = UNKNOWN # exists at provider self.temp_file: Optional[str] = None @property def exists(self): return self._exists # allow traditional sets of ternary @exists.setter def exists(self, val: Union[bool, Exists]): if val is False: val = TRASHED if val is True: val = EXISTS if val is None: val = UNKNOWN if type(val) != Exists: raise ValueError("use enum for exists") self._exists = val # these are not really local or remote # but it's easier to reason about using these labels LOCAL = 0 REMOTE = 1 def other_side(index): return 1-index class Storage(ABC): @abstractmethod def create(self, tag: str, serialization: bytes) -> Any: """ take a serialization str, upsert it 
in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def update(self, tag: str, serialization: bytes, eid: Any) -> int: """ take a serialization str, update it in sqlite, return the count of rows updated """ ... @abstractmethod def delete(self, tag: str, eid: Any): """ take a serialization str, upsert it in sqlite, return the row id of the row as a persistence id""" ... @abstractmethod def read_all(self, tag: str) -> Dict[Any, bytes]: """yield all the serialized strings in a generator""" ... @abstractmethod def read(self, tag: str, eid: Any) -> Optional[bytes]: """return one serialized string or None""" ... # single entry in the syncs state collection class SyncEntry(Reprable): def __init__(self, otype: OType, storage_init: Optional[Tuple[Any, bytes]] = None): super().__init__() self.__states: List[SideState] = [SideState(0, otype), SideState(1, otype)] self.discarded: str = "" self.storage_id: Any = None self.dirty: bool = True self.punted: int = 0 if storage_init is not None: self.storage_id = storage_init[0] self.deserialize(storage_init) self.dirty = False def serialize(self) -> bytes: """converts SyncEntry into a json str""" def side_state_to_dict(side_state: SideState) -> dict: ret = dict() ret['otype'] = side_state.otype.value ret['side'] = side_state.side ret['hash'] = side_state.hash.hex() if isinstance( side_state.hash, bytes) else None ret['changed'] = side_state.changed ret['sync_hash'] = side_state.sync_hash.hex() if isinstance( side_state.sync_hash, bytes) else None ret['path'] = side_state.path ret['sync_path'] = side_state.sync_path ret['oid'] = side_state.oid ret['exists'] = side_state.exists.value ret['temp_file'] = side_state.temp_file # storage_id does not get serialized, it always comes WITH a serialization when deserializing return ret ser = dict() ser['side0'] = side_state_to_dict(self.__states[0]) ser['side1'] = side_state_to_dict(self.__states[1]) ser['discarded'] = self.discarded return json.dumps(ser).encode('utf-8') def deserialize(self, storage_init: Tuple[Any, bytes]): """loads the values in the serialization dict into self""" def dict_to_side_state(side, side_dict: dict) -> SideState: otype = OType(side_dict['otype']) side_state = SideState(side, otype) side_state.side = side_dict['side'] side_state.hash = bytes.fromhex( side_dict['hash']) if side_dict['hash'] else None side_state.changed = side_dict['changed'] side_state.sync_hash = bytes.fromhex( side_dict['sync_hash']) if side_dict['sync_hash'] else None side_state.sync_path = side_dict['sync_path'] side_state.path = side_dict['path'] side_state.oid = side_dict['oid'] side_state.exists = side_dict['exists'] side_state.temp_file = side_dict['temp_file'] return side_state self.storage_id = storage_init[0] ser: dict = json.loads(storage_init[1].decode('utf-8')) self.__states = [dict_to_side_state(0, ser['side0']), dict_to_side_state(1, ser['side1'])] self.discarded = ser['discarded'] def __getitem__(self, i): return self.__states[i] def __setitem__(self, i, val): assert type(val) is SideState assert val.side is None or val.side == i self.__states[i] = val self.dirty = True def hash_conflict(self): if self[0].hash and self[1].hash: return self[0].hash != self[0].sync_hash and self[1].hash != self[1].sync_hash return False def is_path_change(self, changed): return self[changed].path != self[changed].sync_path def is_creation(self, changed): return not self[changed].sync_path and self[changed].path def discard(self): self.discarded = ''.join(traceback.format_stack()) self.dirty = 
True @staticmethod def prettyheaders(): ret = "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s" % ( "EID", # _sig(id(self)), "SID", # _sig(self.storage_id), "Typ", # otype, "Change", # secs(self[LOCAL].changed), "Path", # self[LOCAL].path, "OID", # _sig(self[LOCAL].oid), "Last Sync Path E H", # str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, "Change", # secs(self[REMOTE].changed), "Path", # self[REMOTE].path, "OID", # _sig(self[REMOTE].oid), "Last Sync Path ", # str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, "Punt", # self.punted or "" ) return ret def pretty(self, fixed=True, use_sigs=True): if self.discarded: return "DISCARDED" def secs(t): if t: return str(round(t % 300, 3)).replace(".", "") else: return 0 def abbrev_bool(b, tup=('T', 'F', '?')): idx = 1-int(bool(b)) if b is None: idx = 2 return tup[idx] lexv = abbrev_bool(self[LOCAL].exists.value, ("E", "X", "?")) rexv = abbrev_bool(self[REMOTE].exists.value, ("E", "X", "?")) lhma = abbrev_bool(self[LOCAL].sync_hash != self[LOCAL].hash, ("H", "=", "?")) rhma = abbrev_bool(self[REMOTE].sync_hash != self[REMOTE].hash, ("H", "=", "?")) if use_sigs: _sig = debug_sig else: _sig = lambda a: a local_otype = self[LOCAL].otype.value if self[LOCAL].otype else '?' remote_otype = self[REMOTE].otype.value if self[REMOTE].otype else '?' if local_otype != remote_otype: otype = local_otype[0] + "-" + remote_otype[0] else: otype = local_otype if not fixed: return str((_sig(id(self)), otype, (secs(self[LOCAL].changed), self[LOCAL].path, _sig( self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma), (secs(self[REMOTE].changed), self[REMOTE].path, _sig( self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + lexv + ":" + lhma), self.punted)) ret = "%3s %3s %3s %6s %20s %6s %22s -- %6s %20s %6s %22s %s" % ( _sig(id(self)), _sig(self.storage_id), otype[:3], secs(self[LOCAL].changed), self[LOCAL].path, _sig(self[LOCAL].oid), str(self[LOCAL].sync_path) + ":" + lexv + ":" + lhma, secs(self[REMOTE].changed), self[REMOTE].path, _sig(self[REMOTE].oid), str(self[REMOTE].sync_path) + ":" + rexv + ":" + rhma, self.punted or "" ) return ret def __str__(self): return self.pretty(fixed=False) def store(self, tag: str, storage: Storage): if not self.storage_id: self.storage_id = storage.create(tag, self.serialize()) else: storage.update(tag, self.serialize(), self.storage_id) def punt(self): # do this one later # TODO provide help for making sure that we don't punt too many times self.punted += 1 class SyncState: def __init__(self, storage: Optional[Storage] = None, tag: Optional[str] = None, shuffle=True): self._oids = ({}, {}) self._paths = ({}, {}) self._changeset = set() self._storage: Optional[Storage] = storage self._tag = tag self.cursor_id = dict() self.shuffle = shuffle if self._storage: storage_dict = self._storage.read_all(tag) for eid, ent_ser in storage_dict.items(): ent = SyncEntry(None, (eid, ent_ser)) for side in [LOCAL, REMOTE]: path, oid = ent[side].path, ent[side].oid if path not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][oid] = ent self._oids[side][oid] = ent def _change_path(self, side, ent, path, provider): assert type(ent) is SyncEntry assert ent[side].oid prior_ent = ent prior_path = ent[side].path if prior_path: if prior_path in self._paths[side]: if prior_path == path and ent[side].oid in self._paths[side][prior_path]: return prior_ent = self._paths[side][prior_path].pop(ent[side].oid, None) if not self._paths[side][prior_path]: del self._paths[side][prior_path] if path: if path 
not in self._paths[side]: self._paths[side][path] = {} self._paths[side][path][ent[side].oid] = ent ent[side].path = path ent.dirty = True if prior_ent and prior_ent in self._changeset and prior_ent is not ent: log.debug("alter changeset") self._changeset.remove(prior_ent) self._changeset.add(ent) if ent[side].otype == DIRECTORY and prior_path != path and not prior_path is None: # changing directory also changes child paths for sub in self.get_all(): if not sub[side].path: continue relative = provider.is_subpath(prior_path, sub[side].path) if relative: new_path = provider.join(path, relative) self._change_path(side, sub, new_path, provider) if provider.oid_is_path: # TODO: state should not do online hits esp from event manager # either # a) have event manager *not* trigger this, maybe by passing none as the provider, etc # this may have knock on effects where the sync engine needs to process parent folders first # b) have a special oid_from_path function that is guaranteed not to be "online" # assert not _api() called, etc. new_info = provider.info_path(new_path) self._change_oid(side, sub, new_info.oid) assert ent in self.get_all() def _change_oid(self, side, ent, oid): assert type(ent) is SyncEntry prior_oid = ent[side].oid path = ent[side].path log.log(TRACE, "side(%s) of %s oid -> %s", side, ent, debug_sig(oid)) other = other_side(side) if ent[other].path: assert ent in self.lookup_path(other, ent[other].path), ("%s %s path not indexed" % (other, ent)) prior_ent = None if prior_oid: prior_ent = self._oids[side].pop(prior_oid, None) if oid: ent[side].oid = oid ent.dirty = True self._oids[side][oid] = ent else: log.log(TRACE, "removed oid from index") other = other_side(side) if ent[other].path: assert ent in self.lookup_path(other, ent[other].path), ("%s %s path not indexed" % (other, ent)) maybe_remove = set() if prior_ent and prior_ent is not ent and prior_ent in self._changeset: maybe_remove.add(prior_ent) self._changeset.add(ent) prior_ent = None if prior_oid and path and path in self._paths[side]: prior_ent = self._paths[side][path].pop(prior_oid, None) if oid and ent[side].path: if ent[side].path not in self._paths[side]: self._paths[side][ent[side].path] = {} self._paths[side][ent[side].path][oid] = ent if prior_ent and prior_ent is not ent and prior_ent in self._changeset: maybe_remove.add(prior_ent) for r in maybe_remove: if r in self.get_all(): continue log.debug("removing %s because oid and path not in index", r) self._changeset.remove(r) def lookup_oid(self, side, oid): try: ret = self._oids[side][oid] if not ret.discarded: return ret return None except KeyError: return None def lookup_path(self, side, path): try: ret = self._paths[side][path].values() if ret: return [e for e in ret if not e.discarded] return [] except KeyError: return [] def rename_dir(self, side, from_dir, to_dir, is_subpath, replace_path): """ when a directory changes, utility to rename all kids """ remove = [] # TODO: refactor this so that a list of affected items is gathered, then the alterations happen to the final # list, which will avoid having to remove after adding, which feels mildly risky # TODO: is this function called anywhere? ATM, it looks like no... 
It should be called or removed for path, oid_dict in self._paths[side].items(): if is_subpath(from_dir, path): new_path = replace_path(path, from_dir, to_dir) remove.append(path) self._paths[side][new_path] = oid_dict for ent in oid_dict.values(): ent[side].path = new_path for path in remove: self._paths[side].pop(path) def update_entry(self, ent, side, oid, provider, *, path=None, hash=None, exists=True, changed=False, otype=None): # pylint: disable=redefined-builtin, too-many-arguments if oid is not None: self._change_oid(side, ent, oid) if otype is not None: ent[side].otype = otype if otype is NOTKNOWN: assert not exists if path is not None: if provider is None: raise ValueError("Need provider info for path changes") self._change_path(side, ent, path, provider) if oid: assert ent in self.get_all() if hash is not None: ent[side].hash = hash ent.dirty = True if exists is not None and exists is not ent[side].exists: ent[side].exists = exists ent.dirty = True if changed: assert ent[side].path or ent[side].oid log.log(TRACE, "add %s to changeset", ent) self.mark_changed(side, ent) if oid: assert ent in self.get_all() log.log(TRACE, "updated %s", ent) def mark_changed(self, side, ent): ent[side].changed = time.time() self._changeset.add(ent) def storage_get_cursor(self, cursor_tag): if cursor_tag is None: return None retval = None if self._storage is not None: if cursor_tag in self.cursor_id: retval = self._storage.read(cursor_tag, self.cursor_id[cursor_tag]) if not retval: cursors = self._storage.read_all(cursor_tag) for eid, cursor in cursors.items(): self.cursor_id[cursor_tag] = eid retval = cursor if len(cursors) > 1: log.warning("Multiple cursors found for %s", cursor_tag) log.debug("storage_get_cursor id=%s cursor=%s", cursor_tag, str(retval)) return retval def storage_update_cursor(self, cursor_tag, cursor): if cursor_tag is None: return updated = 0 if self._storage is not None: if cursor_tag in self.cursor_id and self.cursor_id[cursor_tag]: updated = self._storage.update(cursor_tag, cursor, self.cursor_id[cursor_tag]) log.log(TRACE, "storage_update_cursor cursor %s %s", cursor_tag, cursor) if not updated: self.cursor_id[cursor_tag] = self._storage.create(cursor_tag, cursor) log.log(TRACE, "storage_update_cursor cursor %s %s", cursor_tag, cursor) def storage_update(self, ent: SyncEntry): log.log(TRACE, "storage_update eid%s", ent.storage_id) if self._storage is not None: if ent.storage_id is not None: if ent.discarded: log.debug("storage_update deleting eid%s", ent.storage_id) self._storage.delete(self._tag, ent.storage_id) else: self._storage.update(self._tag, ent.serialize(), ent.storage_id) else: if ent.discarded: log.error("Entry should not be discarded. 
Discard happened at: %s", ent.discarded) assert not ent.discarded # always raises, due to being in this if condition new_id = self._storage.create(self._tag, ent.serialize()) ent.storage_id = new_id log.debug("storage_update creating eid%s", ent.storage_id) ent.dirty = False def __len__(self): return len(self.get_all()) def update(self, side, otype, oid, provider, path=None, hash=None, exists=True, prior_oid=None): # pylint: disable=redefined-builtin, too-many-arguments log.log(TRACE, "lookup %s", debug_sig(oid)) ent = self.lookup_oid(side, oid) prior_ent = None if prior_oid and prior_oid != oid: prior_ent = self.lookup_oid(side, prior_oid) if not ent: ent = prior_ent prior_ent = None if ent and prior_ent: # oid_is_path conflict # the new entry has the same name as an old entry log.debug("rename o:%s path:%s prior:%s", debug_sig(oid), path, debug_sig(prior_oid)) log.debug("discarding old entry in favor of new %s", prior_ent) ent.discard() self.storage_update(ent) ent = prior_ent if prior_oid and prior_oid != oid: # this is an oid_is_path provider path_ents = self.lookup_path(side, path) if path_ents: if not ent: ent = path_ents[0] log.debug("matched existing entry %s:%s", debug_sig(oid), path) elif ent is not path_ents[0]: path_ents[0].discard() self.storage_update(path_ents[0]) log.debug("discarded existing entry %s:%s", debug_sig(oid), path) if not ent: log.debug("creating new entry because %s not found in %s", debug_sig(oid), side) ent = SyncEntry(otype) self.update_entry(ent, side, oid, provider, path=path, hash=hash, exists=exists, changed=True, otype=otype) self.storage_update(ent) def change(self, age): if not self._changeset: return None changes = self._changeset if self.shuffle: # at most 20 are randomized changes = scramble(changes, 20) earlier_than = time.time() - age for puntlevel in range(3): for e in changes: if not e.discarded and e.punted == puntlevel: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): return e ret = None remove = [] for e in self._changeset: if e.discarded: remove.append(e) else: if (e[LOCAL].changed and e[LOCAL].changed <= earlier_than) \ or (e[REMOTE].changed and e[REMOTE].changed <= earlier_than): ret = e for e in remove: self._changeset.discard(e) return ret def has_changes(self): return bool(self._changeset) def finished(self, ent): if ent[1].changed or ent[0].changed: log.info("not marking finished: %s", ent) return self._changeset.remove(ent) for e in self._changeset: e.punted = 0 def pretty_print(self, use_sigs=True): ret = SyncEntry.prettyheaders() + "\n" for e in self.get_all(): e: SyncEntry ret += e.pretty(fixed=True, use_sigs=use_sigs) + "\n" return ret def assert_index_is_correct(self): for ent in self._changeset: if not ent.discarded: assert ent in self.get_all(), ("%s in changeset, not in index" % ent) for ent in self.get_all(): assert ent if ent[LOCAL].path: assert ent in self.lookup_path(LOCAL, ent[LOCAL].path), ("%s local path not indexed" % ent) if ent[REMOTE].path: assert ent in self.lookup_path(REMOTE, ent[REMOTE].path), ("%s remote path not indexed" % ent) if ent[LOCAL].oid: assert ent is self.lookup_oid(LOCAL, ent[LOCAL].oid), ("%s local oid not indexed" % ent) if ent[REMOTE].oid: assert ent is self.lookup_oid(REMOTE, ent[REMOTE].oid), ("%s local oid not indexed" % ent) if ent[LOCAL].changed or ent[REMOTE].changed: if ent not in self._changeset: assert False, ("changeset missing %s" % ent) def get_all(self, discarded=False) -> Set['SyncState']: ents = set() for 
ent in self._oids[LOCAL].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) for ent in self._oids[REMOTE].values(): assert ent if ent.discarded and not discarded: continue ents.add(ent) return ents def entry_count(self): return len(self.get_all()) def split(self, ent, providers): log.debug("splitting %s", ent) defer = REMOTE replace = LOCAL defer_ent = ent replace_ent = SyncEntry(ent[replace].otype) replace_ent[replace] = copy.copy(ent[replace]) # copy in the replace state defer_ent[replace] = SideState(replace, ent[replace].otype) # clear out # fix indexes, so the defer ent no longer has replace stuff self.update_entry(defer_ent, replace, oid=None, path=None, exists=UNKNOWN, provider=providers[defer]) self.update_entry(defer_ent, defer, oid=defer_ent[defer].oid, changed=True, provider=providers[replace]) # add to index assert replace_ent[replace].oid self.update_entry( replace_ent, replace, oid=replace_ent[replace].oid, path=replace_ent[replace].path, changed=True, provider=providers[replace]) assert replace_ent[replace].oid # we aren't synced replace_ent[replace].sync_path = None replace_ent[replace].sync_hash = None # never synced defer_ent[defer].sync_path = None defer_ent[defer].sync_hash = None log.debug("split: %s", defer_ent) log.debug("split: %s", replace_ent) log.info("SPLIT\n%s", self.pretty_print()) assert replace_ent[replace].oid self.storage_update(defer_ent) self.storage_update(replace_ent) return defer_ent, defer, replace_ent, replace PKOeEDcloudsync/sync/util.pyfrom hashlib import md5 from base64 import b64encode # useful for converting oids and pointer nubmers into digestible nonces def debug_sig(t, size=3): if not t: return 0 return b64encode(md5(str(t).encode()).digest()).decode()[0:size] PKO==cloudsync/tests/__init__.pyfrom .fixtures.util import util from .test_provider import * PKOA%{{cloudsync/tests/conftest.pyimport cloudsync from .fixtures import * # pylint: disable=unused-import, unused-wildcard-import, wildcard-import cloudsync.logger.setLevel("TRACE") def pytest_configure(config): config.addinivalue_line("markers", "manual") def pytest_runtest_setup(item): if 'manual' in item.keywords and not item.config.getoption("--manual"): pytest.skip("need --manual option to run this test") def pytest_addoption(parser): parser.addoption("--provider", action="append", default=[], help="provider(s) to run tests for") parser.addoption("--manual", action="store_true", default=False, help="run the manual tests") PK.oOv&bcloudsync/tests/pytest.ini[pytest] log_format = %(asctime)s p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s: %(message)s log_date_format = %Y-%m-%d %H:%M:%S log_level = 5 log_cli = true # dont change this to debug, it will log passing tests as debug log_cli_level = warning PKPO^RCRCcloudsync/tests/test_cs.pyfrom io import BytesIO import logging import pytest from typing import List from .fixtures import MockProvider, MockStorage from cloudsync.sync.sqlite_storage import SqliteStorage from cloudsync import Storage, CloudSync, SyncState, SyncEntry, LOCAL, REMOTE, FILE, DIRECTORY from cloudsync.event import EventManager from .test_sync import WaitFor, RunUntilHelper log = logging.getLogger(__name__) @pytest.fixture(name="cs") def fixture_cs(mock_provider_generator): roots = ("/local", "/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass cs = CloudSyncMixin((mock_provider_generator(), mock_provider_generator()), roots, sleep=None) yield cs cs.done() def make_cs(mock_provider_creator, left, right): roots = ("/local", 
"/remote") class CloudSyncMixin(CloudSync, RunUntilHelper): pass return CloudSyncMixin((mock_provider_creator(*left), mock_provider_creator(*right)), roots, sleep=None) @pytest.fixture(name="multi_cs") def fixture_multi_cs(mock_provider_generator): storage_dict = dict() storage = MockStorage(storage_dict) class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = mock_provider_generator() p2 = mock_provider_generator() p3 = mock_provider_generator() roots1 = ("/local1", "/remote") roots2 = ("/local2", "/remote") cs1 = CloudSyncMixin((p1, p2), roots1, storage, sleep=None) cs2 = CloudSyncMixin((p1, p3), roots2, storage, sleep=None) yield cs1, cs2 cs1.done() cs2.done() def test_sync_multi(multi_cs): cs1, cs2 = multi_cs local_parent1 = "/local1" local_parent2 = "/local2" remote_parent1 = "/remote" remote_parent2 = "/remote" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path11 = "/local1/stuff1" local_path21 = "/local2/stuff1" local_path12 = "/local1/stuff2" local_path22 = "/local2/stuff2" cs1.providers[LOCAL].mkdir(local_parent1) cs1.providers[REMOTE].mkdir(remote_parent1) cs2.providers[LOCAL].mkdir(local_parent2) cs2.providers[REMOTE].mkdir(remote_parent2) linfo1 = cs1.providers[LOCAL].create(local_path11, BytesIO(b"hello1"), None) linfo2 = cs2.providers[LOCAL].create(local_path21, BytesIO(b"hello2"), None) rinfo1 = cs1.providers[REMOTE].create(remote_path2, BytesIO(b"hello3"), None) rinfo2 = cs2.providers[REMOTE].create(remote_path2, BytesIO(b"hello4"), None) assert linfo1 and linfo2 and rinfo1 and rinfo2 cs1.run_until_found( (LOCAL, local_path11), (LOCAL, local_path21), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) cs1.run(until=lambda: not cs1.state.has_changes(), timeout=1) log.info("TABLE 1\n%s", cs1.state.pretty_print()) log.info("TABLE 2\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 4 # 2 dirs, 2 files, 1 never synced (local2 file) try: cs2.run_until_found( (LOCAL, local_path12), (LOCAL, local_path22), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) except TimeoutError: log.info("Timeout: TABLE 1\n%s", cs1.state.pretty_print()) log.info("Timeout: TABLE 2\n%s", cs2.state.pretty_print()) raise linfo12 = cs1.providers[LOCAL].info_path(local_path12) rinfo11 = cs1.providers[REMOTE].info_path(remote_path1) linfo22 = cs2.providers[LOCAL].info_path(local_path22) rinfo21 = cs2.providers[REMOTE].info_path(remote_path1) assert linfo12.oid assert linfo22.oid assert rinfo11.oid assert rinfo21.oid assert linfo12.hash == rinfo1.hash assert linfo22.hash == rinfo2.hash # let cleanups/discards/dedups happen if needed cs2.run(until=lambda: not cs2.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs2.state.pretty_print()) assert len(cs1.state) == 4 assert len(cs2.state) == 4 def test_sync_basic(cs): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" remote_path2 = "/remote/stuff2" local_path2 = "/local/stuff2" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"hello2"), None) cs.run_until_found( (LOCAL, local_path1), (LOCAL, local_path2), (REMOTE, remote_path1), (REMOTE, remote_path2), timeout=2) linfo2 = cs.providers[LOCAL].info_path(local_path2) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo2.oid assert rinfo1.oid assert linfo2.hash == rinfo2.hash assert linfo1.hash == rinfo1.hash assert not 
cs.providers[LOCAL].info_path(local_path2 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") # let cleanups/discards/dedups happen if needed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE\n%s", cs.state.pretty_print()) assert len(cs.state) == 4 assert not cs.state.has_changes() def setup_remote_local(cs, *names): remote_parent = "/remote" local_parent = "/local" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) found = [] for name in names: remote_path1 = "/remote/" + name local_path1 = "/local/" + name linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) found.append((REMOTE, remote_path1)) cs.run_until_found(*found) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) def test_sync_create_delete_same_name(cs): remote_parent = "/remote" local_parent = "/local" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello")) cs.run(until=lambda: not cs.state.has_changes(), timeout=2) rinfo = cs.providers[REMOTE].info_path(remote_path1) cs.emgrs[LOCAL].do() cs.providers[LOCAL].delete(linfo1.oid) cs.emgrs[LOCAL].do() linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) # run local event manager only... not sync cs.emgrs[LOCAL].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) # local and remote dirs can be disjoint # assert(len(cs.state) == 3 or len(cs.state) == 2) assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") rinfo = cs.providers[REMOTE].info_path(remote_path1) bio = BytesIO() cs.providers[REMOTE].download(rinfo.oid, bio) assert bio.getvalue() == b'goodbye' def test_sync_two_conflicts(cs): remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"world")) # run event managers only... 
not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # the local delete/create doesn't add entries assert(len(cs.state) == 3) else: assert(len(cs.state) == 5) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 4) assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") b1 = BytesIO() b2 = BytesIO() cs.providers[LOCAL].download_path(local_path1, b1) cs.providers[LOCAL].download_path(local_path1 + ".conflicted", b2) assert b1.getvalue() in (b'goodbye', b'world') assert b2.getvalue() in (b'goodbye', b'world') assert b1.getvalue() != b2.getvalue() @pytest.mark.repeat(10) def test_sync_subdir_rename(cs): local_dir = "/local/a" local_base = "/local/a/stuff" local_dir2 = "/local/b" local_base2 = "/local/b/stuff" remote_dir = "/remote/a" remote_dir2 = "/remote/b" remote_base = "/remote/a/stuff" remote_base2 = "/remote/b/stuff" kid_count = 4 cs.providers[LOCAL].mkdir("/local") lpoid = cs.providers[LOCAL].mkdir(local_dir) lpaths = [] rpaths = [] rpaths2 = [] for i in range(kid_count): lpath = local_base + str(i) rpath = remote_base + str(i) rpath2 = remote_base2 + str(i) cs.providers[LOCAL].create(lpath, BytesIO(b'hello')) lpaths.append(lpath) rpaths.append((REMOTE, rpath)) rpaths2.append((REMOTE, rpath2)) cs.run_until_found(*rpaths, timeout=2) log.info("TABLE 1\n%s", cs.state.pretty_print()) cs.providers[LOCAL].rename(lpoid, local_dir2) for _ in range(10): cs.do() log.info("TABLE 2\n%s", cs.state.pretty_print()) cs.run_until_found(*rpaths2, timeout=2) log.info("TABLE 2\n%s", cs.state.pretty_print()) # this test is sensitive to the order in which things are processed # so run it a few times @pytest.mark.repeat(10) def test_sync_folder_conflicts_file(cs): remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff1/under" local_path1 = "/local/stuff1" setup_remote_local(cs, "stuff1") log.info("TABLE 0\n%s", cs.state.pretty_print()) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) cs.providers[LOCAL].delete(linfo1.oid) cs.providers[REMOTE].delete(rinfo1.oid) linfo2 = cs.providers[LOCAL].create(local_path1, BytesIO(b"goodbye")) linfo2 = cs.providers[REMOTE].mkdir(remote_path1) linfo2 = cs.providers[REMOTE].create(remote_path2, BytesIO(b"world")) # run event managers only... 
not sync cs.emgrs[LOCAL].do() cs.emgrs[REMOTE].do() log.info("TABLE 1\n%s", cs.state.pretty_print()) if cs.providers[LOCAL].oid_is_path: # there won't be 2 rows for /local/stuff1 is oid_is_path assert(len(cs.state) == 4) locs = cs.state.lookup_path(LOCAL, local_path1) assert locs and len(locs) == 1 loc = locs[0] assert loc[LOCAL].otype == FILE assert loc[REMOTE].otype == DIRECTORY else: # deleted /local/stuff, remote/stuff, remote/stuff/under, lcoal/stuff, /local assert(len(cs.state) == 6) cs.run_until_found((REMOTE, remote_path1), timeout=2) cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.info("TABLE 2\n%s", cs.state.pretty_print()) assert(len(cs.state) == 6 or len(cs.state) == 5) local_conf = cs.providers[LOCAL].info_path(local_path1 + ".conflicted") remote_conf = cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") assert local_conf and remote_conf @pytest.fixture(params=[(MockStorage, dict()), (SqliteStorage, 'file::memory:?cache=shared')], ids=["mock_storage", "sqlite_storage"], name="storage" ) def storage_fixture(request): return request.param def test_storage(storage): roots = ("/local", "/remote") storage_class = storage[0] storage_mechanism = storage[1] class CloudSyncMixin(CloudSync, RunUntilHelper): pass p1 = MockProvider(oid_is_path=False, case_sensitive=True) p2 = MockProvider(oid_is_path=False, case_sensitive=True) storage1: Storage = storage_class(storage_mechanism) cs1: CloudSync = CloudSyncMixin((p1, p2), roots, storage1, sleep=None) old_cursor = cs1.emgrs[0].state.storage_get_cursor(cs1.emgrs[0]._cursor_tag) assert old_cursor is not None log.debug("cursor=%s", old_cursor) test_sync_basic(cs1) # do some syncing, to get some entries into the state table storage2 = storage_class(storage_mechanism) cs2: CloudSync = CloudSyncMixin((p1, p2), roots, storage2, sleep=None) log.debug(f"state1 = {cs1.state.entry_count()}\n{cs1.state.pretty_print()}") log.debug(f"state2 = {cs2.state.entry_count()}\n{cs2.state.pretty_print()}") def not_dirty(s: SyncState): for se in s.get_all(): se: SyncEntry assert not se.dirty def compare_states(s1: SyncState, s2: SyncState) -> List[SyncEntry]: ret = [] found = False for e1 in s1.get_all(): e1: SyncEntry for e2 in s2.get_all(): e2: SyncEntry if e1.serialize() == e2.serialize(): found = True if not found: ret.append(e1) return ret not_dirty(cs1.state) missing1 = compare_states(cs1.state, cs2.state) missing2 = compare_states(cs2.state, cs1.state) for e in missing1: log.debug(f"entry in 1 not found in 2 {e.pretty()}") for e in missing2: log.debug(f"entry in 2 not found in 1 {e.pretty()}") assert not missing1 assert not missing2 new_cursor = cs1.emgrs[0].state.storage_get_cursor(cs1.emgrs[0]._cursor_tag) log.debug("cursor=%s %s", old_cursor, new_cursor) assert new_cursor is not None assert old_cursor != new_cursor @pytest.mark.parametrize("drain", [None, LOCAL, REMOTE]) def test_sync_already_there(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) linfo1 = cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) rinfo2 = cs.providers[REMOTE].create(remote_path1, BytesIO(b"hello"), None) if drain is not None: # one of the event managers is not reporting events cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = 
cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert not cs.providers[LOCAL].info_path(local_path1 + ".conflicted") assert not cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") @pytest.mark.parametrize("drain", [LOCAL, REMOTE]) def test_sync_already_there_conflict(cs, drain: int): local_parent = "/local" remote_parent = "/remote" remote_path1 = "/remote/stuff1" local_path1 = "/local/stuff1" cs.providers[LOCAL].mkdir(local_parent) cs.providers[REMOTE].mkdir(remote_parent) cs.providers[LOCAL].create(local_path1, BytesIO(b"hello"), None) cs.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye"), None) if drain is not None: # one of the event managers is not reporting events cs.emgrs[drain]._drain() # fill up the state table cs.do() # all changes processed cs.run(until=lambda: not cs.state.has_changes(), timeout=1) linfo1 = cs.providers[LOCAL].info_path(local_path1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) assert linfo1.hash == rinfo1.hash assert cs.providers[LOCAL].info_path(local_path1 + ".conflicted") or cs.providers[REMOTE].info_path(remote_path1 + ".conflicted") @pytest.mark.parametrize('right', (True, False), ids=["right_cs", "right_in"]) @pytest.mark.parametrize('left', (True, False), ids=["left_cs", "left_in"]) def test_sync_rename_folder_case(mock_provider_creator, left, right): cs = make_cs(mock_provider_creator, (True, left), (False, right)) local_parent = "/local" remote_parent = "/remote" local_path1 = "/local/a" local_path2 = "/local/a/b" local_path3 = "/local/A" remote_path1 = "/remote/A" remote_path2 = "/remote/A/b" cs.providers[LOCAL].mkdir(local_parent) ldir = cs.providers[LOCAL].mkdir(local_path1) linfo = cs.providers[LOCAL].create(local_path2, BytesIO(b"hello"), None) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1) log.debug("--- cs: %s ---", [e.case_sensitive for e in cs.providers]) cs.providers[LOCAL].log_debug_state() cs.providers[REMOTE].log_debug_state() cs.providers[LOCAL].rename(ldir, local_path3) cs.do() cs.run(until=lambda: not cs.state.has_changes(), timeout=1) rinfo1 = cs.providers[REMOTE].info_path(remote_path1) rinfo2 = cs.providers[REMOTE].info_path(remote_path2) assert rinfo1 and rinfo2 PKPO).cloudsync/tests/test_events.pyimport time from io import BytesIO import threading import pytest from cloudsync import EventManager, SyncState, LOCAL @pytest.fixture(name="manager") def fixture_manager(mock_provider_generator): # TODO extend this to take any provider state = SyncState() provider = mock_provider_generator() yield EventManager(provider, state, LOCAL) def test_event_basic(manager): provider = manager.provider state = manager.state info = provider.create("/dest", BytesIO(b'hello')) assert not state.lookup_path(LOCAL, "/dest") oid = None # this is normally a blocking function that runs forever def done(): nonlocal oid states = state.get_all() if states: oid = list(states)[0][LOCAL].oid return state.lookup_oid(LOCAL, oid) return False # loop the sync until the file is found manager.run(timeout=1, until=done) assert oid info = provider.info_oid(oid) assert info.path == "/dest" def test_events_shutdown_event_shouldnt_process(manager): handle = threading.Thread(target=manager.run, **{'kwargs': {'sleep': .3}}) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) try: manager.events.__next__() except StopIteration: assert False try: manager.events.__next__() assert False except StopIteration: pass finally: 
manager.stop() def test_events_shutdown_force_process_event(manager): handle = threading.Thread(target=manager.run, **{'kwargs': {'sleep': .3}}) handle.start() try: provider = manager.provider provider.create("/dest", BytesIO(b'hello')) manager.stop() time.sleep(.4) manager.do() try: manager.events.__next__() assert False except StopIteration: pass finally: manager.stop() PKO|Vcloudsync/tests/test_mux.pyimport threading from cloudsync.muxer import Muxer def test_simple_mux(): def gen(): yield from range(4) m1 = Muxer(gen) m2 = Muxer(gen) assert len(list(m1)) == 4 assert len(list(m2)) == 4 def test_thready_mux(): threads = 10 count = 1000 def gen(): yield from range(count) def counter(m): def inner(): inner.count = 0 for _ in m: inner.count += 1 return inner m = [None] * threads c = [None] * threads t = [None] * threads for i in range(threads): m[i] = Muxer(gen) c[i] = counter(m[i]) t[i] = threading.Thread(target=c[i]) for i in range(threads): t[i].start() for i in range(threads): t[i].join() assert c[i].count == count def test_later_mux(): def gen(): yield from range(4) m1 = Muxer(gen) assert next(m1) == 0 m2 = Muxer(gen) assert len(m1.listeners) == 2 assert len(list(m1)) == 3 assert len(list(m2)) == 3 def test_restart_mux(): def gen(): yield from range(4) m1 = Muxer(gen, restart=True) m2 = Muxer(gen, restart=True) assert len(m1.listeners) == 2 assert len(list(m1)) == 4 assert len(list(m2)) == 8 assert len(list(m1)) == 8 assert len(list(m2)) == 8 def test_del(): def gen(): yield from range(4) m1 = Muxer(gen) _ = next(m1) m2 = Muxer(gen) lm2 = list(m2) assert len(m2.listeners) == 2 assert len(lm2) == 3 m2.__del__() assert len(m1.listeners) == 1 _ = list(m1) assert gen in Muxer.already m1.__del__() assert len(m1.listeners) == 0 assert gen not in Muxer.already PKPO~.dd cloudsync/tests/test_provider.pyimport os import time import logging from io import BytesIO from unittest.mock import patch from typing import Union, NamedTuple, Optional, Generator import pytest import cloudsync from cloudsync import Event, CloudFileNotFoundError, CloudTemporaryError, CloudFileExistsError, FILE from cloudsync.tests.fixtures import Provider, mock_provider, mock_provider_instance from cloudsync.runnable import time_helper from cloudsync.types import OInfo # from cloudsync.providers import GDriveProvider, DropboxProvider log = logging.getLogger(__name__) ProviderMixin = Union[Provider, "ProviderHelper"] class ProviderHelper(): def __init__(self, prov): self.api_retry = True self.prov = prov self.test_root = getattr(self.prov, "test_root", None) self.event_timeout = getattr(self.prov, "event_timeout", 20) self.event_sleep = getattr(self.prov, "event_sleep", 1) self.creds = getattr(self.prov, "creds", {}) self.prov_api_func = self.prov._api self.prov._api = lambda *ar, **kw: self.__api_retry(self._api, *ar, **kw) self.prov.connect(self.creds) assert prov.connection_id if not self.test_root: # if the provider class doesn't specify a testing root # then just make one up self.test_root = "/" + os.urandom(16).hex() self.prov.mkdir(self.test_root) def _api(self, *ar, **kw): return self.prov_api_func(*ar, **kw) def __api_retry(self: ProviderMixin, func, *ar, **kw): # the cloud providers themselves should *not* have their own backoff logic # rather, they should punt rate limit and temp errors to the sync system # since we're not testing the sync system here, we need to make our own if not self.api_retry: return func(*ar, **kw) for _ in time_helper(timeout=self.event_timeout, sleep=self.event_sleep, multiply=2): try: 
return func(*ar, **kw) except CloudTemporaryError: log.info("api retry %s %s %s", func, ar, kw) # TEST-ROOT WRAPPER def __getattr__(self, k): return getattr(self.prov, k) def events(self) -> Generator[Event, None, None]: for e in self.prov.events(): if self.__filter_root(e): yield e def walk(self, path, since=None): path = self.__add_root(path) log.debug("WALK %s", path) for e in self.prov.walk(path): if self.__filter_root(e): yield e def download(self, *args, **kwargs): return self.__strip_root(self.prov.download(*args, **kwargs)) def create(self, path, file_like, metadata=None): path = self.__add_root(path) log.debug("CREATE %s", path) return self.__strip_root(self.prov.create(path, file_like, metadata)) def upload(self, *args, **kwargs): return self.__strip_root(self.prov.upload(*args, **kwargs)) def rename(self, oid, path): path = self.__add_root(path) return self.__strip_root(self.prov.rename(oid, path)) def mkdir(self, path): path = self.__add_root(path) return self.__strip_root(self.prov.mkdir(path)) def delete(self, *args, **kwargs): return self.__strip_root(self.prov.delete(*args, **kwargs)) def exists_oid(self, oid): return self.prov.exists_oid(oid) def exists_path(self, path): path = self.__add_root(path) return self.prov.exists_path(path) def info_path(self, path: str) -> Optional[OInfo]: path = self.__add_root(path) return self.__strip_root(self.prov.info_path(path)) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: return self.__strip_root(self.prov.info_oid(oid)) def listdir(self, oid): for e in self.prov.listdir(oid): if self.__filter_root(e): yield e def __add_root(self, path): return self.join(self.test_root, path) def __filter_root(self, obj): if hasattr(obj, "path"): raw_path = obj.path if not raw_path: info = self.prov.info_oid(obj.oid) if info: raw_path = info.path if not raw_path: # pathless objects always get passed through # so isolation is not perfect return True if not self.is_subpath(self.test_root, raw_path): return False self.__strip_root(obj) return True def __strip_root(self, obj): if hasattr(obj, "path"): path = obj.path if path: relative = self.is_subpath(self.test_root, path) assert relative path = relative if not path.startswith(self.sep): path = self.sep + path obj.path = path return obj # HELPERS def temp_name(self: ProviderMixin, name="tmp", *, folder=None): fname = self.join(folder or self.sep, os.urandom(16).hex() + "." 
+ name) return fname def events_poll(self: ProviderMixin, timeout=None, until=None) -> Generator[Event, None, None]: if timeout is None: timeout = self.event_timeout if timeout == 0: yield from self.events() return for _ in time_helper(timeout, sleep=self.event_sleep, multiply=2): got = False for e in self.events(): yield e got = True if not until and got: break elif until and until(): break def __cleanup(self: ProviderMixin, oid): try: for info in self.prov.listdir(oid): if info.otype == FILE: log.debug("cleaning %s", info) self.delete(info.oid) else: self.__cleanup(info.oid) log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileNotFoundError: pass def test_cleanup(self: ProviderMixin, timeout=None, until=None): info = self.prov.info_path(self.test_root) self.__cleanup(info.oid) info = self.prov.info_path(self.test_root) if info: try: log.debug("cleaning %s", info) self.delete(info.oid) except CloudFileExistsError: # deleting the root might now be supported pass def mixin_provider(prov): assert prov assert isinstance(prov, Provider) prov = ProviderHelper(prov) yield prov prov.test_cleanup() @pytest.fixture def provider_params(): return None class ProviderConfig: def __init__(self, name, param=(), param_id=None): if param_id is None: param_id = name self.name = name if name == "mock": assert param self.param = param self.param_id = param_id def __repr__(self): return "%s(%s)" % (type(self), self.__dict__) @pytest.fixture def config_provider(request, provider_config): try: request.raiseerror("foo") except Exception as e: FixtureLookupError = type(e) if provider_config.name == "external": # if there's a fixture available, use it return request.getfixturevalue("cloudsync_provider") # deferring imports to prevent needing deps we don't want to require for everyone elif provider_config.name == "mock": return mock_provider_instance(*provider_config.param) elif provider_config.name == "gdrive": from .providers.gdrive import gdrive_provider return gdrive_provider() elif provider_config.name == "dropbox": from .providers.dropbox import dropbox_provider return dropbox_provider() else: assert False, "Must provide a valid --provider name or use the -p " known_providers = ('gdrive', 'external', 'dropbox', 'mock') def configs_from_name(name): provs = [] if name == "mock": provs += [ProviderConfig("mock", (False, True), "mock_oid_cs")] provs += [ProviderConfig("mock", (True, True), "mock_path_cs")] else: provs += [ProviderConfig(name)] return provs def configs_from_keyword(kw): provs = [] # crappy approximation of pytest evaluation routine, because false = {} for known_prov in known_providers: false[known_prov] = False for known_prov in known_providers: if known_prov == kw or '[' + known_prov + ']' == kw: ok = True else: ids = false.copy() ids[known_prov] = True try: ok = eval(kw, {}, ids) except NameError as e: ok = False except Exception as e: log.error("%s %s", type(e), e) ok = False if type(ok) is list: ok = any(ok) if ok: provs += configs_from_name(known_prov) return provs _registered = False def pytest_generate_tests(metafunc): global _registered if not _registered: for known_prov in known_providers: metafunc.config.addinivalue_line( "markers", known_prov ) _registered = True if "provider_config" in metafunc.fixturenames: provs = [] for e in metafunc.config.getoption("provider", []): for n in e.split(","): provs += configs_from_name(n) if not provs: kw = metafunc.config.getoption("keyword", "") if kw: provs += configs_from_keyword(kw) if not provs: provs += configs_from_name("mock") 
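# Illustrative sketch (an assumption, not part of this suite): the "external" branch of
# config_provider() above looks up a fixture named "cloudsync_provider" with
# request.getfixturevalue(), so a third-party provider package could reuse this test module
# by exposing such a fixture from its own conftest.py and running `pytest --provider external`.
# A minimal sketch of that fixture, using the mock provider as a stand-in for a real
# implementation (mock_provider_instance(oid_is_path, case_sensitive) is assumed from the
# ProviderConfig ids above); the attributes set here are the optional ones ProviderHelper
# reads with getattr() in its __init__:
@pytest.fixture
def cloudsync_provider():
    prov = mock_provider_instance(False, True)  # stand-in: oid-based, case-sensitive provider
    prov.creds = {}            # handed to prov.connect() by ProviderHelper
    prov.event_timeout = 20    # how long events_poll() waits for events
    prov.event_sleep = 1       # poll interval between event batches
    return prov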
ids = [p.param_id for p in provs] marks = [pytest.param(p, marks=[getattr(pytest.mark, p.name)]) for p in provs] metafunc.parametrize("provider_config", marks, ids=ids) @pytest.fixture(name="provider") def provider_fixture(config_provider): yield from mixin_provider(config_provider) def test_join(mock_provider): assert "/a/b/c" == mock_provider.join("a", "b", "c") assert "/a/c" == mock_provider.join("a", None, "c") assert "/a/b/c" == mock_provider.join("/a", "/b", "/c") assert "/a/c" == mock_provider.join("a", "/", "c") def test_connect(provider): assert provider.connected def test_create_upload_download(provider): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) info2 = provider.upload(info1.oid, data()) assert info1.oid == info2.oid assert info1.hash == info2.hash assert provider.exists_path(dest) dest = BytesIO() provider.download(info2.oid, dest) dest.seek(0) assert dest.getvalue() == dat def test_rename(provider: ProviderMixin): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") info1 = provider.create(dest, data()) dest2 = provider.temp_name("dest2") provider.rename(info1.oid, dest2) assert provider.exists_path(dest2) assert not provider.exists_path(dest) # test that renaming a folder renames the children folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() file_name = os.urandom(16).hex() file_path1 = provider.join(folder_name1, file_name) file_path2 = provider.join(folder_name2, file_name) sub_folder_name = os.urandom(16).hex() sub_folder_path1 = provider.join(folder_name1, sub_folder_name) sub_folder_path2 = provider.join(folder_name2, sub_folder_name) sub_file_name = os.urandom(16).hex() sub_file_path1 = provider.join(sub_folder_path1, sub_file_name) sub_file_path2 = provider.join(sub_folder_path2, sub_file_name) folder_oid = provider.mkdir(folder_name1) sub_folder_oid = provider.mkdir(sub_folder_path1) file_info = provider.create(file_path1, data()) sub_file_info = provider.create(sub_file_path1, data()) assert provider.exists_path(file_path1) assert not provider.exists_path(file_path2) assert provider.exists_path(sub_file_path1) assert not provider.exists_path(sub_file_path2) assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) provider.rename(folder_oid, folder_name2) assert provider.exists_path(file_path2) assert not provider.exists_path(file_path1) assert provider.exists_path(sub_file_path2) assert not provider.exists_path(sub_file_path1) if not provider.oid_is_path: assert provider.exists_oid(file_info.oid) assert provider.exists_oid(sub_file_info.oid) assert provider.info_oid(file_info.oid).path == file_path2 assert provider.info_oid(sub_file_info.oid).path == sub_file_path2 else: assert not provider.exists_oid(file_info.oid) assert not provider.exists_oid(sub_file_info.oid) def test_mkdir(provider: ProviderMixin): dat = os.urandom(32) def data(): return BytesIO(dat) dest = provider.temp_name("dest") provider.mkdir(dest) info = provider.info_path(dest) assert info.otype == cloudsync.DIRECTORY sub_f = provider.temp_name("dest", folder=dest) log.debug("parent = %s, sub = %s", dest, sub_f) with pytest.raises(CloudFileExistsError): provider.create(dest, data(), None) assert provider.exists_path(dest) log.debug("folder %s exists", dest) provider.create(sub_f, data(), None) def test_walk(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) folder = provider.temp_name("folder") provider.mkdir(folder) subfolder = 
provider.join(folder, provider.temp_name("subfolder")) provider.mkdir(subfolder) dest0 = provider.temp_name("dest0") dest1 = provider.join(folder, provider.temp_name("dest1")) dest2 = provider.join(subfolder, provider.temp_name("dest2")) oids = {} info = provider.create(dest0, temp, None) oids[dest0] = info.oid info = provider.create(dest1, temp, None) oids[dest1] = info.oid info = provider.create(dest2, temp, None) oids[dest2] = info.oid got_event = False found = {} for e in provider.walk("/"): if e.otype == cloudsync.DIRECTORY: continue log.debug("WALK %s", e) path = e.path if path is None: path = provider.info_oid(e.oid).path assert oids[path] == e.oid found[path] = True assert e.mtime assert e.exists got_event = True for x in [dest0, dest1, dest2]: assert found.get(x, False) is True log.debug("found %s", x) assert got_event def check_event_path(event: Event, provider: ProviderMixin, target_path): # confirms that the path in the event matches the target_path # if the provider doesn't provide the path in the event, look it up by the oid in the event # if we can't get the path, that's OK if the file doesn't exist event_path = event.path if event_path is None: try: event_path = provider.info_oid(event.oid).path assert event_path == target_path except CloudFileNotFoundError: if event.exists: raise def test_event_basic(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep, 1)): log.debug("event %s", e) wait_sleep_cycles = 30 info1 = provider.create(dest, temp, None) assert info1 is not None # TODO: check info1 for more things received_event = None event_count = 0 done = False waiting = None wait_secs = min(provider.event_sleep * wait_sleep_cycles, 2) for e in provider.events_poll(until=lambda: done): log.debug("got event %s", e) # you might get events for the root folder here or other setup stuff if e.exists: if not e.path: info = provider.info_oid(e.oid) if info: e.path = info.path if e.path == dest: received_event = e event_count += 1 log.debug("%s vs %s", e.path, dest) if e.path == dest and not waiting: waiting = time.monotonic() + wait_secs if waiting and time.monotonic() > waiting: # wait for extra events up to 10 sleep cycles, or 2 seconds done = True assert event_count == 1 assert received_event is not None assert received_event.oid path = received_event.path if path is None: path = provider.info_oid(received_event.oid).path assert path == dest assert received_event.mtime assert received_event.exists deleted_oid = received_event.oid provider.delete(oid=deleted_oid) provider.delete(oid=deleted_oid) # Tests that deleting a non-existing file does not raise a FNFE received_event = None event_count = 0 for e in provider.events_poll(): log.debug("event %s", e) if e.exists and e.path is None: info2 = provider.info_oid(e.oid) if info2: e.path = info2.path else: e.exists = False # assert not e.exists or e.path is not None # This is actually OK, google will do this legitimately log.debug("event %s", e) if (not e.exists and e.oid == deleted_oid) or (e.path and path in e.path): # assert not e.exists or e.path is not None, "non-trashed event without a path? 
%s" % e # this can happen on google, it's not a problem received_event = e event_count += 1 assert event_count == 1 assert received_event is not None assert received_event.oid assert not received_event.exists if received_event.path is not None: assert received_event.path == dest assert received_event.oid == deleted_oid assert received_event.mtime def test_event_del_create(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) temp2 = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) provider.delete(info1.oid) info2 = provider.create(dest, temp2) last_event = None saw_first_delete = False saw_first_create = False disordered = False done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path if path == dest or e.exists is False: last_event = e if e.oid == info1.oid: if e.exists: saw_first_create = True if saw_first_delete and not provider.oid_is_path: log.debug("disordered!") disordered = True else: saw_first_delete = True if e.exists and e.oid == info2.oid: if provider.oid_is_path: if saw_first_delete and saw_first_create: done = True else: done = True # the important thing is that we always get a create after the delete event assert last_event, "Event loop timed out before getting any events" assert done, "Event loop timed out after the delete, but before the create, " \ "saw_first_delete=%s, saw_first_create=%s, disordered=%s" % (saw_first_delete, saw_first_create, disordered) assert last_event.exists is True # The provider may compress out the first create, or compress out the first create and delete, or deliver both # So, if we saw the first create, make sure we got the delete. If we didn't see the first create, # it doesn't matter if we saw the first delete. 
if saw_first_create: assert saw_first_delete assert not disordered def test_event_rename(provider: ProviderMixin): temp = BytesIO(os.urandom(32)) dest = provider.temp_name("dest") dest2 = provider.temp_name("dest") dest3 = provider.temp_name("dest") # just get the cursor going for e in provider.events_poll(timeout=min(provider.event_sleep * 10, 1)): log.debug("event %s", e) info1 = provider.create(dest, temp) oid2 = provider.rename(info1.oid, dest2) if provider.oid_is_path: info1.oid = provider.info_path(dest2).oid oid3 = provider.rename(info1.oid, dest3) if provider.oid_is_path: info1.oid = provider.info_path(dest3).oid seen = set() last_event = None second_to_last = None done = False for e in provider.events_poll(provider.event_timeout * 2, until=lambda: done): if provider.oid_is_path: assert e.path log.debug("event %s", e) # you might get events for the root folder here or other setup stuff path = e.path if not e.path: info = provider.info_oid(e.oid) if info: path = info.path last_event = e seen.add(e.oid) if provider.oid_is_path: # 2 and 3 are in order if path == dest2: second_to_last = True if path == dest3 and (second_to_last or not provider.oid_is_path): done = True else: done = info1.oid in seen if provider.oid_is_path: # providers with path based oids need to send intermediate renames accurately and in order assert len(seen) > 2 assert last_event.path == dest3 assert last_event.prior_oid == oid2 else: # oid based providers just need to let us know something happend to that oid assert info1.oid in seen def test_api_failure(provider): # assert that the cloud # a) uses an api function # b) does not trap CloudTemporaryError's def side_effect(*a, **k): raise CloudTemporaryError("fake disconnect") with patch.object(provider, "_api", side_effect=side_effect): with patch.object(provider, "api_retry", False): with pytest.raises(CloudTemporaryError): provider.exists_path("/notexists") def test_file_not_found(provider: ProviderMixin): # Test that operations on nonexistent file system objects raise CloudFileNotFoundError # when appropriate, and don't when inappropriate dat = os.urandom(32) def data(): return BytesIO(dat) test_file_deleted_path = provider.temp_name("dest1") # Created, then deleted test_file_deleted_info = provider.create(test_file_deleted_path, data(), None) test_file_deleted_oid = test_file_deleted_info.oid provider.delete(test_file_deleted_oid) test_folder_deleted_path = provider.temp_name("dest1") # Created, then deleted test_folder_deleted_oid = provider.mkdir(test_folder_deleted_path) provider.delete(test_folder_deleted_oid) test_path_made_up = provider.temp_name("dest2") # Never created test_oid_made_up = "never created" # TODO: consider mocking info_path to always return None, and then call all the provider methods # to see if they are handling the None, and not raising exceptions other than FNF # Tests: # exists_path # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_path(test_file_deleted_path) is False assert provider.exists_path(test_folder_deleted_path) is False assert provider.exists_path(test_path_made_up) is False # exists_oid # deleted file, returns false, does not raise # deleted folder, returns false, does not raise # never existed fsobj, returns false, does not raise assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_oid(test_folder_deleted_oid) is False assert provider.exists_oid(test_oid_made_up) is False # 
info_path # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_path(test_file_deleted_path) is None assert provider.info_path(test_folder_deleted_path) is None assert provider.info_path(test_path_made_up) is None # info_oid # deleted file returns None # deleted folder returns None # never existed fsobj returns None assert provider.info_oid(test_file_deleted_oid) is None assert provider.info_oid(test_folder_deleted_oid) is None assert provider.info_oid(test_oid_made_up) is None # hash_oid # deleted file returns None # never existed file returns None # if getattr(provider, "hash_oid", False): # TODO implement hash_oid in gdrive, then don't have this be conditional # assert provider.hash_oid(test_file_deleted_oid) is None # assert provider.hash_oid(test_oid_made_up) is None # upload # to a deleted file raises FNF, or untrashes the file, either is OK # to a made up oid raises FNF # TODO: uploading to a deleted file might not raise an FNF, it might just untrash the file assert provider.exists_oid(test_file_deleted_oid) is False assert provider.exists_path(test_file_deleted_path) is False try: info = provider.upload(test_file_deleted_oid, data(), None) # This succeeded so the file must exist now, at the same oid as before assert info.oid == test_file_deleted_oid assert provider.exists_path(test_file_deleted_path) is True assert provider.exists_oid(test_file_deleted_oid) is True re_delete = True except CloudFileNotFoundError: re_delete = False pass if re_delete: provider.delete(test_file_deleted_oid) with pytest.raises(CloudFileNotFoundError): provider.upload(test_oid_made_up, data(), None) # create # to a non-existent folder, raises FNF # to a previously deleted folder, raises FNF with pytest.raises(CloudFileNotFoundError): provider.create(test_path_made_up + "/junk", data(), None) with pytest.raises(CloudFileNotFoundError): provider.create(test_folder_deleted_path + "/junk", data(), None) # upload: to the OID of a deleted folder, raises FNFE with pytest.raises(CloudFileNotFoundError): provider.upload(test_folder_deleted_oid, data(), None) # download # on a deleted oid raises FNF # on a made up oid raises FNF with pytest.raises(CloudFileNotFoundError): provider.download(test_file_deleted_oid, data()) with pytest.raises(CloudFileNotFoundError): provider.download(test_oid_made_up, data()) # rename # from a deleted oid raises FNF # from a made up oid raises FNF # to a non-existent folder raises [something], conditionally # to a previously deleted folder raises # check the rename source to see if there are others with pytest.raises(CloudFileNotFoundError): provider.rename(test_file_deleted_oid, test_file_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_folder_deleted_oid, test_folder_deleted_path) with pytest.raises(CloudFileNotFoundError): provider.rename(test_oid_made_up, test_path_made_up) # mkdir # to a non-existent folder raises FNF # to a previously deleted folder as parent folder raises FNF # to a previously deleted file as parent folder raises FNF with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_path_made_up + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_folder_deleted_path + "/junk") with pytest.raises(CloudFileNotFoundError): provider.mkdir(test_file_deleted_path + "/junk") # delete # on a deleted file oid does not raise # on a deleted folder oid does not raise # on a made up oid does not raise provider.delete(test_file_deleted_oid) 
provider.delete(test_folder_deleted_oid) provider.delete(test_oid_made_up) # delete: create a file, delete it, then create a new file at that path, then re-delete the deleted oid, raises FNFE temp_path = provider.temp_name() info1 = provider.create(temp_path, BytesIO(b"Hello")) provider.delete(info1.oid) info2 = provider.create(temp_path, BytesIO(b"world")) if provider.oid_is_path: assert provider.exists_oid(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) else: assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) provider.delete(info1.oid) assert provider.exists_path(temp_path) assert provider.exists_oid(info2.oid) # listdir # on a deleted file raises FNF # on a deleted folder raises FNF # on a made up path raises FNF with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_file_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_folder_deleted_oid)) with pytest.raises(CloudFileNotFoundError): list(provider.listdir(test_oid_made_up)) # TODO: Google drive raises FNF when it can't find the root... can we test for that here? def test_file_exists(provider: ProviderMixin): dat = os.urandom(32) def data(da=dat): return BytesIO(da) def create_and_delete_file(): create_and_delete_file_name = provider.temp_name() file_info = provider.create(create_and_delete_file_name, data(), None) provider.delete(file_info.oid) return create_and_delete_file_name, file_info.oid def create_and_delete_folder(): create_and_delete_folder_name = provider.temp_name() create_and_delete_folder_oid = provider.mkdir(create_and_delete_folder_name) provider.delete(create_and_delete_folder_oid) return create_and_delete_folder_name, create_and_delete_folder_oid def create_and_rename_file(): file_name1 = provider.temp_name() file_name2 = provider.temp_name() assert file_name1 != file_name2 file_info1 = provider.create(file_name1, data(), None) provider.rename(file_info1.oid, file_name2) return file_name1, file_info1.oid def create_and_rename_folder(): folder_name1 = provider.temp_name() folder_name2 = provider.temp_name() assert folder_name1 != folder_name2 folder_oid1 = provider.mkdir(folder_name1) provider.rename(folder_oid1, folder_name2) return folder_name1, folder_oid1 def create_file(create_file_name=None): if create_file_name is None: create_file_name = provider.temp_name() file_info = provider.create(create_file_name, data(), None) return create_file_name, file_info.oid def create_folder(create_folder_name=None): if create_folder_name is None: create_folder_name = provider.temp_name() create_folder_oid = provider.mkdir(create_folder_name) return create_folder_name, create_folder_oid # Test that operations on existent file system objects raise CloudExistsError # when appropriate, and don't when inappropriate # api methods to check for FileExists: # vulnerable to existing paths: # mkdir, create, rename # Possible issues to potentially check each of the vulnerable api methods: # target path has a component in the parent folder that already exists as a file # target path exists # target path exists, but the type of the existing object at that location is different from expected # target path exists, but the type of the existing object at that location is what was expected # target path existed, but was deleted, different type as source # target path existed, but was deleted, same type as source # target path existed, but was renamed, different type as source # target path existed, but was renamed, same type as 
source # # vulnerable to existing OIDs: # upload, delete # Possible issues to potentially check each of the vulnerable api methods: # target OID exists, but the type of the existing object at that location is different from expected # target OID existed, but was trashed, should un-trash the object # target OID is a non-empty folder, delete should raise FEx # # The enumerated tests: # mkdir: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name + "/junk") # mkdir: where target path exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): provider.mkdir(name) # mkdir: where target path exists as a folder, does not raise name1, oid1 = create_folder() oid2 = provider.mkdir(name1) assert oid1 == oid2 # mkdir: creating a file, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: creating a folder, deleting it, then creating a folder at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() oid2 = provider.mkdir(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as file, but was renamed name1, oid1 = create_and_rename_file() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # mkdir: target path existed as folder, but was renamed name1, oid1 = create_and_rename_folder() _, oid2 = create_folder(name1) assert oid1 != oid2 or provider.oid_is_path # upload: where target OID is a folder, raises FEx _, oid = create_folder() with pytest.raises(CloudFileExistsError): provider.upload(oid, data(), None) # delete: a non-empty folder, raises FEx name1, oid1 = create_folder() create_file(name1 + "/junk") with pytest.raises(CloudFileExistsError): provider.delete(oid1) # create: where target path exists, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name) def get_contents(oid): temp_contents = BytesIO() provider.download(oid, temp_contents) temp_contents.seek(0) return temp_contents.getvalue() # create: creating a file, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_file() _, oid2 = create_file(name1) assert oid1 != oid2 or provider.oid_is_path if provider.oid_is_path: assert provider.exists_oid(oid1) else: assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) # piggyback test -- uploading to the deleted oid should not step on the file that replaced it at that path if not provider.oid_is_path: try: new_contents = b"yo" # bytes(os.urandom(16).hex(), 'utf-8') new_info = provider.upload(oid1, BytesIO(new_contents)) assert new_info.oid != oid1 assert new_info.oid != oid2 assert not provider.exists_oid(oid1) contents2 = get_contents(oid2) assert contents2 == new_contents contents1 = get_contents(oid1) assert contents1 == new_contents except CloudFileNotFoundError: pass # create: creating a folder, deleting it, then creating a file at the same path, should not raise an FEx name1, oid1 = create_and_delete_folder() _, oid2 = create_file(name1) assert oid1 != oid or provider.oid_is_path # create: where target path has a parent folder that already exists as a file, raises FEx name, _ = create_file() with pytest.raises(CloudFileExistsError): create_file(name + "/junk") # create: target path existed as folder, but was renamed name, _ = 
create_and_rename_folder() create_file(name) # create: target path existed as file, but was renamed name, _ = create_and_rename_file() create_file(name) # rename: rename folder over empty folder succeeds name1, oid1 = create_folder() create_file(name1 + "/junk") name2, oid2 = create_folder() assert oid1 != oid2 contents1 = [x.name for x in provider.listdir(oid1)] provider.rename(oid1, name2) if provider.oid_is_path: log.debug("oid1 %s, oid2 %s", oid1, oid2) assert not provider.exists_oid(oid1) assert provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid2)] else: assert provider.exists_oid(oid1) assert not provider.exists_oid(oid2) contents2 = [x.name for x in provider.listdir(oid1)] assert contents1 == contents2 # rename: rename folder over non-empty folder raises FEx _, oid1 = create_folder() name2, oid2 = create_folder() assert oid1 != oid2 create_file(name2 + "/junk") with pytest.raises(CloudFileExistsError): provider.rename(oid1, name2) # rename: target has a parent folder that already exists as a file, raises FEx folder_name, _ = create_file() # notice that I am creating a file, and calling it a folder with pytest.raises(CloudFileExistsError): create_file(folder_name + "/junk") # rename: renaming file over empty folder, raises FEx folder_name, folder_oid = create_folder() file_name, file_oid = create_file() other_file_name, other_file_oid = create_file() with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # rename: renaming file over non-empty folder, raises FEx create_file(folder_name + "/test") with pytest.raises(CloudFileExistsError): provider.rename(file_oid, folder_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(folder_oid, file_name) # reuse the same file and folder from the last test # rename: renaming a folder over a file, raises FEx with pytest.raises(CloudFileExistsError): provider.rename(file_oid, other_file_name) # reuse the same file and folder from the last test # rename: create a file, delete it, then rename a file to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_file() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename file to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_file() provider.rename(oid2, deleted_folder_name) # rename: create a file, delete it, then rename a folder to the same path as the deleted, does not raise deleted_file_name, deleted_file_oid = create_and_delete_file() name2, oid2 = create_folder() provider.rename(oid2, deleted_file_name) # rename: create a folder, delete it, then rename folder to the same path as the deleted, does not raise deleted_folder_name, deleted_folder_oid1 = create_and_delete_folder() name2, oid2 = create_folder() provider.rename(oid2, deleted_folder_name) # rename: target folder path existed, but was renamed away, folder type as source name1, oid1 = create_and_rename_folder() name2, oid2 = create_folder() provider.rename(oid2, name1) # rename: target folder path existed, but was renamed away, file type as source name1, oid1 = create_folder() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, folder type as source name1, oid1 = 
create_file() name2, oid2 = create_folder() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # rename: target file path existed, but was renamed away, file type as source name1, oid1 = create_file() name2, oid2 = create_file() temp = provider.temp_name() provider.rename(oid1, temp) provider.rename(oid2, name1) # TODO: test that renaming A over B replaces B's OID with A's OID, and B's OID is trashed def test_listdir(provider: ProviderMixin): outer = provider.temp_name() root = provider.dirname(outer) temp_name = provider.is_subpath(root, outer) outer_oid_rm = provider.mkdir(outer) assert [] == list(provider.listdir(outer_oid_rm)) provider.delete(outer_oid_rm) outer_oid = provider.mkdir(outer) assert provider.exists_path(outer) assert provider.exists_oid(outer_oid) inner = outer + temp_name inner_oid = provider.mkdir(inner) assert provider.exists_oid(inner_oid) provider.create(outer + "/file1", BytesIO(b"hello")) provider.create(outer + "/file2", BytesIO(b"there")) provider.create(inner + "/file3", BytesIO(b"world")) contents = [x.name for x in provider.listdir(outer_oid)] assert len(contents) == 3 expected = ["file1", "file2", temp_name[1:]] assert contents.sort() == expected.sort() def test_upload_to_a_path(provider: ProviderMixin): temp_name = provider.temp_name() provider.create(temp_name, BytesIO(b"test")) # test uploading to a path instead of an OID. should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): provider.upload(temp_name, BytesIO(b"test2")) def test_upload_zero_bytes(provider: ProviderMixin): temp_name = provider.temp_name() info = provider.create(temp_name, BytesIO(b"")) provider.upload(info.oid, BytesIO(b"")) dest = BytesIO() provider.download(info.oid, dest) def test_delete_doesnt_cross_oids(provider: ProviderMixin): temp_name = provider.temp_name() info1 = provider.create(temp_name, BytesIO(b"test1")) provider.delete(info1.oid) info2 = provider.create(temp_name, BytesIO(b"test2")) if not provider.oid_is_path: assert info1.oid != info2.oid assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) if not provider.oid_is_path: provider.delete(info1.oid) assert not provider.exists_oid(info1.oid) assert provider.exists_oid(info2.oid) # test uploading to a path instead of an OID. 
should raise something # This test will need to flag off whether the provider uses paths as OIDs or not with pytest.raises(Exception): provider.upload(temp_name, BytesIO(b"test2")) def test_rename_case_change(provider: ProviderMixin): temp_namel = provider.temp_name().lower() temp_nameu = temp_namel.upper() infol = provider.create(temp_namel, BytesIO(b"test")) assert infol.path == temp_namel new_oid = provider.rename(infol.oid, temp_nameu) assert new_oid infou = provider.info_oid(new_oid) assert infou.path == temp_nameu PKOSjjcloudsync/tests/test_sync.pyimport logging from io import BytesIO import pytest from typing import NamedTuple from cloudsync import SyncManager, SyncState, CloudFileNotFoundError, LOCAL, REMOTE, FILE, DIRECTORY from cloudsync.provider import Provider from cloudsync.types import OInfo class WaitFor(NamedTuple): side: int = None path: str = None hash: bytes = None oid: str = None exists: bool = True log = logging.getLogger(__name__) TIMEOUT = 2 class RunUntilHelper: def run_until_found(self: SyncManager, *files, timeout=TIMEOUT): log.debug("running until found") last_error = None def found(): ok = True for info in files: if type(info) is tuple: info = WaitFor(side=info[0], path=info[1]) try: other_info = self.providers[info.side].info_path(info.path) except CloudFileNotFoundError: other_info = None if other_info is None: nonlocal last_error if info.exists is False: log.debug("waiting not exists %s", info.path) continue log.debug("waiting exists %s", info.path) last_error = CloudFileNotFoundError(info.path) ok = False break if info.exists is False: ok = False break if info.hash and info.hash != other_info.hash: log.debug("waiting hash %s", info.path) ok = False break return ok self.run(timeout=timeout, until=found) if not found(): if last_error: raise TimeoutError("timed out while waiting: %s" % last_error) else: raise TimeoutError("timed out while waiting") class SyncMgrMixin(SyncManager, RunUntilHelper): pass @pytest.fixture(name="sync") def fixture_sync(mock_provider_generator): state = SyncState() def translate(to, path): if to == LOCAL: return "/local" + path.replace("/remote", "") if to == REMOTE: return "/remote" + path.replace("/local", "") raise ValueError("bad path: %s", path) def resolve(f1, f2): return None # two providers and a translation function that converts paths in one to paths in the other sync = SyncMgrMixin(state, (mock_provider_generator(), mock_provider_generator(oid_is_path=False)), translate, resolve) yield sync sync.done() def test_sync_state_basic(mock_provider): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo", provider=mock_provider) assert state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="123") state.assert_index_is_correct() def test_sync_state_rename(mock_provider): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo", provider=mock_provider) state.update(LOCAL, FILE, path="foo2", oid="123", provider=mock_provider) assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_sync_state_rename2(mock_provider): state = SyncState() state.update(LOCAL, FILE, path="foo", oid="123", hash=b"foo", provider=mock_provider) state.update(LOCAL, FILE, path="foo2", oid="456", prior_oid="123", provider=mock_provider) assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") assert state.lookup_oid(LOCAL, oid="456") assert not state.lookup_oid(LOCAL, oid="123") 
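# Minimal sketch (an assumption, not the real SyncState code) of the re-keying behavior
# test_sync_state_rename2 exercises: when an update carries prior_oid, the entry indexed
# under the old oid/path must become reachable only under the new oid/path.
def rekey(index_by_oid, index_by_path, prior_oid, new_oid, new_path):
    """Move one entry from its old (oid, path) keys to (new_oid, new_path) in two dict indexes."""
    entry = index_by_oid.pop(prior_oid)       # drop the old oid key
    index_by_path.pop(entry["path"], None)    # drop the old path key
    entry["oid"], entry["path"] = new_oid, new_path
    index_by_oid[new_oid] = entry             # re-index under the new keys
    index_by_path[new_path] = entry
    return entry

# mirrors the assertions above: "123"/"foo" become unreachable, "456"/"foo2" reachable
by_oid = {"123": {"oid": "123", "path": "foo"}}
by_path = {"foo": by_oid["123"]}
rekey(by_oid, by_path, prior_oid="123", new_oid="456", new_path="foo2")
assert "456" in by_oid and "123" not in by_oid
assert "foo2" in by_path and "foo" not in by_path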
state.assert_index_is_correct() def test_sync_state_rename3(mock_provider): state = SyncState() ahash = "ah" bhash = "bh" state.update(LOCAL, FILE, path="a", oid="a", hash=ahash, provider=mock_provider) state.update(LOCAL, FILE, path="b", oid="b", hash=bhash, provider=mock_provider) infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") assert infoa[LOCAL].hash == ahash assert infob[LOCAL].hash == bhash # rename in a circle state.update(LOCAL, FILE, path="c", oid="c", prior_oid="a", provider=mock_provider) log.debug("TABLE 0:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="a", oid="a", prior_oid="b", provider=mock_provider) log.debug("TABLE 1:\n%s", state.pretty_print(use_sigs=False)) state.update(LOCAL, FILE, path="b", oid="b", prior_oid="c", provider=mock_provider) log.debug("TABLE 2:\n%s", state.pretty_print(use_sigs=False)) assert state.lookup_path(LOCAL, "a") assert state.lookup_path(LOCAL, "b") infoa = state.lookup_oid(LOCAL, "a") infob = state.lookup_oid(LOCAL, "b") # hashes should be flipped assert infoa[LOCAL].hash == bhash assert infob[LOCAL].hash == ahash state.assert_index_is_correct() def test_sync_state_multi(mock_provider): state = SyncState() state.update(LOCAL, FILE, path="foo2", oid="123", provider=mock_provider) assert state.lookup_path(LOCAL, path="foo2") assert not state.lookup_path(LOCAL, path="foo") state.assert_index_is_correct() def test_sync_state_kids(mock_provider): # annoyingly, the state manager now interacts with the provider # this means that the state manager needs to know how to get an oid # TODO: make a layer that knows about providers and state, and ANOTHER layer that just knows about state # that way we can go back to have a pure state/storage manager state = SyncState() state.update(LOCAL, DIRECTORY, path="/dir", oid="123", provider=mock_provider) assert state.lookup_path(LOCAL, path="/dir") state.update(LOCAL, FILE, path="/dir/foo", oid="124", provider=mock_provider) assert state.lookup_path(LOCAL, path="/dir/foo") new_oid = mock_provider.mkdir("/dir2") mock_provider.create("/dir2/foo", BytesIO(b'hi')) state.update(LOCAL, DIRECTORY, path="/dir2", oid=new_oid, provider=mock_provider, prior_oid="123") log.debug("TABLE:\n%s", state.pretty_print(use_sigs=False)) state.assert_index_is_correct() assert len(state) == 2 assert state.lookup_path(LOCAL, "/dir2/foo") def test_sync_basic(sync: "SyncMgrMixin"): remote_parent = "/remote" local_parent = "/local" remote_path1 = Provider.join(remote_parent, "stuff1") local_path1 = sync.translate(LOCAL, remote_path1) local_path1.replace("\\", "/") assert local_path1 == "/local/stuff1" Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(LOCAL, FILE, oid=linfo.oid, exists=True) assert sync.state.entry_count() == 1 rinfo = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello2")) # inserts info about some cloud path sync.change_state(REMOTE, FILE, oid=rinfo.oid, path=remote_path2, hash=rinfo.hash) def done(): has_info = [None] * 4 try: has_info[0] = sync.providers[LOCAL].info_path("/local/stuff1") has_info[1] = sync.providers[LOCAL].info_path("/local/stuff2") has_info[2] = 
sync.providers[REMOTE].info_path("/remote/stuff2") has_info[3] = sync.providers[REMOTE].info_path("/remote/stuff2") except CloudFileNotFoundError as e: log.debug("waiting for %s", e) pass return all(has_info) # loop the sync until the file is found sync.run(timeout=TIMEOUT, until=done) assert done() info = sync.providers[LOCAL].info_path("/local/stuff2") assert info.hash == sync.providers[LOCAL].hash_oid(info.oid) assert info.oid log.debug("all state %s", sync.state.get_all()) sync.state.assert_index_is_correct() def test_sync_conflict_rename_path(sync): base = "/some@.o dd/cr azy.path" join = sync.providers[LOCAL].join sync.providers[LOCAL].mkdirs(base) path = join(base, "to()a.doc.zip") sync.providers[LOCAL].create(path, BytesIO(b"hello")) oid, new, cpath = sync.conflict_rename(LOCAL, path) assert cpath == join(base, "to()a.conflicted.doc.zip") sync.providers[LOCAL].create(path, BytesIO(b"hello")) oid, new, cpath = sync.conflict_rename(LOCAL, path) assert cpath == join(base, "to()a.conflicted2.doc.zip") def test_sync_rename(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" local_path2 = Provider.join(local_parent, "stuff2") # "/local/stuff2" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" remote_path2 = Provider.join(remote_parent, "stuff2") # "/remote/stuff2" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) new_oid = sync.providers[LOCAL].rename(linfo.oid, local_path2) sync.change_state(LOCAL, FILE, path=local_path2, oid=new_oid, hash=linfo.hash, prior_oid=linfo.oid) sync.run_until_found((REMOTE, remote_path2)) assert sync.providers[REMOTE].info_path("/remote/stuff") is None sync.state.assert_index_is_correct() def test_sync_hash(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" remote_path1 = "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) linfo = sync.providers[LOCAL].upload(linfo.oid, BytesIO(b"hello2")) sync.change_state(LOCAL, FILE, linfo.oid, hash=linfo.hash) sync.run_until_found(WaitFor(REMOTE, remote_path1, hash=linfo.hash)) info = sync.providers[REMOTE].info_path(remote_path1) check = BytesIO() sync.providers[REMOTE].download(info.oid, check) assert check.getvalue() == b"hello2" sync.state.assert_index_is_correct() def test_sync_rm(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) sync.providers[LOCAL].delete(linfo.oid) sync.change_state(LOCAL, FILE, linfo.oid, exists=False) sync.run_until_found(WaitFor(REMOTE, remote_path1, 
exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state.assert_index_is_correct() def test_sync_mkdir(sync): local_dir1 = "/local" local_path1 = "/local/stuff" remote_dir1 = "/remote" remote_path1 = "/remote/stuff" local_dir_oid1 = sync.providers[LOCAL].mkdir(local_dir1) local_path_oid1 = sync.providers[LOCAL].mkdir(local_path1) # inserts info about some local path sync.change_state(LOCAL, DIRECTORY, path=local_dir1, oid=local_dir_oid1) sync.change_state(LOCAL, DIRECTORY, path=local_path1, oid=local_path_oid1) sync.run_until_found((REMOTE, remote_dir1)) sync.run_until_found((REMOTE, remote_path1)) log.debug("BEFORE DELETE\n %s", sync.state.pretty_print()) sync.providers[LOCAL].delete(local_path_oid1) sync.change_state(LOCAL, FILE, local_path_oid1, exists=False) log.debug("AFTER DELETE\n %s", sync.state.pretty_print()) log.debug("wait for delete") sync.run_until_found(WaitFor(REMOTE, remote_path1, exists=False)) assert sync.providers[REMOTE].info_path(remote_path1) is None sync.state.assert_index_is_correct() def test_sync_conflict_simul(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo = sync.providers[REMOTE].create(remote_path1, BytesIO(b"goodbye")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(REMOTE, FILE, path=remote_path1, oid=rinfo.oid, hash=rinfo.hash) # one of them is a conflict sync.run(until=lambda: sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") or sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") ) sync.run_until_found( (REMOTE, "/remote/stuff1"), (LOCAL, "/local/stuff1") ) sync.providers[LOCAL].log_debug_state("LOCAL") sync.providers[REMOTE].log_debug_state("REMOTE") b1 = BytesIO() b2 = BytesIO() if sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted"): sync.providers[REMOTE].download_path("/remote/stuff1.conflicted", b1) sync.providers[REMOTE].download_path("/remote/stuff1", b2) else: sync.providers[LOCAL].download_path("/local/stuff1.conflicted", b2) sync.providers[LOCAL].download_path("/local/stuff1", b1) # both files are intact assert b1.getvalue() != b2.getvalue() assert b1.getvalue() in (b"hello", b"goodbye") assert b2.getvalue() in (b"hello", b"goodbye") sync.state.assert_index_is_correct() MERGE = 2 @pytest.mark.parametrize("keep", [True, False]) @pytest.mark.parametrize("side", [LOCAL, REMOTE, MERGE]) def test_sync_conflict_resolve(sync, side, keep): data = (b"hello", b"goodbye", b"merge") def resolver(f1, f2): if side == MERGE: return (BytesIO(data[MERGE]), keep) if f1.side == side: return (f1, keep) return (f2, keep) sync.set_resolver(resolver) remote_parent = "/remote" local_parent = "/local" local_path1 = Provider.join(local_parent, "stuff1") # "/local/stuff1" remote_path1 = Provider.join(remote_parent, "stuff1") # "/remote/stuff1" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(data[LOCAL])) rinfo = sync.providers[REMOTE].create(remote_path1, BytesIO(data[REMOTE])) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.change_state(REMOTE, 
FILE, path=remote_path1, oid=rinfo.oid, hash=rinfo.hash) # ensure events are flushed a couple times sync.run(until=lambda: not sync.state.has_changes(), timeout=1) sync.providers[LOCAL].log_debug_state("LOCAL") sync.providers[REMOTE].log_debug_state("REMOTE") b1 = BytesIO() b2 = BytesIO() sync.providers[REMOTE].download_path("/remote/stuff1", b2) sync.providers[LOCAL].download_path("/local/stuff1", b1) # both files are intact assert b1.getvalue() == data[side] assert b2.getvalue() == data[side] if not keep: assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") else: assert sync.providers[LOCAL].exists_path("/local/stuff1.conflicted") or sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted") assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted.conflicted") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted.conflicted") assert not sync.providers[LOCAL].exists_path("/local/stuff1.conflicted2") assert not sync.providers[REMOTE].exists_path("/remote/stuff1.conflicted2") sync.state.assert_index_is_correct() def test_sync_conflict_path(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff" remote_path1 = "/remote/stuff" local_path2 = "/local/stuff-l" remote_path2 = "/remote/stuff-r" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo.oid, hash=linfo.hash) sync.run_until_found((REMOTE, remote_path1)) rinfo = sync.providers[REMOTE].info_path(remote_path1) assert len(sync.state.get_all()) == 1 ent = sync.state.get_all().pop() sync.providers[REMOTE].log_debug_state("BEFORE") new_oid_l = sync.providers[LOCAL].rename(linfo.oid, local_path2) new_oid_r = sync.providers[REMOTE].rename(rinfo.oid, remote_path2) sync.providers[REMOTE].log_debug_state("AFTER") sync.change_state(LOCAL, FILE, path=local_path2, oid=new_oid_l, hash=linfo.hash, prior_oid=linfo.oid) assert len(sync.state.get_all()) == 1 assert ent[REMOTE].oid == new_oid_r sync.change_state(REMOTE, FILE, path=remote_path2, oid=new_oid_r, hash=rinfo.hash, prior_oid=rinfo.oid) assert len(sync.state.get_all()) == 1 log.debug("TABLE 0:\n%s", sync.state.pretty_print()) # currently defers to the alphabetcially greater name, rather than conflicting sync.run_until_found((LOCAL, "/local/stuff-r")) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) assert not sync.providers[LOCAL].exists_path(local_path1) assert not sync.providers[LOCAL].exists_path(local_path2) sync.state.assert_index_is_correct() def test_sync_cycle(sync): sync: SyncMgrMixin l_parent = "/local" r_parent = "/remote" lp1, lp2, lp3 = "/local/a", "/local/b", "/local/c", rp1, rp2, rp3 = "/remote/a", "/remote/b", "/remote/c", templ = "/local/d" sync.providers[LOCAL].mkdir(l_parent) sync.providers[REMOTE].mkdir(r_parent) linfo1 = sync.providers[LOCAL].create(lp1, BytesIO(b"hello1")) sync.change_state(LOCAL, FILE, path=lp1, oid=linfo1.oid, hash=linfo1.hash) sync.run_until_found((REMOTE, rp1), timeout=1) rinfo1 = sync.providers[REMOTE].info_path(rp1) linfo2 = sync.providers[LOCAL].create(lp2, BytesIO(b"hello2")) sync.change_state(LOCAL, FILE, path=lp2, oid=linfo2.oid, hash=linfo2.hash) sync.run_until_found((REMOTE, rp2)) rinfo2 = sync.providers[REMOTE].info_path(rp2) linfo3 = sync.providers[LOCAL].create(lp3, BytesIO(b"hello3")) 
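# Once all three files have synced, the test rotates them through a temporary name
# (a -> d, c -> a, b -> c, d -> b) and then checks that the remote copies end up
# with the correspondingly rotated hashes.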
sync.change_state(LOCAL, FILE, path=lp3, oid=linfo3.oid, hash=linfo3.hash) sync.run_until_found((REMOTE, rp3)) rinfo3 = sync.providers[REMOTE].info_path(rp3) sync.providers[REMOTE].log_debug_state("BEFORE") tmp1oid = sync.providers[LOCAL].rename(linfo1.oid, templ) lp1oid = sync.providers[LOCAL].rename(linfo3.oid, lp1) lp3oid = sync.providers[LOCAL].rename(linfo2.oid, lp3) lp2oid = sync.providers[LOCAL].rename(tmp1oid, lp2) # a->temp, c->a, b->c, temp->b log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=templ, oid=tmp1oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp1, oid=lp1oid, hash=linfo3.hash, prior_oid=linfo3.oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp3, oid=lp3oid, hash=linfo2.hash, prior_oid=linfo2.oid) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) sync.change_state(LOCAL, FILE, path=lp2, oid=lp2oid, hash=linfo1.hash, prior_oid=tmp1oid) log.debug("TABLE 4:\n%s", sync.state.pretty_print()) assert len(sync.state.get_all()) == 3 sync.providers[REMOTE].log_debug_state("MIDDLE") sync.run(until=lambda: not sync.state.has_changes(), timeout=1) sync.providers[REMOTE].log_debug_state("AFTER") i1 = sync.providers[REMOTE].info_path(rp1) i2 = sync.providers[REMOTE].info_path(rp2) i3 = sync.providers[REMOTE].info_path(rp3) assert i1 and i2 and i3 assert i1.hash == rinfo3.hash assert i2.hash == rinfo1.hash assert i3.hash == rinfo2.hash def test_sync_conflict_path_combine(sync): remote_parent = "/remote" local_parent = "/local" local_path1 = "/local/stuff1" local_path2 = "/local/stuff2" remote_path1 = "/remote/stuff1" remote_path2 = "/remote/stuff2" local_path3 = "/local/stuff" remote_path3 = "/remote/stuff" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_path1, BytesIO(b"hello")) rinfo2 = sync.providers[REMOTE].create(remote_path2, BytesIO(b"hello")) # inserts info about some local path sync.change_state(LOCAL, FILE, path=local_path1, oid=linfo1.oid, hash=linfo1.hash) sync.change_state(REMOTE, FILE, path=remote_path2, oid=rinfo2.oid, hash=rinfo2.hash) sync.run_until_found((REMOTE, remote_path1), (LOCAL, local_path2)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) new_oid1 = sync.providers[LOCAL].rename(linfo1.oid, local_path3) prior_oid = sync.providers[LOCAL].oid_is_path and linfo1.oid sync.change_state(LOCAL, FILE, path=local_path3, oid=new_oid1, prior_oid=prior_oid) new_oid2 = sync.providers[REMOTE].rename(rinfo2.oid, remote_path3) prior_oid = sync.providers[REMOTE].oid_is_path and rinfo2.oid sync.change_state(REMOTE, FILE, path=remote_path3, oid=new_oid2, prior_oid=prior_oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) ok = lambda: ( sync.providers[REMOTE].exists_path("/remote/stuff.conflicted") or sync.providers[LOCAL].exists_path("/local/stuff.conflicted") ) sync.run(until=ok, timeout=3) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) assert ok() def test_create_then_move(sync): # TODO: combine with the reverse test remote_parent = "/remote" local_parent = "/local" local_folder = "/local/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" sync.providers[LOCAL].mkdir(local_parent) sync.providers[REMOTE].mkdir(remote_parent) linfo1 = sync.providers[LOCAL].create(local_file1, BytesIO(b"hello")) sync.change_state(LOCAL, FILE, 
path=local_file1, oid=linfo1.oid, hash=linfo1.hash) sync.run_until_found((REMOTE, remote_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[LOCAL].mkdir(local_folder) sync.change_state(LOCAL, DIRECTORY, path=local_folder, oid=folder_oid, hash=None) new_oid = sync.providers[LOCAL].rename(linfo1.oid, local_file2) sync.change_state(LOCAL, FILE, path=local_file2, oid=new_oid, hash=linfo1.hash, prior_oid=linfo1.oid) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((REMOTE, remote_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def test_create_then_move_reverse(sync): # TODO: see if this can be combined with the reverse test remote_parent = "/remote" local_parent = "/local" remote_folder = "/remote/folder" local_file1 = "/local/file" remote_file1 = "/remote/file" local_file2 = "/local/folder/file" remote_file2 = "/remote/folder/file" oid = sync.providers[LOCAL].mkdir(local_parent) oid = sync.providers[REMOTE].mkdir(remote_parent) rinfo1 = sync.providers[REMOTE].create(remote_file1, BytesIO(b"hello")) sync.change_state(REMOTE, FILE, path=remote_file1, oid=rinfo1.oid, hash=rinfo1.hash) sync.run_until_found((LOCAL, local_file1)) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) folder_oid = sync.providers[REMOTE].mkdir(remote_folder) sync.change_state(REMOTE, DIRECTORY, path=remote_folder, oid=folder_oid, hash=None) sync.providers[REMOTE].rename(rinfo1.oid, remote_file2) sync.change_state(REMOTE, FILE, path=remote_file2, oid=rinfo1.oid, hash=rinfo1.hash) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) sync.run_until_found((LOCAL, local_file2), timeout=2) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) def _test_rename_folder_with_kids(sync, source, dest): parent = ["/local", "/remote"] folder1 = ["/local/folder1", "/remote/folder1"] file1 = ["/local/folder1/file", "/remote/folder1/file"] folder2 = ["/local/folder2", "/remote/folder2"] file2 = ["/local/folder2/file", "/remote/folder2/file"] for loc in (source, dest): sync.providers[loc].mkdir(parent[loc]) folder_oid = sync.providers[source].mkdir(folder1[source]) sync.change_state(source, DIRECTORY, path=folder1[source], oid=folder_oid, hash=None) file_info: OInfo = sync.providers[source].create(file1[source], BytesIO(b"hello")) sync.change_state(source, FILE, path=file1[source], oid=file_info.oid, hash=None) log.debug("TABLE 0:\n%s", sync.state.pretty_print()) sync.run_until_found((dest, folder1[dest])) log.debug("TABLE 1:\n%s", sync.state.pretty_print()) new_oid = sync.providers[source].rename(folder_oid, folder2[source]) sync.change_state(source, DIRECTORY, path=folder2[source], oid=new_oid, hash=None, prior_oid=folder_oid) log.debug("TABLE 2:\n%s", sync.state.pretty_print()) sync.run_until_found((source, file2[source])) sync.run_until_found((dest, file2[dest])) sync.run_until_found( (source, file2[source]), (dest, file2[dest]), ) log.debug("TABLE 3:\n%s", sync.state.pretty_print()) @pytest.mark.parametrize("ordering", [(LOCAL, REMOTE), (REMOTE, LOCAL)]) def test_rename_folder_with_kids(sync, ordering): _test_rename_folder_with_kids(sync, *ordering) # TODO: test to confirm that a file that is both a rename and an update will be both renamed and updated # TODO: test to confirm that a sync with an updated path name that is different but matches the old name will be ignored (not sure what this means EA 8/26) PKOMM$cloudsync/tests/fixtures/__init__.pyfrom .util import * from .mock_provider import * from .mock_storage import * PKPO 
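# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the packaged sources): one way the
# MockProvider and MockStorage fixtures re-exported by this package can be
# exercised directly. The file name "/example.txt", the tag "state" and the
# function name below are arbitrary values chosen for the example.
from io import BytesIO

from cloudsync.tests.fixtures.mock_provider import MockProvider
from cloudsync.tests.fixtures.mock_storage import MockStorage


def example_fixture_usage():
    # an oid-based, case-sensitive in-memory provider
    prov = MockProvider(oid_is_path=False, case_sensitive=True)
    info = prov.create("/example.txt", BytesIO(b"hello"))
    buf = BytesIO()
    prov.download(info.oid, buf)
    assert buf.getvalue() == b"hello"

    # MockStorage keeps serialized sync state in a plain dict (not persisted)
    store = MockStorage({})
    eid = store.create("state", b"serialized-entry")
    assert store.read_all("state") == {eid: b"serialized-entry"}
# ---------------------------------------------------------------------------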
>>)cloudsync/tests/fixtures/mock_provider.pyimport os import time import copy import logging from hashlib import md5 from typing import Dict, List, Any, Optional, Generator import pytest from cloudsync.event import Event from cloudsync.provider import Provider from cloudsync.types import OInfo, OType, DirInfo from cloudsync.exceptions import CloudFileNotFoundError, CloudFileExistsError log = logging.getLogger(__name__) class MockFSObject: # pylint: disable=too-few-public-methods FILE = 'mock file' DIR = 'mock dir' def __init__(self, path, object_type, oid_is_path, contents=None, mtime=None): # self.display_path = path # TODO: used for case insensitive file systems if contents is None and type == MockFSObject.FILE: contents = b"" self.path = path.rstrip("/") self.contents = contents self.oid = path if oid_is_path else str(id(self)) self.exists = True self.type = object_type if self.type == self.FILE: # none is not valid for empty file if self.contents is None: self.contents = b'' self.mtime = mtime or time.time() @property def otype(self): if self.type == self.FILE: return OType.FILE else: return OType.DIRECTORY def hash(self) -> Optional[bytes]: if self.type == self.DIR: return None return md5(self.contents).digest() def update(self): self.mtime = time.time() def copy(self): return copy.copy(self) class MockEvent: # pylint: disable=too-few-public-methods ACTION_CREATE = "provider create" ACTION_RENAME = "provider rename" ACTION_UPDATE = "provider modify" ACTION_DELETE = "provider delete" def __init__(self, action, target_object: MockFSObject, prior_oid=None): self._target_object = copy.copy(target_object) self._action = action self._prior_oid = prior_oid self._timestamp = time.time() def serialize(self): ret_val = {"action": self._action, "id": self._target_object.oid, "object type": self._target_object.type, "path": self._target_object.path, "mtime": self._target_object.mtime, "prior_oid": self._prior_oid, "trashed": not self._target_object.exists, } return ret_val class MockProvider(Provider): default_sleep = 0.01 connected = True name = "Mock" # TODO: normalize names to get rid of trailing slashes, etc. 
def __init__(self, oid_is_path: bool, case_sensitive: bool): """Constructor for MockProvider :param oid_is_path: Act as a filesystem or other oid-is-path provider :param case_sensitive: Paths are case sensistive """ super().__init__() log.debug("mock mode: o:%s, c:%s", oid_is_path, case_sensitive) self.oid_is_path = oid_is_path self.case_sensitive = case_sensitive self._fs_by_path: Dict[str, MockFSObject] = {} self._fs_by_oid: Dict[str, MockFSObject] = {} self._events: List[MockEvent] = [] self._latest_cursor = -1 self._cursor = -1 self._type_map = { MockFSObject.FILE: OType.FILE, MockFSObject.DIR: OType.DIRECTORY, } self.event_timeout = 1 self.event_sleep = 0.001 self.creds = {} self.connection_id = os.urandom(2).hex() def _register_event(self, action, target_object, prior_oid=None): event = MockEvent(action, target_object, prior_oid) self._events.append(event) target_object.update() self._latest_cursor = len(self._events) - 1 def _get_by_oid(self, oid): self._api() return self._fs_by_oid.get(oid, None) def normalize(self, path): if self.case_sensitive: return path else: return path.lower() def _get_by_path(self, path): path = self.normalize(path) # TODO: normalize the path, support case insensitive lookups, etc self._api() return self._fs_by_path.get(path, None) def _store_object(self, fo: MockFSObject): # TODO: support case insensitive storage assert fo.path == fo.path.rstrip("/") self._fs_by_path[self.normalize(fo.path)] = fo self._fs_by_oid[fo.oid] = fo def _unstore_object(self, fo: MockFSObject): # TODO: do I need to check if the path and ID exist before del to avoid a key error, # or perhaps just catch and swallow that exception? del self._fs_by_path[self.normalize(fo.path)] del self._fs_by_oid[fo.oid] def _translate_event(self, pe: MockEvent, cursor) -> Event: event = pe.serialize() provider_type = event.get("object type", None) standard_type = self._type_map.get(provider_type, None) assert standard_type oid = event.get("id", None) mtime = event.get("mtime", None) trashed = event.get("trashed", None) prior_oid = event.get("prior_oid", None) path = None if self.oid_is_path: path = event.get("path") retval = Event(standard_type, oid, path, None, not trashed, mtime, prior_oid, new_cursor=cursor) return retval def _api(self, *args, **kwargs): pass @property def latest_cursor(self): return self._latest_cursor @property def current_cursor(self): return self._cursor def events(self) -> Generator[Event, None, None]: self._api() while self._cursor < self._latest_cursor: self._cursor += 1 pe = self._events[self._cursor] yield self._translate_event(pe, self._cursor) def walk(self, path, since=None): # TODO: implement "since" parameter self._api() for obj in self._fs_by_oid.values(): if self.is_subpath(path, obj.path, strict=False): yield Event(obj.otype, obj.oid, obj.path, obj.hash(), obj.exists, obj.mtime) def upload(self, oid, file_like, metadata=None) -> OInfo: self._api() file = self._fs_by_oid.get(oid, None) if file is None or not file.exists: raise CloudFileNotFoundError(oid) if file.type != MockFSObject.FILE: raise CloudFileExistsError("Only files may be uploaded, and %s is not a file" % file.path) contents = file_like.read() file.contents = contents self._register_event(MockEvent.ACTION_UPDATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def listdir(self, oid) -> Generator[DirInfo, None, None]: folder_obj = self._get_by_oid(oid) if not (folder_obj and folder_obj.exists and folder_obj.type == MockFSObject.DIR): raise CloudFileNotFoundError(oid) 
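# listdir scans the flat oid table: any existing object strictly under this folder
# whose relative path contains no further "/" is a direct child and is yielded as a DirInfo.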
path = folder_obj.path for obj in self._fs_by_oid.values(): if obj.exists: relative = self.is_subpath(path, obj.path, strict=True) if relative: relative = relative.lstrip("/") if "/" not in relative: yield DirInfo(otype=obj.otype, oid=obj.oid, hash=obj.hash(), path=obj.path, name=relative) def create(self, path, file_like, metadata=None) -> OInfo: # TODO: store the metadata self._api() file = self._get_by_path(path) if file is not None and file.exists: raise CloudFileExistsError("Cannot create, '%s' already exists" % file.path) self._verify_parent_folder_exists(path) if file is None or not file.exists: file = MockFSObject(path, MockFSObject.FILE, self.oid_is_path) self._store_object(file) file.contents = file_like.read() file.exists = True log.debug("created %s %s", file.oid, file.type) self._register_event(MockEvent.ACTION_CREATE, file) return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def download(self, oid, file_like): self._api() file = self._fs_by_oid.get(oid, None) if file is None or file.exists is False: raise CloudFileNotFoundError(oid) if file.type == MockFSObject.DIR: raise CloudFileExistsError("is a directory") file_like.write(file.contents) def rename(self, oid, path) -> str: log.debug("renaming %s -> %s", oid, path) self._api() # TODO: folders are implied by the path of the file... # actually check to make sure the folder exists and raise a FileNotFound if not object_to_rename = self._fs_by_oid.get(oid, None) if not (object_to_rename and object_to_rename.exists): raise CloudFileNotFoundError(oid) possible_conflict = self._get_by_path(path) if possible_conflict and possible_conflict.oid == oid: possible_conflict = None self._verify_parent_folder_exists(path) if possible_conflict and possible_conflict.exists: if possible_conflict.type != object_to_rename.type: log.debug("rename %s:%s conflicts with existing object of another type", oid, object_to_rename.path) raise CloudFileExistsError(path) if possible_conflict.type == MockFSObject.DIR: try: next(self.listdir(possible_conflict.oid)) raise CloudFileExistsError(path) except StopIteration: pass # Folder is empty, rename over it no problem else: raise CloudFileExistsError(path) log.debug("secretly deleting empty folder %s", path) self.delete(possible_conflict.oid) if object_to_rename.path == path: return oid prior_oid = None if self.oid_is_path: prior_oid = object_to_rename.oid if object_to_rename.type == MockFSObject.FILE: self._rename_single_object(object_to_rename, path, event=True) else: # object to rename is a directory old_path = object_to_rename.path for obj in set(self._fs_by_oid.values()): if self.is_subpath(old_path, obj.path, strict=True): new_obj_path = self.replace_path(obj.path, old_path, path) self._rename_single_object(obj, new_obj_path, event=False) # only parent generates event self._rename_single_object(object_to_rename, path, event=True) if self.oid_is_path: assert object_to_rename.oid != prior_oid, "rename %s to %s" % (prior_oid, path) else: assert object_to_rename.oid == oid, "rename %s to %s" % (object_to_rename.oid, oid) return object_to_rename.oid def _rename_single_object(self, source_object: MockFSObject, destination_path, *, event): destination_path = destination_path.rstrip("/") # This will assume all validation has already been done, and just rename the thing # without trying to rename contents of folders, just rename the object itself log.debug("renaming %s to %s", source_object.path, destination_path) prior_oid = source_object.oid if self.oid_is_path else None 
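# Re-index the renamed object: drop its old path/oid keys, update the path (and the
# oid as well when oids are paths), store it again, and record a rename event that
# carries the prior oid so consumers can correlate the change.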
self._unstore_object(source_object) source_object.path = destination_path if self.oid_is_path: source_object.oid = destination_path self._store_object(source_object) self._register_event(MockEvent.ACTION_RENAME, source_object, prior_oid) log.debug("rename complete %s", source_object.path) self.log_debug_state() def mkdir(self, path) -> str: self._api() self._verify_parent_folder_exists(path) file = self._get_by_path(path) if file and file.exists: if file.type == MockFSObject.FILE: raise CloudFileExistsError(path) else: log.debug("Skipped creating already existing folder: %s", path) return file.oid new_fs_object = MockFSObject(path, MockFSObject.DIR, self.oid_is_path) self._store_object(new_fs_object) self._register_event(MockEvent.ACTION_CREATE, new_fs_object) return new_fs_object.oid def delete(self, oid): log.debug("delete %s", oid) self._api() file = self._fs_by_oid.get(oid, None) log.debug("got %s", file) if file and file.exists: if file.otype == OType.DIRECTORY: try: next(self.listdir(file.oid)) raise CloudFileExistsError("Cannot delete non-empty folder %s:%s" % (oid, file.path)) except StopIteration: pass # Folder is empty, delete it no problem else: path = file.path if file else "" log.debug("Deleting non-existent oid %s:%s ignored", oid, path) return file.exists = False self._register_event(MockEvent.ACTION_DELETE, file) def exists_oid(self, oid): self._api() file = self._fs_by_oid.get(oid, None) return file is not None and file.exists def exists_path(self, path) -> bool: self._api() file = self._get_by_path(path) return file is not None and file.exists def hash_oid(self, oid) -> Any: file = self._fs_by_oid.get(oid, None) if file and file.exists: return file.hash() else: return None @staticmethod def hash_data(file_like) -> bytes: return md5(file_like.read()).digest() def info_path(self, path: str) -> Optional[OInfo]: self._api() file: MockFSObject = self._get_by_path(path) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) def info_oid(self, oid, use_cache=True) -> Optional[OInfo]: self._api() file: MockFSObject = self._fs_by_oid.get(oid, None) if not (file and file.exists): return None return OInfo(otype=file.otype, oid=file.oid, hash=file.hash(), path=file.path) # @staticmethod # def _slurp(path): # with open(path, "rb") as x: # return x.read() # # @staticmethod # def _burp(path, contents): # with open(path, "wb") as x: # x.write(contents) def log_debug_state(self, msg=""): log.debug("%s: mock provider state %s", msg, list(self.walk("/"))) ################### def mock_provider_instance(oid_is_path, case_sensitive): prov = MockProvider(oid_is_path, case_sensitive) prov.event_timeout = 1 prov.event_sleep = 0.001 prov.creds = {} return prov @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider(request): return mock_provider_instance(*request.param) @pytest.fixture(params=[(False, True), (True, True)], ids=["mock_oid_cs", "mock_path_cs"]) def mock_provider_generator(request): return lambda oid_is_path=None, case_sensitive=None: \ mock_provider_instance( request.param[0] if oid_is_path is None else oid_is_path, request.param[1] if case_sensitive is None else case_sensitive) @pytest.fixture def mock_provider_creator(): return mock_provider_instance def test_mock_basic(): """ basic spot-check, more tests are in test_providers with mock as one of the providers """ from io import BytesIO m = MockProvider(False, True) info = m.create("/hi.txt", BytesIO(b'hello')) 
assert info.hash assert info.oid b = BytesIO() m.download(info.oid, b) assert b.getvalue() == b'hello' PKO)(cloudsync/tests/fixtures/mock_storage.pyfrom threading import Lock from typing import Dict, Any, Tuple, Optional import logging from cloudsync import Storage, LOCAL, REMOTE log = logging.getLogger(__name__) class MockStorage(Storage): # Does not actually persist the data... but it's just a mock top_lock = Lock() lock_dict = dict() def __init__(self, storage_dict: Dict[str, Dict[int, bytes]]): self.storage_dict = storage_dict self.cursor: int = 0 # the next eid def _get_internal_storage(self, tag: str) -> Tuple[Lock, Dict[int, bytes]]: with self.top_lock: lock: Lock = self.lock_dict.setdefault(tag, Lock()) return lock, self.storage_dict.setdefault(tag, dict()) def create(self, tag: str, serialization: bytes) -> Any: lock, storage = self._get_internal_storage(tag) with lock: current_index = self.cursor self.cursor += 1 storage[current_index] = serialization return current_index def update(self, tag: str, serialization: bytes, eid: Any): lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) storage[eid] = serialization return 1 def delete(self, tag: str, eid: Any): lock, storage = self._get_internal_storage(tag) log.debug("deleting eid%s", eid) with lock: if eid not in storage: log.debug("ignoring delete: id %s doesn't exist" % eid) return del storage[eid] def read_all(self, tag: str) -> Dict[Any, bytes]: lock, storage = self._get_internal_storage(tag) with lock: ret: Dict[Any, bytes] = storage.copy() return ret def read(self, tag: str, eid: Any) -> Optional[bytes]: lock, storage = self._get_internal_storage(tag) with lock: if eid not in storage: raise ValueError("id %s doesn't exist" % eid) return storage[eid] PKOoo cloudsync/tests/fixtures/util.pyimport pytest import os import tempfile import shutil from inspect import getframeinfo, stack import logging from cloudsync.provider import Provider log = logging.getLogger(__name__) log.setLevel(logging.INFO) class Util: def __init__(self): self.base = tempfile.mkdtemp(suffix=".cloudsync") log.debug("temp files will be in: %s", self.base) @staticmethod def get_context(level): caller = getframeinfo(stack()[level+1][0]) return caller def temp_file(self, *, fill_bytes=None): # pretty names for temps caller = self.get_context(1) fn = os.path.basename(caller.filename) if not fn: fn = "unk" else: fn = os.path.splitext(fn)[0] func = caller.function name = fn + '-' + func + "." 
+ os.urandom(16).hex() fp = Provider.join(self.base, name) if fill_bytes is not None: with open(fp, "wb") as f: f.write(os.urandom(fill_bytes)) log.debug("temp file %s", fp) return fp def do_cleanup(self): shutil.rmtree(self.base) @pytest.fixture(scope="module") def util(request): # user can override at the module level or class level # if they want to look at the temp files made cleanup = getattr(getattr(request, "cls", None), "util_cleanup", True) if cleanup: cleanup = getattr(request.module, "util_cleanup", True) u = Util() yield u if cleanup: u.do_cleanup() def test_util(util): log.setLevel(logging.DEBUG) f = util.temp_file(fill_bytes=32) assert len(open(f, "rb").read()) == 32 
PKO%cloudsync/tests/providers/__init__.py
PKO?~%%$cloudsync/tests/providers/dropbox.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError from cloudsync.providers.dropbox import DropboxProvider def dropbox_creds(): token_set = os.environ.get("DROPBOX_TOKEN") if not token_set: return None tokens = token_set.split(",") creds = { "key": tokens[random.randrange(0, len(tokens))], } return creds def dropbox_provider(): cls = DropboxProvider cls.event_timeout = 20 cls.event_sleep = 2 cls.creds = dropbox_creds() return cls() @pytest.fixture def cloudsync_provider(): return dropbox_provider() def test_connect(): creds = dropbox_creds() if not creds: pytest.skip('requires dropbox token') sync_root = "/" + os.urandom(16).hex() gd = DropboxProvider(sync_root) gd.connect(creds) assert gd.client quota = gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass 
PKO#cloudsync/tests/providers/gdrive.pyimport os import random import pytest from cloudsync.exceptions import CloudFileNotFoundError from cloudsync.providers.gdrive import GDriveProvider # move this to provider ci_creds() function? def gdrive_creds(): token_set = os.environ.get("GDRIVE_TOKEN") cli_sec = os.environ.get("GDRIVE_CLI_SECRET") if not token_set or not cli_sec: return None tokens = token_set.split(",") creds = { "refresh_token": tokens[random.randrange(0, len(tokens))], "client_secret": cli_sec, "client_id": '433538542924-ehhkb8jn358qbreg865pejbdpjnm31c0.apps.googleusercontent.com', } return creds def gdrive_provider(): cls = GDriveProvider cls.event_timeout = 60 cls.event_sleep = 2 cls.creds = gdrive_creds() return cls() @pytest.fixture def cloudsync_provider(): return gdrive_provider() def connect_test(want_oauth: bool): creds = gdrive_creds() if not creds: pytest.skip('requires gdrive token and client secret') if want_oauth: creds.pop("refresh_token", None) # triggers oauth to get a new refresh token sync_root = "/" + os.urandom(16).hex() gd = GDriveProvider() gd.connect(creds) assert gd.client gd.get_quota() try: info = gd.info_path(sync_root) if info and info.oid: gd.delete(info.oid) except CloudFileNotFoundError: pass def test_connect(): connect_test(False) @pytest.mark.manual def test_oauth_connect(): connect_test(True) 
PKPO2!cloudsync/tests/providers/mock.pyimport pytest from ..fixtures.mock_provider import MockProvider @pytest.fixture def cloudsync_provider(): cls = MockProvider cls.event_timeout = 20 cls.event_sleep = 2 cls.creds = {} return cls 
PK!Hcloudsync-0.1.9.dist-info/entry_points.txt [compressed entry_points data omitted] 
PKcloudsync-0.1.9.dist-info/LICENSE GNU LESSER GENERAL PUBLIC LICENSE Version 3, 29 June 2007 Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below. 0. Additional Definitions. As used herein, “this License” refers to version 3 of the GNU Lesser General Public License, and the “GNU GPL” refers to version 3 of the GNU General Public License. “The Library” refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An “Application” is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A “Combined Work” is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the “Linked Version”. The “Minimal Corresponding Source” for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The “Corresponding Application Code” for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. 
b) Accompany the Combined Work with a copy of the GNU GPL and this license document. c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License “or any later version” applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. 
If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library. 
PK!Hcloudsync-0.1.9.dist-info/WHEEL PK!Hcloudsync-0.1.9.dist-info/METADATA PK!Hcloudsync-0.1.9.dist-info/RECORD [compressed binary contents and zip central directory omitted]