PK ! quare/__init__.py# -*- coding: utf-8 -*- """Top-level package for quare.""" __author__ = """James Estevez""" __email__ = "j@jstvz.org" __version__ = "0.1.0" PK ! 7(w quare/cli.py# -*- coding: utf-8 -*- """Console script for quare.""" import json import html import click import quare.utils as utils from .quare import get_user_info, get_user_token, set_user_token from .quip import QuipClient, QuipError from .quip_classes import QuipMessage, QuipThread from .quip_websocket import stream_updates from .ui_messages import AUTH_ERROR_MESSAGE @click.group("main", help="") @click.version_option() def main(): """Main quare CLI entry point. This runs as the first step in processing any CLI command. """ @click.command() @click.option("-a", "--alias", default="Default", show_default=True) @click.option("--json", "json_", is_flag=True) def whoami(alias, json_): """Print information about the logged in users.""" try: alias, token = get_user_token(alias) user, user_table = get_user_info(token, alias=alias) if json_: click.echo(json.dumps(user)) else: click.echo(user_table) except QuipError as ex: if ex.code == 401 and "Invalid access_token" in str(ex): raise click.ClickException(AUTH_ERROR_MESSAGE) raise # reraise @click.command() @click.option("-a", "--alias", default="Default") @click.option("-t", "--token", prompt=True) def auth(alias, token): """Store an API authentication token.""" set_user_token(alias, token.strip()) click.secho("Token stored!", fg="bright_green") main.add_command(whoami) main.add_command(auth) @main.group("doc") def document(): """Commands for interacting with documents.""" @document.command(name="append") @click.option( "-a", "--alias", default="Default", help="Quip `auth' alias", show_default=True ) @click.option("-c", "--content", help="Markdown to append to document.") @click.option( "-f", "--file", "fd", type=click.File(), help="Markdown to append to document. 
Will ignore --content if provided.", ) @click.option("-i", "--id", "doc_id", required=True) @click.option("--json", "json_", is_flag=True) def doc_append(alias, content, fd, doc_id, json_): """Append content to an existing Quip document.""" _, token = get_user_token(alias) content = fd.read() if fd else content if not content: raise click.UsageError('You must pass either "--content" or "--file".') client = QuipClient(token) client.edit_document(doc_id, f"\n{content}\n", format="markdown") @main.group("msg") def message(): """Commands for interacting with chats/comment threads.""" @message.command(name="get") @click.option("-r", "--room", required=True) @click.option("-s", "--since", type=str, default="1999") @click.option("-a", "--alias", default="Default", show_default=True) @click.option("--json", "json_", is_flag=True) @click.option( "-l", "--last", default=200, show_default=True, type=int, help="Last n messages" ) @click.option( "--decending/--ascending", "is_desc", default=False, help="Created date/time sort order", ) def msg_get(room, last, since, alias, json_, is_desc): """Retrieve messages from Quip.""" token, thread = get_thread(alias, room) start_usec = utils.get_usec(since) messages = thread.get_messages(token, last_n=last, start_usec=start_usec) if not is_desc: messages.reverse() print_msgs(messages, json_) @message.command(name="stream") @click.option("-a", "--alias", default="Default", show_default=True) def msg_stream(alias): """Stream messages via Quip's websocket. Close using CTRL-C""" _, token = get_user_token(alias) stream_updates(token) @message.command(name="send") @click.option("-a", "--alias", default="Default", show_default=True) @click.option( "-c", "--content", help="The body of the message to send. 
Use '-' for stdin" ) @click.option("-r", "--room", required=True) @click.option( "--frame", type=click.Choice(["bubble", "card", "line"]), help="Adjust the display of the message", ) @click.option("--silent", is_flag=True) @click.option( "-m", "--monospace", help="Format the message as monospace code.", is_flag=True ) @click.option("--file", "file_", type=click.File("r")) def msg_send(alias, content, room, frame, silent, monospace, file_): """Send messages to a quip chat/document.""" token, thread = get_thread(alias, room) content = click.get_text_stream("stdin").read() if content == "-" else content content = file_.read() if file_ else content parts = format_msg_content(content, monospace=monospace) msg = thread.send_message( token, content=content, frame=frame, silent=silent, parts=parts ) if msg: messages = thread.get_messages(token, last_n=5) messages.reverse() print_msgs(messages) def format_msg_content(content, monospace=False): if monospace: content = ( "
"
+ html.escape(content, quote=True).replace("\n", "
").replace(" ", " ")
+ ""
)
return json.dumps([["monospace", content]])
else:
return None
def get_thread(alias, room):
    """Resolve the token stored for `alias` and fetch the thread for `room`.

    Returns a ``(token, thread)`` tuple, where `thread` is whatever
    ``QuipThread.get_thread(token, room)`` returns.
    """
    _alias, token = get_user_token(alias)
    return token, QuipThread.get_thread(token, room)
def print_msgs(messages, json_=False):
    """Write `messages` to the terminal.

    With `json_` set, the whole list is echoed as a single JSON document.
    Otherwise each entry is expanded into a ``QuipMessage`` and handed to
    ``utils.print_quip_message``.
    """
    if not json_:
        for raw in messages:
            utils.print_quip_message(QuipMessage(**raw))
        return
    click.echo(json.dumps(messages))
PK !
quare/config.py# -*- coding: utf-8 -*-
"""Configuration handling. Exports config class."""
import os
from pathlib import Path
import yaml
from .exceptions import ConfigNotFoundError
from .ui_messages import MISSING_CONF_ERROR_MESSAGE
class quareConfig:
    """Parses a configuration file and stores/retrieves its values.

    The file is YAML. Only its top-level ``defaults`` mapping is used here,
    from which the ``documents`` and ``chats`` entries are exposed as
    `default_docs` and `default_chats`.

    Raises ConfigNotFoundError from the constructor when the selected
    configuration file does not exist.
    """

    DEFAULT_CONFIG_FILENAME = os.path.join(str(Path.home()), ".quare-conf.yaml")

    def __init__(self, config_path=None):
        """Locate and load the config file.

        `config_path` is used only if it names an existing file; otherwise
        DEFAULT_CONFIG_FILENAME is used.
        """
        self._find_config(config_path)
        self.raw_config = {}
        self._read_config()
        self._chats = []
        self._docs = []

    def _find_config(self, config_path):
        """Set `self.config_path` to `config_path` or the default location."""
        if config_path is not None and os.path.isfile(config_path):
            self.config_path = config_path
        else:
            self.config_path = self.DEFAULT_CONFIG_FILENAME

    def _read_config(self):
        """Load `self.config_path` into `raw_config` and `defaults`."""
        if not os.path.isfile(self.config_path):
            raise ConfigNotFoundError(MISSING_CONF_ERROR_MESSAGE)
        with open(self.config_path) as conf:
            # safe_load yields None for an empty document; treat that as an
            # empty mapping so the lookups below do not fail.
            self.raw_config = yaml.safe_load(conf.read()) or {}
        # A bare "defaults:" key parses to None; normalise it to a mapping.
        self.defaults = self.raw_config.get("defaults") or {}

    @property
    def default_docs(self):
        """The ``documents`` entry of the defaults (empty list if unset)."""
        if not self._docs:
            self._docs = self.defaults.get("documents") or []
        return self._docs

    @property
    def default_chats(self):
        """The ``chats`` entry of the defaults (empty list if unset)."""
        if not self._chats:
            self._chats = self.defaults.get("chats") or []
        return self._chats
PK ! #Nb- - quare/exceptions.py"""Custom exceptions for quare and QuipClient"""
class QuipTokenNotFoundError(Exception):
    """Raised when no Quip token can be found.

    That is: the keyring holds no token and the QUIP_TOKEN environment
    variable is not set.
    """
class ConfigNotFoundError(Exception):
    """Raised when the quare configuration file does not exist."""
PK ! 0`i i quare/quare.py# -*- coding: utf-8 -*-
"""Contains business logic for interactions with the Quip API."""
import os
import keyring
import terminaltables
from quare import quip, ui_messages
from quare.exceptions import QuipTokenNotFoundError
from quare.quip_classes import QuipFolder
SVC_NAME = "quare"
TOKEN_ENV_VAR = "QUIP_TOKEN" # nosec
def get_user_token(alias="Default"):
    """Retrieve an API token from the keyring or an environment variable.

    The keyring entry stored under `alias` wins. If the keyring has no
    string entry for it, the variable named by TOKEN_ENV_VAR is used when
    it is set and non-empty.

    Keyword args:
    alias -- the alias under which the token is stored (default "Default")

    Returns an ``(alias, token)`` tuple.
    Raises QuipTokenNotFoundError when neither source provides a token.
    """
    token = keyring.get_password(SVC_NAME, alias)
    if isinstance(token, str):
        return alias, token

    env_token = os.environ.get(TOKEN_ENV_VAR)
    if env_token:
        return alias, env_token

    raise QuipTokenNotFoundError(ui_messages.TOKEN_ERROR_MESSAGE)
def set_user_token(alias, token):
    """Save `token` in the keyring under the quare service name and `alias`."""
    service = SVC_NAME
    keyring.set_password(service, alias, token)
def get_user_info(token, alias="User"):
    """Fetch the authenticated user from Quip and render a summary table.

    Keyword args:
    alias -- passed to the table as its second argument, presumably the
             title (default "User")

    Returns a ``(user, table)`` tuple: the user dict returned by the API
    client and the rendered table text.
    """
    user = quip.QuipClient(token).get_authenticated_user()
    rows = _format_user_matrix(user)
    rendered = terminaltables.SingleTable(rows, alias)
    rendered.inner_row_border = True
    return user, rendered.table
def send_messages(token, alias, room):
    """Call ``send_messages(room)`` on a new Quip client and return its result.

    `alias` is accepted but not used by this function.

    NOTE(review): confirm that QuipClient actually provides a
    `send_messages` method.
    """
    return quip.QuipClient(token).send_messages(room)
def _format_user_matrix(user):
"""Given a Quip user dict, return a matrix that terminal tables will accept."""
return [
["Name", user["name"]],
["Email(s)", ", ".join(user["emails"]) if "emails" in user else ""],
["Quip User ID", user["id"]],
]
def _get_user_favorites(user, token):
    """Given a user dict, retrieve the threads in the user's starred folder.

    Reads ``user["starred_folder_id"]``, fetches that folder from the API,
    wraps it in a ``QuipFolder`` and returns the threads listed in its
    ``child_threads``. Sub-folders are not descended into here.
    """
    starred_id = user["starred_folder_id"]
    client = quip.QuipClient(token)
    # No debug print of the raw folder: it would pollute the command output.
    folder = QuipFolder(**client.get_folder(starred_id))
    return _get_children(folder, token)
def _get_children(folder, token):
    """Fetch the threads whose IDs are listed in `folder.child_threads`."""
    return quip.QuipClient(token).get_threads(folder.child_threads)
PK ! |
quare/quip.py# Copyright 2014 Quip
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""A Quip API client library.
For full API documentation, visit https://quip.com/api/.
Typical usage:
client = quip.QuipClient(access_token=...)
user = client.get_authenticated_user()
starred = client.get_folder(user["starred_folder_id"])
print("There are", len(starred["children"]), "items in your starred folder")
In addition to standard getters and setters, we provide a few convenience
methods for document editing. For example, you can use `add_to_first_list`
to append items (in Markdown) to the first bulleted or checklist in a
given document, which is useful for automating a task list.
"""
import datetime
import json
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.cElementTree
from importlib import reload
# Short module-level aliases for the urllib names used by this client.
Request = urllib.request.Request
urlencode = urllib.parse.urlencode
urlopen = urllib.request.urlopen
HTTPError = urllib.error.HTTPError
# Unbound dict.items; call it as iteritems(some_dict).
iteritems = dict.items
# Best-effort attempt to switch the interpreter's default encoding to UTF-8.
# Any failure is deliberately ignored.
# NOTE(review): sys.setdefaultencoding looks like a Python 2-only API, so on
# Python 3 this presumably always lands in the except branch - confirm.
try:
reload(sys)
sys.setdefaultencoding("utf8")
except Exception:
# Can't change default encoding usually...
pass
# Import-time guard: refuse to load when the ssl module does not expose the
# TLS 1.1 protocol constant.
if not hasattr(ssl, "PROTOCOL_TLSv1_1"):
    raise Exception(
        "Using the Quip API requires an SSL library that supports TLS versions "
        ">= 1.1; your Python + OpenSSL installation must be upgraded."
    )
# After 2017-02, the Quip API HTTPS endpoint requires TLS version 1.1 or later;
# TLS version 1.0 is disabled due to extensive security vulnerabilities.
#
# - macOS: At the time of this writing, macOS ships with Python 2.7 and
# OpenSSL, but the version of OpenSSL is outdated and only supports TLS 1.0.
# (This is even true of the most recent version of macOS (Sierra) with all
# security patches installed; see
# https://eclecticlight.co/2016/03/23/the-tls-mess-in-os-x-el-capitan/ .)
#
# To use this module on a macOS system, you will need to install your own
# copy of Python and OpenSSL. Simple suggestions:
#
# 1) Install Homebrew from http://brew.sh; run "brew install openssl python"
# 2) Install Miniconda from https://conda.io/miniconda.html
#
# - Google App Engine (GAE): Per App Engine's documentation, you must request
# version 2.7.11 of the "ssl" library in your app.yaml file. See:
# https://cloud.google.com/appengine/docs/python/sockets/ssl_support
class QuipError(Exception):
    """Exception carrying an integer code, a message and an HTTP error object.

    The exception text is ``"<code>: <message>"``. `code` and `http_error`
    are kept as attributes so callers can inspect them.
    """

    def __init__(self, code, message, http_error):
        super().__init__("%d: %s" % (code, message))
        self.http_error = http_error
        self.code = code
class QuipClient(object):
"""A Quip API client"""
# Edit operations: consecutive integer codes 0-5, in the order listed.
APPEND, PREPEND, AFTER_SECTION, BEFORE_SECTION, REPLACE_SECTION, DELETE_SECTION = range(
6
)
# Folder colors: consecutive integer codes 0-4, in the order listed.
MANILA, RED, ORANGE, GREEN, BLUE = range(5)
def __init__(
self,
access_token=None,
client_id=None,
client_secret=None,
base_url=None,
request_timeout=None,
):
"""Constructs a Quip API client.
If `access_token` is given, all of the API methods in the client
will work to read and modify Quip documents.
Otherwise, only `get_authorization_url` and `get_access_token`
work, and we assume the client is for a server using the Quip API's
OAuth endpoint.
"""
self.access_token = access_token
self.client_id = client_id
self.client_secret = client_secret
self.base_url = base_url if base_url else "https://platform.quip.com"
self.request_timeout = request_timeout if request_timeout else 10
def get_authorization_url(self, redirect_uri, state=None):
    """Returns the URL the user should be redirected to in order to sign in.

    The URL is built by `_url` for "oauth/login" from `redirect_uri`,
    `state`, a fixed ``response_type`` of "code" and this client's ID.
    """
    query = {
        "redirect_uri": redirect_uri,
        "state": state,
        "response_type": "code",
        "client_id": self.client_id,
    }
    return self._url("oauth/login", **query)
def get_access_token(
    self, redirect_uri, code, grant_type="authorization_code", refresh_token=None
):
    """Exchanges a verification code for an access_token.

    After the user has been sent back to your server from the URL produced
    by `get_authorization_url`, pass the `code` argument you received to
    this method. The client's ID and secret are sent along with it.
    """
    params = {
        "redirect_uri": redirect_uri,
        "code": code,
        "grant_type": grant_type,
        "refresh_token": refresh_token,
        "client_id": self.client_id,
        "client_secret": self.client_secret,
    }
    return self._fetch_json("oauth/access_token", **params)
def get_authenticated_user(self):
    """Returns the user that our access token belongs to."""
    endpoint = "users/current"
    return self._fetch_json(endpoint)
def get_user(self, id):
    """Returns the user identified by `id` (a string)."""
    endpoint = "users/" + id
    return self._fetch_json(endpoint)
def get_users(self, ids):
    """Returns a dictionary of users for the given IDs.

    `ids` is an iterable of strings; it is sent comma-joined as `post_data`.
    """
    joined = ",".join(ids)
    return self._fetch_json("users/", post_data={"ids": joined})
def update_user(self, user_id, picture_url=None):
    """Sends `user_id` and `picture_url` as `post_data` to "users/update".

    Returns whatever `_fetch_json` returns for that request.
    """
    payload = {"user_id": user_id, "picture_url": picture_url}
    return self._fetch_json("users/update", post_data=payload)
def get_contacts(self):
    """Returns the users in the authenticated user's contact list."""
    endpoint = "users/contacts"
    return self._fetch_json(endpoint)
def get_folder(self, id):
    """Returns the folder identified by `id` (a string)."""
    endpoint = "folders/" + id
    return self._fetch_json(endpoint)
def get_folders(self, ids):
    """Returns a dictionary of folders for the given IDs.

    `ids` is an iterable of strings; it is sent comma-joined as `post_data`.
    """
    joined = ",".join(ids)
    return self._fetch_json("folders/", post_data={"ids": joined})
def new_folder(self, title, parent_id=None, color=None, member_ids=None):
    """Creates a folder by sending the given fields to "folders/new".

    `member_ids` is an optional iterable of ID strings. It is sent
    comma-joined, and omitting it (or passing None) sends an empty string.
    Returns whatever `_fetch_json` returns for the request.
    """
    # None instead of a shared mutable list as the default value.
    members = ",".join(member_ids or ())
    return self._fetch_json(
        "folders/new",
        post_data={
            "title": title,
            "parent_id": parent_id,
            "color": color,
            "member_ids": members,
        },
    )
def update_folder(self, folder_id, color=None, title=None):
    """Sends `folder_id`, `color` and `title` as `post_data` to "folders/update".

    Returns whatever `_fetch_json` returns for that request.
    """
    payload = {"folder_id": folder_id, "color": color, "title": title}
    return self._fetch_json("folders/update", post_data=payload)
def add_folder_members(self, folder_id, member_ids):
    """Adds the given users to the given folder.

    `member_ids` is an iterable of ID strings, sent comma-joined.
    """
    payload = {"folder_id": folder_id, "member_ids": ",".join(member_ids)}
    return self._fetch_json("folders/add-members", post_data=payload)
def remove_folder_members(self, folder_id, member_ids):
    """Removes the given users from the given folder.

    `member_ids` is an iterable of ID strings, sent comma-joined.
    """
    payload = {"folder_id": folder_id, "member_ids": ",".join(member_ids)}
    return self._fetch_json("folders/remove-members", post_data=payload)
def get_teams(self):
    """Returns the teams of the user that our access token belongs to."""
    endpoint = "teams/current"
    return self._fetch_json(endpoint)
def get_messages(self, thread_id, max_created_usec=None, count=None):
    """Returns the most recent messages for the given thread.

    Use `max_created_usec`, the sort key of the returned messages, to page
    through them. `count` is the number of messages wanted, as an integer;
    the maximum is 100.
    """
    endpoint = "messages/" + thread_id
    paging = {"max_created_usec": max_created_usec, "count": count}
    return self._fetch_json(endpoint, **paging)
def new_message(self, thread_id, content=None, **kwargs):
    """Sends a message on the given thread.

    `content` is plain text, not HTML. Any extra keyword arguments are
    added to the posted fields.
    """
    payload = {"thread_id": thread_id, "content": content, **kwargs}
    return self._fetch_json("messages/new", post_data=payload)
def get_thread(self, id):
    """Returns the thread identified by `id` (a string)."""
    endpoint = "threads/" + id
    return self._fetch_json(endpoint)
def get_threads(self, ids):
    """Returns a dictionary of threads for the given IDs.

    `ids` is an iterable of strings; it is sent comma-joined as `post_data`.
    """
    joined = ",".join(ids)
    return self._fetch_json("threads/", post_data={"ids": joined})
def get_recent_threads(self, max_updated_usec=None, count=None, **kwargs):
    """Returns the recently updated threads for a given user.

    `max_updated_usec`, `count` and any extra keyword arguments are
    forwarded to `_fetch_json` for "threads/recent".
    """
    params = {"max_updated_usec": max_updated_usec, "count": count}
    return self._fetch_json("threads/recent", **params, **kwargs)
def get_matching_threads(
    self, query, count=None, only_match_titles=False, **kwargs
):
    """Returns the threads that match the given search query.

    `query`, `count`, `only_match_titles` and any extra keyword arguments
    are forwarded to `_fetch_json` for "threads/search".
    """
    return self._fetch_json(
        "threads/search",
        query=query,
        count=count,
        # Forward the caller's choice rather than a hard-coded False.
        only_match_titles=only_match_titles,
        **kwargs
    )
def add_thread_members(self, thread_id, member_ids):
    """Adds the given folder or user IDs to the given thread.

    `member_ids` is an iterable of ID strings, sent comma-joined.
    """
    payload = {"thread_id": thread_id, "member_ids": ",".join(member_ids)}
    return self._fetch_json("threads/add-members", post_data=payload)
def delete_thread(self, thread_id):
    """Deletes the thread with the given thread id or secret."""
    payload = {"thread_id": thread_id}
    return self._fetch_json("threads/delete", post_data=payload)
def remove_thread_members(self, thread_id, member_ids):
    """Removes the given folder or user IDs from the given thread.

    `member_ids` is an iterable of ID strings, sent comma-joined.
    """
    payload = {"thread_id": thread_id, "member_ids": ",".join(member_ids)}
    return self._fetch_json("threads/remove-members", post_data=payload)
def pin_to_desktop(self, thread_id, **kwargs):
    """Pins the given thread to desktop.

    Extra keyword arguments are added to the posted fields.
    """
    payload = {"thread_id": thread_id, **kwargs}
    return self._fetch_json("threads/pin-to-desktop", post_data=payload)
def move_thread(self, thread_id, source_folder_id, destination_folder_id):
    """Moves the given thread from the source folder to the destination one.

    Done as two separate calls: the thread is first added to the
    destination folder, then removed from the source folder. Returns None.
    """
    target_folders = [destination_folder_id]
    self.add_thread_members(thread_id, target_folders)
    origin_folders = [source_folder_id]
    self.remove_thread_members(thread_id, origin_folders)
def new_chat(self, message, title=None, member_ids=None):
    """Creates a chat with the given title and members, and sends the
    initial message.

    `member_ids` is an optional iterable of ID strings. It is sent
    comma-joined, and omitting it (or passing None) sends an empty string.
    """
    # None instead of a shared mutable list as the default value.
    members = ",".join(member_ids or ())
    return self._fetch_json(
        "threads/new-chat",
        post_data={
            "message": message,
            "title": title,
            "member_ids": members,
        },
    )
def new_document(self, content, format="html", title=None, member_ids=None):
    """Creates a new document from the given content.

    To create a document in a folder, include the folder ID in the list
    of member_ids, e.g.,

        client = quip.QuipClient(...)
        user = client.get_authenticated_user()
        client.new_document(..., member_ids=[user["archive_folder_id"]])

    `member_ids` is an optional iterable of ID strings. It is sent
    comma-joined, and omitting it (or passing None) sends an empty string.
    """
    # None instead of a shared mutable list as the default value.
    members = ",".join(member_ids or ())
    return self._fetch_json(
        "threads/new-document",
        post_data={
            "content": content,
            "format": format,
            "title": title,
            "member_ids": members,
        },
    )
def copy_document(
    self,
    thread_id,
    folder_ids=None,
    member_ids=None,
    title=None,
    values=None,
    **kwargs
):
    """Copies the given document, optionally replacing template variables
    in the document with the entries of the 'values' arg.

    'values' must be a dictionary with string keys whose values are
    strings, numbers or dictionaries; it is sent JSON-encoded. Optional
    arguments that are empty or None are left out of the request, and any
    extra keyword arguments are added to the posted fields last.
    """
    comma_join = ",".join
    # (field name, supplied value, encoder or None to send the value as-is)
    optional_fields = (
        ("folder_ids", folder_ids, comma_join),
        ("member_ids", member_ids, comma_join),
        ("title", title, None),
        ("values", values, json.dumps),
    )
    payload = {"thread_id": thread_id}
    for field, supplied, encode in optional_fields:
        if supplied:
            payload[field] = encode(supplied) if encode else supplied
    payload.update(kwargs)
    return self._fetch_json("threads/copy-document", post_data=payload)
def merge_comments(self, original_id, children_ids, ignore_user_ids=[]):
"""Given an original document and a set of exact duplicates, copies
all comments and messages on the duplicates to the original.
Impersonates the commentors if the access token used has
permission, but does not add them to the thread.
"""
import re
threads = self.get_threads(children_ids + [original_id])
original_section_ids = re.findall(
r" id='([a-zA-Z0-9]{11})'", threads[original_id]["html"]
)
for thread_id in children_ids:
thread = threads[thread_id]
child_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'", thread["html"])
parent_map = dict(zip(child_section_ids, original_section_ids))
messages = self.get_messages(thread_id)
for message in reversed(messages):
if message["author_id"] in ignore_user_ids:
continue
kwargs = {
"user_id": message["author_id"],
"frame": "bubble",
"service_id": message["id"],
}
if "parts" in message:
kwargs["parts"] = json.dumps(message["parts"])
else:
kwargs["content"] = message["text"]
if "annotation" in message:
section_id = None
if "highlight_section_ids" in message["annotation"]:
section_id = message["annotation"]["highlight_section_ids"][0]
else:
anno_loc = thread["html"].find(
'