PK! quare/__init__.py# -*- coding: utf-8 -*- """Top-level package for quare.""" __author__ = """James Estevez""" __email__ = "j@jstvz.org" __version__ = "0.1.0" PK!7(w quare/cli.py# -*- coding: utf-8 -*- """Console script for quare.""" import json import html import click import quare.utils as utils from .quare import get_user_info, get_user_token, set_user_token from .quip import QuipClient, QuipError from .quip_classes import QuipMessage, QuipThread from .quip_websocket import stream_updates from .ui_messages import AUTH_ERROR_MESSAGE @click.group("main", help="") @click.version_option() def main(): """Main quare CLI entry point. This runs as the first step in processing any CLI command. """ @click.command() @click.option("-a", "--alias", default="Default", show_default=True) @click.option("--json", "json_", is_flag=True) def whoami(alias, json_): """Print information about the logged in users.""" try: alias, token = get_user_token(alias) user, user_table = get_user_info(token, alias=alias) if json_: click.echo(json.dumps(user)) else: click.echo(user_table) except QuipError as ex: if ex.code == 401 and "Invalid access_token" in str(ex): raise click.ClickException(AUTH_ERROR_MESSAGE) raise # reraise @click.command() @click.option("-a", "--alias", default="Default") @click.option("-t", "--token", prompt=True) def auth(alias, token): """Store an API authentication token.""" set_user_token(alias, token.strip()) click.secho("Token stored!", fg="bright_green") main.add_command(whoami) main.add_command(auth) @main.group("doc") def document(): """Commands for interacting with documents.""" @document.command(name="append") @click.option( "-a", "--alias", default="Default", help="Quip `auth' alias", show_default=True ) @click.option("-c", "--content", help="Markdown to append to document.") @click.option( "-f", "--file", "fd", type=click.File(), help="Markdown to append to document. Will ignore --content if provided.", ) @click.option("-i", "--id", "doc_id", required=True) @click.option("--json", "json_", is_flag=True) def doc_append(alias, content, fd, doc_id, json_): """Append content to an existing Quip document.""" _, token = get_user_token(alias) content = fd.read() if fd else content if not content: raise click.UsageError('You must pass either "--content" or "--file".') client = QuipClient(token) client.edit_document(doc_id, f"\n{content}\n", format="markdown") @main.group("msg") def message(): """Commands for interacting with chats/comment threads.""" @message.command(name="get") @click.option("-r", "--room", required=True) @click.option("-s", "--since", type=str, default="1999") @click.option("-a", "--alias", default="Default", show_default=True) @click.option("--json", "json_", is_flag=True) @click.option( "-l", "--last", default=200, show_default=True, type=int, help="Last n messages" ) @click.option( "--decending/--ascending", "is_desc", default=False, help="Created date/time sort order", ) def msg_get(room, last, since, alias, json_, is_desc): """Retrieve messages from Quip.""" token, thread = get_thread(alias, room) start_usec = utils.get_usec(since) messages = thread.get_messages(token, last_n=last, start_usec=start_usec) if not is_desc: messages.reverse() print_msgs(messages, json_) @message.command(name="stream") @click.option("-a", "--alias", default="Default", show_default=True) def msg_stream(alias): """Stream messages via Quip's websocket. Close using CTRL-C""" _, token = get_user_token(alias) stream_updates(token) @message.command(name="send") @click.option("-a", "--alias", default="Default", show_default=True) @click.option( "-c", "--content", help="The body of the message to send. Use '-' for stdin" ) @click.option("-r", "--room", required=True) @click.option( "--frame", type=click.Choice(["bubble", "card", "line"]), help="Adjust the display of the message", ) @click.option("--silent", is_flag=True) @click.option( "-m", "--monospace", help="Format the message as monospace code.", is_flag=True ) @click.option("--file", "file_", type=click.File("r")) def msg_send(alias, content, room, frame, silent, monospace, file_): """Send messages to a quip chat/document.""" token, thread = get_thread(alias, room) content = click.get_text_stream("stdin").read() if content == "-" else content content = file_.read() if file_ else content parts = format_msg_content(content, monospace=monospace) msg = thread.send_message( token, content=content, frame=frame, silent=silent, parts=parts ) if msg: messages = thread.get_messages(token, last_n=5) messages.reverse() print_msgs(messages) def format_msg_content(content, monospace=False): if monospace: content = ( "
"
            + html.escape(content, quote=True).replace("\n", "
").replace(" ", " ") + "
" ) return json.dumps([["monospace", content]]) else: return None def get_thread(alias, room): """Send messages to a quip chat/document.""" _, token = get_user_token(alias) thread = QuipThread.get_thread(token, room) return token, thread def print_msgs(messages, json_=False): """Print and format messages.""" if json_: click.echo(json.dumps(messages)) return else: for message in messages: utils.print_quip_message(QuipMessage(**message)) PK! quare/config.py# -*- coding: utf-8 -*- """Configuration handling. Exports config class.""" import os from pathlib import Path import yaml from .exceptions import ConfigNotFoundError from .ui_messages import MISSING_CONF_ERROR_MESSAGE class quareConfig: """Parses a configuration file and stores/retrieves its values.""" DEFAULT_CONFIG_FILENAME = os.path.join(str(Path.home()), ".quare-conf.yaml") def __init__(self, config_path=None): self._find_config(config_path) self.raw_config = {} self._read_config() self._chats = [] self._docs = [] def _find_config(self, config_path): if config_path is not None and os.path.isfile(config_path): self.config_path = config_path else: self.config_path = self.DEFAULT_CONFIG_FILENAME def _read_config(self): if os.path.isfile(self.config_path): with open(self.config_path) as conf: self.raw_config = yaml.safe_load(conf.read()) self.defaults = self.raw_config.get("defaults", {}) else: raise ConfigNotFoundError(MISSING_CONF_ERROR_MESSAGE) @property def default_docs(self): if not self._docs: self._docs = self.defaults.get("documents", []) return self._docs @property def default_chats(self): if not self._chats: self._chats = self.defaults.get("chats", []) return self._chats PK!#N b--quare/exceptions.py"""Custom exceptions for quare and QuipClient""" class QuipTokenNotFoundError(Exception): """ Thrown if a Quip token is not in the keyring, and the QUIP_TOKEN environment variable is not set.""" class ConfigNotFoundError(Exception): """Thrown if a quare config file does not exist.""" PK!0`i i quare/quare.py# -*- coding: utf-8 -*- """Contains business logic for interactions with the Quip API.""" import os import keyring import terminaltables from quare import quip, ui_messages from quare.exceptions import QuipTokenNotFoundError from quare.quip_classes import QuipFolder SVC_NAME = "quare" TOKEN_ENV_VAR = "QUIP_TOKEN" # nosec def get_user_token(alias="Default"): """Retreive an api token from the keyring or environment variable. Keyword args: alias -- the alias under which the token is stored (default "Default") """ token = keyring.get_password(SVC_NAME, alias) if type(token) is str: return alias, token elif TOKEN_ENV_VAR in os.environ and os.environ[TOKEN_ENV_VAR]: return alias, os.environ[TOKEN_ENV_VAR] else: raise QuipTokenNotFoundError(ui_messages.TOKEN_ERROR_MESSAGE) def set_user_token(alias, token): """Store an API token and associated metadata.""" keyring.set_password(SVC_NAME, alias, token) def get_user_info(token, alias="User"): """Call the Quip API and get information about the authenticated user. Keyword args: alias -- the alias under which the token is stored (default "Default") """ client = quip.QuipClient(token) user = client.get_authenticated_user() table = terminaltables.SingleTable(_format_user_matrix(user), alias) table.inner_row_border = True return user, table.table def send_messages(token, alias, room): """Call the Quip API and send a message.""" client = quip.QuipClient(token) messages = client.send_messages(room) return messages def _format_user_matrix(user): """Given a Quip user dict, return a matrix that terminal tables will accept.""" return [ ["Name", user["name"]], ["Email(s)", ", ".join(user["emails"]) if "emails" in user else ""], ["Quip User ID", user["id"]], ] def _get_user_favorites(user, token): """Given a user dict, recursively retrieve the user's starred items.""" starred_id = user["starred_folder_id"] client = quip.QuipClient(token) folder = client.get_folder(starred_id) print(folder) folder = QuipFolder(**folder) return _get_children(folder, token) def _get_children(folder, token): """Get all child threads for this folder.""" client = quip.QuipClient(token) threads = client.get_threads(folder.child_threads) return threads PK!| quare/quip.py# Copyright 2014 Quip # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """A Quip API client library. For full API documentation, visit https://quip.com/api/. Typical usage: client = quip.QuipClient(access_token=...) user = client.get_authenticated_user() starred = client.get_folder(user["starred_folder_id"]) print "There are", len(starred["children"]), "items in your starred folder" In addition to standard getters and setters, we provide a few convenience methods for document editing. For example, you can use `add_to_first_list` to append items (in Markdown) to the first bulleted or checklist in a given document, which is useful for automating a task list. """ import datetime import json import ssl import sys import urllib.error import urllib.parse import urllib.request import xml.etree.cElementTree from importlib import reload Request = urllib.request.Request urlencode = urllib.parse.urlencode urlopen = urllib.request.urlopen HTTPError = urllib.error.HTTPError iteritems = dict.items try: reload(sys) sys.setdefaultencoding("utf8") except Exception: # Can't change default encoding usually... pass try: ssl.PROTOCOL_TLSv1_1 except AttributeError: raise Exception( "Using the Quip API requires an SSL library that supports TLS versions " ">= 1.1; your Python + OpenSSL installation must be upgraded." ) # After 2017-02, the Quip API HTTPS endpoint requires TLS version 1.1 or later; # TLS version 1.0 is disabled due to extensive security vulnerabilities. # # - macOS: At this time of this writing, macOS ships with Python 2.7 and # OpenSSL, but the version of OpenSSL is outdated and only supports TLS 1.0. # (This is even true of the most recent version of macOS (Sierra) with all # security patches installed; see # https://eclecticlight.co/2016/03/23/the-tls-mess-in-os-x-el-capitan/ .) # # To use this module on a macOS system, you will need to install your own # copy of Python and OpenSSL. Simple suggestions: # # 1) Install Homebrew from http://brew.sh; run "brew install openssl python" # 2) Install Miniconda from https://conda.io/miniconda.html # # - Google App Engine (GAE): Per App Engine's documentation, you must request # version 2.7.11 of the "ssl" library in your app.yaml file. See: # https://cloud.google.com/appengine/docs/python/sockets/ssl_support class QuipError(Exception): def __init__(self, code, message, http_error): Exception.__init__(self, "%d: %s" % (code, message)) self.code = code self.http_error = http_error class QuipClient(object): """A Quip API client""" # Edit operations APPEND, PREPEND, AFTER_SECTION, BEFORE_SECTION, REPLACE_SECTION, DELETE_SECTION = range( 6 ) # Folder colors MANILA, RED, ORANGE, GREEN, BLUE = range(5) def __init__( self, access_token=None, client_id=None, client_secret=None, base_url=None, request_timeout=None, ): """Constructs a Quip API client. If `access_token` is given, all of the API methods in the client will work to read and modify Quip documents. Otherwise, only `get_authorization_url` and `get_access_token` work, and we assume the client is for a server using the Quip API's OAuth endpoint. """ self.access_token = access_token self.client_id = client_id self.client_secret = client_secret self.base_url = base_url if base_url else "https://platform.quip.com" self.request_timeout = request_timeout if request_timeout else 10 def get_authorization_url(self, redirect_uri, state=None): """Returns the URL the user should be redirected to to sign in.""" return self._url( "oauth/login", redirect_uri=redirect_uri, state=state, response_type="code", client_id=self.client_id, ) def get_access_token( self, redirect_uri, code, grant_type="authorization_code", refresh_token=None ): """Exchanges a verification code for an access_token. Once the user is redirected back to your server from the URL returned by `get_authorization_url`, you can exchange the `code` argument with this method. """ return self._fetch_json( "oauth/access_token", redirect_uri=redirect_uri, code=code, grant_type=grant_type, refresh_token=refresh_token, client_id=self.client_id, client_secret=self.client_secret, ) def get_authenticated_user(self): """Returns the user corresponding to our access token.""" return self._fetch_json("users/current") def get_user(self, id): """Returns the user with the given ID.""" return self._fetch_json("users/" + id) def get_users(self, ids): """Returns a dictionary of users for the given IDs.""" return self._fetch_json("users/", post_data={"ids": ",".join(ids)}) def update_user(self, user_id, picture_url=None): return self._fetch_json( "users/update", post_data={"user_id": user_id, "picture_url": picture_url} ) def get_contacts(self): """Returns a list of the users in the authenticated user's contacts.""" return self._fetch_json("users/contacts") def get_folder(self, id): """Returns the folder with the given ID.""" return self._fetch_json("folders/" + id) def get_folders(self, ids): """Returns a dictionary of folders for the given IDs.""" return self._fetch_json("folders/", post_data={"ids": ",".join(ids)}) def new_folder(self, title, parent_id=None, color=None, member_ids=[]): return self._fetch_json( "folders/new", post_data={ "title": title, "parent_id": parent_id, "color": color, "member_ids": ",".join(member_ids), }, ) def update_folder(self, folder_id, color=None, title=None): return self._fetch_json( "folders/update", post_data={"folder_id": folder_id, "color": color, "title": title}, ) def add_folder_members(self, folder_id, member_ids): """Adds the given users to the given folder.""" return self._fetch_json( "folders/add-members", post_data={"folder_id": folder_id, "member_ids": ",".join(member_ids)}, ) def remove_folder_members(self, folder_id, member_ids): """Removes the given users from the given folder.""" return self._fetch_json( "folders/remove-members", post_data={"folder_id": folder_id, "member_ids": ",".join(member_ids)}, ) def get_teams(self): """Returns the teams for the user corresponding to our access token.""" return self._fetch_json("teams/current") def get_messages(self, thread_id, max_created_usec=None, count=None): """Returns the most recent messages for the given thread. To page through the messages, use max_created_usec, which is the sort order for the returned messages. count should be an integer indicating the number of messages you want returned. The maximum is 100. """ return self._fetch_json( "messages/" + thread_id, max_created_usec=max_created_usec, count=count ) def new_message(self, thread_id, content=None, **kwargs): """Sends a message on the given thread. `content` is plain text, not HTML. """ args = {"thread_id": thread_id, "content": content} args.update(kwargs) return self._fetch_json("messages/new", post_data=args) def get_thread(self, id): """Returns the thread with the given ID.""" return self._fetch_json("threads/" + id) def get_threads(self, ids): """Returns a dictionary of threads for the given IDs.""" return self._fetch_json("threads/", post_data={"ids": ",".join(ids)}) def get_recent_threads(self, max_updated_usec=None, count=None, **kwargs): """Returns the recently updated threads for a given user.""" return self._fetch_json( "threads/recent", max_updated_usec=max_updated_usec, count=count, **kwargs ) def get_matching_threads( self, query, count=None, only_match_titles=False, **kwargs ): """Returns the recently updated threads for a given user.""" return self._fetch_json( "threads/search", query=query, count=count, only_match_titles=False, **kwargs ) def add_thread_members(self, thread_id, member_ids): """Adds the given folder or user IDs to the given thread.""" return self._fetch_json( "threads/add-members", post_data={"thread_id": thread_id, "member_ids": ",".join(member_ids)}, ) def delete_thread(self, thread_id): """Deletes the thread with the given thread id or secret""" return self._fetch_json("threads/delete", post_data={"thread_id": thread_id}) def remove_thread_members(self, thread_id, member_ids): """Removes the given folder or user IDs from the given thread.""" return self._fetch_json( "threads/remove-members", post_data={"thread_id": thread_id, "member_ids": ",".join(member_ids)}, ) def pin_to_desktop(self, thread_id, **kwargs): """Pins the given thread to desktop.""" args = {"thread_id": thread_id} args.update(kwargs) return self._fetch_json("threads/pin-to-desktop", post_data=args) def move_thread(self, thread_id, source_folder_id, destination_folder_id): """Moves the given thread from the source folder to the destination one. """ self.add_thread_members(thread_id, [destination_folder_id]) self.remove_thread_members(thread_id, [source_folder_id]) def new_chat(self, message, title=None, member_ids=[]): """Creates a chat with the given title and members, and send the initial message.""" return self._fetch_json( "threads/new-chat", post_data={ "message": message, "title": title, "member_ids": ",".join(member_ids), }, ) def new_document(self, content, format="html", title=None, member_ids=[]): """Creates a new document from the given content. To create a document in a folder, include the folder ID in the list of member_ids, e.g., client = quip.QuipClient(...) user = client.get_authenticated_user() client.new_document(..., member_ids=[user["archive_folder_id"]]) """ return self._fetch_json( "threads/new-document", post_data={ "content": content, "format": format, "title": title, "member_ids": ",".join(member_ids), }, ) def copy_document( self, thread_id, folder_ids=None, member_ids=None, title=None, values=None, **kwargs ): """Copies the given document, optionally replaces template variables in the document with values in 'values' arg. The values argument must be a dictionary that contains string keys and values that are either strings, numbers or dictionaries. """ args = {"thread_id": thread_id} if folder_ids: args["folder_ids"] = ",".join(folder_ids) if member_ids: args["member_ids"] = ",".join(member_ids) if title: args["title"] = title if values: args["values"] = json.dumps(values) args.update(kwargs) return self._fetch_json("threads/copy-document", post_data=args) def merge_comments(self, original_id, children_ids, ignore_user_ids=[]): """Given an original document and a set of exact duplicates, copies all comments and messages on the duplicates to the original. Impersonates the commentors if the access token used has permission, but does not add them to the thread. """ import re threads = self.get_threads(children_ids + [original_id]) original_section_ids = re.findall( r" id='([a-zA-Z0-9]{11})'", threads[original_id]["html"] ) for thread_id in children_ids: thread = threads[thread_id] child_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'", thread["html"]) parent_map = dict(zip(child_section_ids, original_section_ids)) messages = self.get_messages(thread_id) for message in reversed(messages): if message["author_id"] in ignore_user_ids: continue kwargs = { "user_id": message["author_id"], "frame": "bubble", "service_id": message["id"], } if "parts" in message: kwargs["parts"] = json.dumps(message["parts"]) else: kwargs["content"] = message["text"] if "annotation" in message: section_id = None if "highlight_section_ids" in message["annotation"]: section_id = message["annotation"]["highlight_section_ids"][0] else: anno_loc = thread["html"].find( '= 0 and loc >= 0: section_id = thread["html"][loc + 4 : loc + 15] if section_id and section_id in parent_map: kwargs["section_id"] = parent_map[section_id] if "files" in message: attachments = [] for blob_info in message["files"]: blob = self.get_blob(thread_id, blob_info["hash"]) new_blob = self.put_blob( original_id, blob, name=blob_info["name"] ) attachments.append(new_blob["id"]) if attachments: kwargs["attachments"] = ",".join(attachments) self.new_message(original_id, **kwargs) def edit_document( self, thread_id, content, operation=APPEND, format="html", section_id=None, **kwargs ): """Edits the given document, adding the given content. `operation` should be one of the constants described above. If `operation` is relative to another section of the document, you must also specify the `section_id`. """ # Since our cell ids in 10x contain ';', which is a valid cgi # parameter separator, we are replacing them with '_' in 10x cell # sections. This should be no op for all other sections. section_id = None if not section_id else section_id.replace(";", "_") args = { "thread_id": thread_id, "content": content, "location": operation, "format": format, "section_id": section_id, } args.update(kwargs) return self._fetch_json("threads/edit-document", post_data=args) def add_to_first_list(self, thread_id, *items, **kwargs): """Adds the given items to the first list in the given document. client = quip.QuipClient(...) client.add_to_first_list(thread_id, "Try the Quip API") """ items = [item.replace("\n", " ") for item in items] args = { "thread_id": thread_id, "content": "\n\n".join(items), "format": "markdown", "operation": self.AFTER_SECTION, } args.update(kwargs) if "section_id" not in args: first_list = self.get_first_list( thread_id, kwargs.pop("document_html", None) ) if first_list: args["section_id"] = self.get_last_list_item_id(first_list) if not args.get("section_id"): args["operation"] = self.APPEND args["content"] = "\n\n".join([" * %s" % i for i in items]) return self.edit_document(**args) def add_to_spreadsheet(self, thread_id, *rows, **kwargs): """Adds the given rows to the named (or first) spreadsheet in the given document. client = quip.QuipClient(...) client.add_to_spreadsheet(thread_id, ["5/1/2014", 2.24]) """ content = "".join( [ "%s" % "".join(["%s" % cell for cell in row]) for row in rows ] ) if kwargs.get("name"): spreadsheet = self.get_named_spreadsheet(kwargs["name"], thread_id) else: spreadsheet = self.get_first_spreadsheet(thread_id) if kwargs.get("add_to_top"): section_id = self.get_first_row_item_id(spreadsheet) operation = self.BEFORE_SECTION else: section_id = self.get_last_row_item_id(spreadsheet) operation = self.AFTER_SECTION return self.edit_document( thread_id=thread_id, content=content, section_id=section_id, operation=operation, ) def update_spreadsheet_row(self, thread_id, header, value, updates, **args): """Finds the row where the given header column is the given value, and applies the given updates. Updates is a dict from header to new value. In both cases headers can either be a string that matches, or "A", "B", "C", 1, 2, 3 etc. If no row is found, adds a new one. client = quip.QuipClient(...) client.update_spreadsheet_row( thread_id, "customer", "Acme", {"Billed": "6/24/2015"}) """ response = None if args.get("name"): spreadsheet = self.get_named_spreadsheet(args["name"], thread_id) else: spreadsheet = self.get_first_spreadsheet(thread_id) headers = self.get_spreadsheet_header_items(spreadsheet) row = self.find_row_from_header(spreadsheet, header, value) if row: ids = self.get_row_ids(row) for head, val in iteritems(updates): index = self.get_index_of_header(headers, head) if not index or index >= len(ids) or not ids[index]: continue response = self.edit_document( thread_id=thread_id, content=val, format="markdown", section_id=ids[index], operation=self.REPLACE_SECTION, **args ) else: updates[header] = value response = self.add_spreadsheet_row( thread_id, spreadsheet, updates, headers=headers, **args ) return response def add_spreadsheet_row( self, thread_id, spreadsheet, updates, headers=None, **args ): if not headers: headers = self.get_spreadsheet_header_items(spreadsheet) indexed_items = {} extra_items = [] for head, val in iteritems(updates): index = self.get_index_of_header(headers, head, default=None) if index is None or index in indexed_items: extra_items.append(val) else: indexed_items[index] = val cells = [] if indexed_items: for i in range(max(indexed_items.keys()) + 1): if i in indexed_items: cells.append(indexed_items[i]) elif len(extra_items): cells.append(extra_items.pop(0)) else: cells.append("") cells.extend(extra_items) content = "%s" % "".join(["%s" % cell for cell in cells]) section_id = self.get_last_row_item_id(spreadsheet) response = self.edit_document( thread_id=thread_id, content=content, section_id=section_id, operation=self.AFTER_SECTION, **args ) return response def toggle_checkmark(self, thread_id, item, checked=True): """Sets the checked state of the given list item to the given state. client = quip.QuipClient(...) list = client.get_first_list(thread_id) client.toggle_checkmark(thread_id, list[0]) """ if checked: item.attrib["class"] = "checked" else: item.attrib["class"] = "" return self.edit_document( thread_id=thread_id, content=xml.etree.cElementTree.tostring(item), section_id=item.attrib["id"], operation=self.REPLACE_SECTION, ) def get_first_list(self, thread_id=None, document_html=None): """Returns the `ElementTree` of the first list in the document. The list can be any type (bulleted, numbered, or checklist). If `thread_id` is given, we download the document. If you have already downloaded the document, you can specify `document_html` directly. """ return self._get_container(thread_id, document_html, "ul", 0) def get_last_list(self, thread_id=None, document_html=None): """Like `get_first_list`, but the last list in the document.""" return self._get_container(thread_id, document_html, "ul", -1) def get_section(self, section_id, thread_id=None, document_html=None): if not document_html: document_html = self.get_thread(thread_id).get("html") if not document_html: return None tree = self.parse_document_html(document_html) element = list(tree.iterfind(".//*[@id='%s']" % section_id)) if not element: return None return element[0] def get_named_spreadsheet(self, name, thread_id=None, document_html=None): if not document_html: document_html = self.get_thread(thread_id).get("html") if not document_html: return None tree = self.parse_document_html(document_html) element = list(tree.iterfind(".//*[@title='%s']" % name)) if not element: return None return element[0] def _get_container(self, thread_id, document_html, container, index): if not document_html: document_html = self.get_thread(thread_id).get("html") if not document_html: return None tree = self.parse_document_html(document_html) lists = list(tree.iter(container)) if not lists: return None try: return lists[index] except IndexError: return None def get_last_list_item_id(self, list_tree): """Returns the last item in the given list `ElementTree`.""" items = list(list_tree.iter("li")) return items[-1].attrib["id"] if items else None def get_first_list_item_id(self, list_tree): """Like `get_last_list_item_id`, but the first item in the list.""" for item in list_tree.iter("li"): return item.attrib["id"] return None def get_first_spreadsheet(self, thread_id=None, document_html=None): """Returns the `ElementTree` of the first spreadsheet in the document. If `thread_id` is given, we download the document. If you have already downloaded the document, you can specify `document_html` directly. """ return self._get_container(thread_id, document_html, "table", 0) def get_last_spreadsheet(self, thread_id=None, document_html=None): """Like `get_first_spreadsheet`, but the last spreadsheet.""" return self._get_container(thread_id, document_html, "table", -1) def get_last_row_item_id(self, spreadsheet_tree): """Returns the last row in the given spreadsheet `ElementTree`.""" items = list(spreadsheet_tree.iter("tr")) return items[-1].attrib["id"] if items else None def get_first_row_item_id(self, spreadsheet_tree): """Returns the last row in the given spreadsheet `ElementTree`.""" items = list(spreadsheet_tree.iter("tr")) return items[1].attrib["id"] if items else None def get_row_items(self, row_tree): """Returns the text of items in the given row `ElementTree`.""" return [(list(x.itertext()) or [None])[0] for x in row_tree] def get_row_ids(self, row_tree): """Returns the ids of items in the given row `ElementTree`.""" return [x.attrib["id"] for x in row_tree] def get_spreadsheet_header_items(self, spreadsheet_tree): """Returns the header row in the given spreadsheet `ElementTree`.""" return self.get_row_items(list(spreadsheet_tree.iterfind(".//tr"))[0]) def get_index_of_header(self, header_items, header, default=0): """Find the index of the given header in the items""" if header: header = str(header) lower_headers = [str(h).lower() for h in header_items] if header in header_items: return header_items.index(header) elif header.lower() in lower_headers: return lower_headers.index(header.lower()) elif header.isdigit(): return int(header) elif len(header) == 1: char = ord(header.upper()) if ord("A") < char < ord("Z"): return char - ord("A") + 1 else: pass return default def find_row_from_header(self, spreadsheet_tree, header, value): """Find the row in the given spreadsheet `ElementTree` where header is value. """ headers = self.get_spreadsheet_header_items(spreadsheet_tree) index = self.get_index_of_header(headers, header) for row in spreadsheet_tree.iterfind(".//tr"): if len(row) <= index: continue cell = row[index] if cell.tag != "td": continue if list(cell.itertext())[0].lower() == value.lower(): return row def parse_spreadsheet_contents(self, spreadsheet_tree): """Returns a python-friendly representation of the given spreadsheet `ElementTree` """ import collections spreadsheet = { "id": spreadsheet_tree.attrib.get("id"), "headers": self.get_spreadsheet_header_items(spreadsheet_tree), "rows": [], } for row in spreadsheet_tree.iterfind(".//tr"): value = {"id": row.attrib.get("id"), "cells": collections.OrderedDict()} for i, cell in enumerate(row): if cell.tag != "td": continue data = {"id": cell.attrib.get("id")} images = list(cell.iter("img")) if images: data["content"] = images[0].attrib.get("src") else: data["content"] = list(cell.itertext())[0].replace(u"\u200b", "") style = cell.attrib.get("style") if style and "background-color:#" in style: sharp = style.find("#") data["color"] = style[sharp + 1 : sharp + 7] value["cells"][spreadsheet["headers"][i]] = data if len(value["cells"]): spreadsheet["rows"].append(value) return spreadsheet def parse_document_html(self, document_html): """Returns an `ElementTree` for the given Quip document HTML""" document_xml = "" + document_html + "" return xml.etree.cElementTree.fromstring(document_xml.encode("utf-8")) def parse_micros(self, usec): """Returns a `datetime` for the given microsecond string""" return datetime.datetime.utcfromtimestamp(usec / 1000000.0) def get_blob(self, thread_id, blob_id): """Returns a file-like object with the contents of the given blob from the given thread. The object is described in detail here: https://docs.python.org/2/library/urllib2.html#urllib2.urlopen """ request = Request(url=self._url("blob/%s/%s" % (thread_id, blob_id))) if self.access_token: request.add_header("Authorization", "Bearer " + self.access_token) try: return urlopen(request, timeout=self.request_timeout) except HTTPError as error: try: # Extract the developer-friendly error message from the response message = json.loads(error.read().decode())["error_description"] except Exception: raise error raise QuipError(error.code, message, error) def put_blob(self, thread_id, blob, name=None): """Uploads an image or other blob to the given Quip thread. Returns an ID that can be used to add the image to the document of the thread. blob can be any file-like object. Requires the 'requests' module. """ import requests url = "blob/" + thread_id headers = None if self.access_token: headers = {"Authorization": "Bearer " + self.access_token} if name: blob = (name, blob) try: response = requests.request( "post", self._url(url), timeout=self.request_timeout, files={"blob": blob}, headers=headers, ) response.raise_for_status() return response.json() except requests.RequestException as error: try: # Extract the developer-friendly error message from the response message = error.response.json()["error_description"] except Exception: raise error raise QuipError(error.response.status_code, message, error) def new_websocket(self, **kwargs): """Gets a websocket URL to connect to. """ return self._fetch_json("websockets/new", **kwargs) def _fetch_json(self, path, post_data=None, **args): request = Request(url=self._url(path, **args)) if post_data: post_data = dict( (k, v) for k, v in post_data.items() if v or isinstance(v, int) ) request_data = urlencode(self._clean(**post_data)) request.data = request_data.encode() if self.access_token: request.add_header("Authorization", "Bearer " + self.access_token) try: return json.loads( urlopen(request, timeout=self.request_timeout).read().decode() ) except HTTPError as error: try: # Extract the developer-friendly error message from the response message = json.loads(error.read().decode())["error_description"] except Exception: raise error raise QuipError(error.code, message, error) def _clean(self, **args): return dict( (k, str(v) if isinstance(v, int) else v.encode("utf-8")) for k, v in args.items() if v or isinstance(v, int) ) def _url(self, path, **args): url = self.base_url + "/1/" + path args = self._clean(**args) if args: url += "?" + urlencode(args) return url PK!L  quare/quip_classes.py# -*- coding: utf-8 -*- """Quip objects. Contains filtering logic, but no API calls.""" import time import click from dataclasses import dataclass from typing import Any, Dict, List import quare.quip as quip @dataclass class Meta: """Quip stores metadata in a `meta` property. This is the base class for common properties.""" id: str created_usec: int updated_usec: int @dataclass class FolderMeta(Meta): title: str color: str = "manila" parent_id: str = None creator_id: str = None @dataclass class ThreadMeta(Meta): link: str title: str author_id: str thread_class: Any = None sharing: str = None type: Any = None document_id: str = None @dataclass class QuipFolder: member_ids: List[str] children: List[Dict[str, str]] folder: Dict def __post_init__(self): """Unpack the folder dictionary into the meta field.""" self.meta = FolderMeta(**self.folder) @property def child_threads(self): return self._filter_children("thread_id") @property def child_folders(self): return self._filter_children("folder_id") def _filter_children(self, key): return [child[key] for child in self.children if key in child] @dataclass class QuipMessage: """Represents a quip message.""" id: str created_usec: int updated_usec: int text: str author_name: str author_id: str visible: str annotation: Dict = None parts: List[List[str]] = None like_user_ids: List[str] = None mention_user_ids: List[str] = None files: List[str] = None @property def created(self): return self._parse_time(self.created_usec) @property def updated(self): return self._parse_time(self.updated_usec) def _parse_time(self, usecs): return time.asctime(time.localtime(usecs / 1e6)) def __hash__(self): """ On the remote chance that we have two messages with the same created_usec, sum the characters in that message's ID and add it to the created_usec. Almost certainly a case of YAGNI. """ return self.created_usec + sum([ord(c) for c in self.id]) def print(self, style=True, thread=None): thread = f"| ({thread}) " if thread else "" msg_header = f"[{self.created} {thread}| @{self.author_name}]" msg_header = click.style(msg_header, bold=True) if style else msg_header click.echo(f"{msg_header} {self.text}") @dataclass class QuipThread: """A Quip Thread. Either a chat or document. Args: user_ids (List[str]): User IDs with access to this thread. shared_folder_ids (List[str]): List of QuipFolder IDs containing this thread. expanded_user_ids (List[str]): User IDs with access to this thread. invited_user_emails (List[str]): Emails of users with access to this thread. html (str): The contents as HTML, if this thread is a document. thread (Dict): Dictionary containing metadata meta (ThreadMeta): ThreadMeta instance containing metadata token (str): API access token. """ user_ids: List[str] thread: Dict shared_folder_ids: List[str] = None expanded_user_ids: List[str] = None invited_user_emails: List[str] = None html: str = None def __post_init__(self): """Unpack the folder dictionary into the meta field.""" self.meta = ThreadMeta(**self.thread) self.id = self.meta.id # Instance variables self.token = None @staticmethod def get_thread(token, room): """Instantiate a thread by calling the QuipAPI.""" client = quip.QuipClient(token) thread_json = client.get_thread(room) return QuipThread(**thread_json) def get_messages( self, token=None, start_usec=1000, last_n=None, return_raw_json=False ): """Returns the most recent messages for this thread. Args: start_usec: Return all messages since this timestamp. last_n: An integer indicating the number of messages you want returned. This is distinct from the `count` param accepted by the Quip API, in that it is the total number of messages to return, not the total to return for a given request. Returns: A list of message dicts, sorted by their created date in ascending order """ self.messages = [] client = quip.QuipClient(token) COUNT = last_count = 200 # For the first iteration of the while loop last_usec = None while last_count == COUNT: # I think the non list comprehension version is equally unreadable message_dicts = [ msg for msg in client.get_messages( self.meta.id, max_created_usec=last_usec, count=last_n ) if msg["created_usec"] > start_usec ] last_count = len(message_dicts) self.messages.extend(message_dicts) if last_count == last_n: break return self.messages def send_message( self, token=None, frame=None, content=None, silent=False, return_raw_json=False, parts=None, ) -> QuipMessage: """Send a message to this thread. Args: frame: Controls how Quip displays the message. body: An integer indicating the number of messages you want returned. This is distinct from the `count` param accepted by the Quip API, in that it is the total number of messages to return, not the total to return for a given request. Returns: A QuipMessage instance """ client = quip.QuipClient(token) # Using the vendored library, note that it passes keywords to the api via **kwargs msg_dict = client.new_message( self.id, content=content, silent=silent, frame=frame, parts=parts ) return QuipMessage(**msg_dict) PK!quare/quip_websocket.py# -*- coding: utf-8 -*- """Opens a websocket and returns messages""" import json import time import click import websocket from quare.quip import QuipClient from quare.quip_classes import QuipMessage HEARTBEAT_INTERVAL = 20 def stream_updates(token): def on_message(ws, message): message = json.loads(message) if message.get("type") == "message": thread = message["thread"]["title"] try: msg = QuipMessage(**message.get("message")) msg.print(msg, thread=thread) except Exception as e: print(e) ws.close() def on_error(ws, error): pass def on_close(ws): click.secho("\n") click.secho("...connection closed.", bold=True) def on_open(ws): click.secho("Streaming updates... press Ctrl-C to exit.", bold=True) def run(*args): while True: time.sleep(HEARTBEAT_INTERVAL) ws.send(json.dumps({"type": "heartbeat"})) client = QuipClient(token) url = client.new_websocket().get("url") # websocket.enableTrace(True) ws = websocket.WebSocketApp( url, on_message=on_message, on_error=on_error, on_close=on_close ) ws.on_open = on_open ws.run_forever() PK!nquare/ui_messages.py# -*- coding: utf-8 -*- """Console messages to the user for quare.""" import click _ERROR_COLOR = "bright_red" _ERROR_MSG_COLOR = "magenta" def _error(msg): """Applies default error header style.""" return click.style(msg, fg=_ERROR_COLOR, bold=True) # console messages printed via ClickException or Quip* AUTH_ERROR_MESSAGE = f""" {_error('Quip says this is an invalid access token.')} To generate a new one to add via `quare auth', visit: https://quip.com/dev/token """ TOKEN_ERROR_MESSAGE = f""" {_error('No default token is set.')} Use `quare auth' or set a QUIP_TOKEN environment variable containing your authentication token. """ MISSING_CONF_ERROR_MESSAGE = f"""{_error('Config is not a file.')} Check your path.""" PK!Zquare/utils.pyimport click import dateparser def print_quip_message(msg, thread=None): thread = f"| ({thread}) " if thread else "" msg_header = f"[{msg.created} {thread}| @{msg.author_name}]" click.echo(f"""{click.style(msg_header, bold=True)} {msg.text}""") def get_usec(value): if value is None: return dt = dateparser.parse(value, settings={"RETURN_AS_TIMEZONE_AWARE": True}) if dt is None: raise click.BadParameter("Could not parse datetime") usecs = int(dt.timestamp() * 1e6) return usecs PK!H3&(&quare-0.1.0.dist-info/entry_points.txtN+I/N.,()*,M,Jz9Vy\\PK!Square-0.1.0.dist-info/LICENSE GNU LESSER GENERAL PUBLIC LICENSE Version 3, 29 June 2007 Copyright (C) 2007 Free Software Foundation, Inc. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below. 0. Additional Definitions. As used herein, "this License" refers to version 3 of the GNU Lesser General Public License, and the "GNU GPL" refers to version 3 of the GNU General Public License. "The Library" refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An "Application" is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A "Combined Work" is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the "Linked Version". The "Minimal Corresponding Source" for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The "Corresponding Application Code" for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the Combined Work with a copy of the GNU GPL and this license document. c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License "or any later version" applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library. PK!Hu)GTUquare-0.1.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/R(O-)$qzd&Y)r$UV&UrPK!H(quare-0.1.0.dist-info/METADATA_o0)#UIvthŘ2wnCE?C)ԧ{ -rPU X-y1<7\޺)K*f2)4hDͼ/D9 &raf2Uv^GaeKiG,-+1`,cɅaaJ 2{wprEg#=HUr)2j.x_T-nal)_D.=LZmXf#EF"PCtAMj䖐AҪ_mLx7#PRǥZ嚗{*{˳n cc= n,NnXp')H;(`W,Nv}QƚkWs{/Y1!7^8'8DqT'Dm`.WtЊ6g}ɚh$6FԷ7pe\tX˧i"aDDWЮJW{c}#}Wj;CbC#8j(Xì]M=y)ie!N/|%  E% S 70*K( 6{#T3 ^wd% e/H) FфJQgpM ̣CXkG}:S'*c bYLS^Q*ƊbsT,%˓;pc*TXkQS] ̹9_[j<[=z=(a[0ӔT Fs,Vb$-9w\åE`z)1my16#خFPzeM5]fVt/J1OpbCnM'îxC;4mЂz_:E=ɪVϻݵ˭ҷx17bۤٛOm8vB[Z2(],x|{g0ex#mk>kyКM\AuדMkZ /DSDYĿڥGf%=zg5bTjTKFm_6&TGþPK! quare/__init__.pyPK!7(w quare/cli.pyPK! square/config.pyPK!#N b--Lquare/exceptions.pyPK!0`i i quare/quare.pyPK!| ?'quare/quip.pyPK!L  quare/quip_classes.pyPK!ƿquare/quip_websocket.pyPK!nquare/ui_messages.pyPK!Z%quare/utils.pyPK!H3&(&equare-0.1.0.dist-info/entry_points.txtPK!Square-0.1.0.dist-info/LICENSEPK!Hu)GTUquare-0.1.0.dist-info/WHEELPK!H({quare-0.1.0.dist-info/METADATAPK!Hx('8Yquare-0.1.0.dist-info/RECORDPK