PKt%Ofbchat/__init__.py# -*- coding: UTF-8 -*- """Facebook Chat (Messenger) for Python :copyright: (c) 2015 - 2019 by Taehoon Kim :license: BSD 3-Clause, see LICENSE for more details. """ from __future__ import unicode_literals # These imports are far too general, but they're needed for backwards compatbility. from .models import * from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" __version__ = "1.8.2" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" __license__ = "BSD 3-Clause" __author__ = "Taehoon Kim; Moreels Pieter-Jan; Mads Marquart" __email__ = "carpedm20@gmail.com" __all__ = ["Client"] PKt%O{^ fbchat/_attachment.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from . import _util @attr.s(cmp=False) class Attachment(object): """Represents a Facebook attachment.""" #: The attachment ID uid = attr.ib(None) @attr.s(cmp=False) class UnsentMessage(Attachment): """Represents an unsent message attachment.""" @attr.s(cmp=False) class ShareAttachment(Attachment): """Represents a shared item (e.g. URL) attachment.""" #: ID of the author of the shared post author = attr.ib(None) #: Target URL url = attr.ib(None) #: Original URL if Facebook redirects the URL original_url = attr.ib(None) #: Title of the attachment title = attr.ib(None) #: Description of the attachment description = attr.ib(None) #: Name of the source source = attr.ib(None) #: URL of the attachment image image_url = attr.ib(None) #: URL of the original image if Facebook uses ``safe_image`` original_image_url = attr.ib(None) #: Width of the image image_width = attr.ib(None) #: Height of the image image_height = attr.ib(None) #: List of additional attachments attachments = attr.ib(factory=list, converter=lambda x: [] if x is None else x) # Put here for backwards compatibility, so that the init argument order is preserved uid = attr.ib(None) @classmethod def _from_graphql(cls, data): from . import _file url = data.get("url") rtn = cls( uid=data.get("deduplication_key"), author=data["target"]["actors"][0]["id"] if data["target"].get("actors") else None, url=url, original_url=_util.get_url_parameter(url, "u") if "/l.php?u=" in url else url, title=data["title_with_entities"].get("text"), description=data["description"].get("text") if data.get("description") else None, source=data["source"].get("text") if data.get("source") else None, attachments=[ _file.graphql_to_subattachment(attachment) for attachment in data.get("subattachments") ], ) media = data.get("media") if media and media.get("image"): image = media["image"] rtn.image_url = image.get("uri") rtn.original_image_url = ( _util.get_url_parameter(rtn.image_url, "url") if "/safe_image.php" in rtn.image_url else rtn.image_url ) rtn.image_width = image.get("width") rtn.image_height = image.get("height") return rtn PKt%OTB  fbchat/_client.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import requests import urllib from uuid import uuid1 from random import choice from bs4 import BeautifulSoup as bs from mimetypes import guess_type from collections import OrderedDict from ._util import * from .models import * from . import _graphql from ._state import State import time import json try: from urllib.parse import urlparse, parse_qs except ImportError: from urlparse import urlparse, parse_qs ACONTEXT = { "action_history": [ {"surface": "messenger_chat_tab", "mechanism": "messenger_composer"} ] } class Client(object): """A client for the Facebook Chat (Messenger). This is the main class of ``fbchat``, which contains all the methods you use to interact with Facebook. You can extend this class, and overwrite the ``on`` methods, to provide custom event handling (mainly useful while listening). """ listening = False """Whether the client is listening. Used when creating an external event loop to determine when to stop listening. """ @property def ssl_verify(self): """Verify SSL certificate. Set to False to allow debugging with a proxy. """ # TODO: Deprecate this return self._state._session.verify @ssl_verify.setter def ssl_verify(self, value): self._state._session.verify = value @property def uid(self): """The ID of the client. Can be used as ``thread_id``. See :ref:`intro_threads` for more info. """ return self._uid def __init__( self, email, password, user_agent=None, max_tries=5, session_cookies=None, logging_level=logging.INFO, ): """Initialize and log in the client. Args: email: Facebook ``email``, ``id`` or ``phone number`` password: Facebook account password user_agent: Custom user agent to use when sending requests. If `None`, user agent will be chosen from a premade list max_tries (int): Maximum number of times to try logging in session_cookies (dict): Cookies from a previous session (Will default to login if these are invalid) logging_level (int): Configures the `logging level `_. Defaults to ``logging.INFO`` Raises: FBchatException: On failed login """ self._sticky, self._pool = (None, None) self._seq = "0" self._default_thread_id = None self._default_thread_type = None self._pull_channel = 0 self._markAlive = True self._buddylist = dict() handler.setLevel(logging_level) # If session cookies aren't set, not properly loaded or gives us an invalid session, then do the login if ( not session_cookies or not self.setSession(session_cookies, user_agent=user_agent) or not self.isLoggedIn() ): self.login(email, password, max_tries, user_agent=user_agent) """ INTERNAL REQUEST METHODS """ def _get(self, url, params): return self._state._get(url, params) def _post(self, url, params, files=None): return self._state._post(url, params, files=files) def _payload_post(self, url, data, files=None): return self._state._payload_post(url, data, files=files) def graphql_requests(self, *queries): """Execute GraphQL queries. Args: queries (dict): Zero or more dictionaries Returns: tuple: A tuple containing JSON GraphQL queries Raises: FBchatException: If request failed """ return tuple(self._state._graphql_requests(*queries)) def graphql_request(self, query): """Shorthand for ``graphql_requests(query)[0]``. Raises: FBchatException: If request failed """ return self.graphql_requests(query)[0] """ END INTERNAL REQUEST METHODS """ """ LOGIN METHODS """ def isLoggedIn(self): """Send a request to Facebook to check the login status. Returns: bool: True if the client is still logged in """ return self._state.is_logged_in() def getSession(self): """Retrieve session cookies. Returns: dict: A dictionary containing session cookies """ return self._state.get_cookies() def setSession(self, session_cookies, user_agent=None): """Load session cookies. Args: session_cookies (dict): A dictionary containing session cookies Returns: bool: False if ``session_cookies`` does not contain proper cookies """ try: # Load cookies into current session self._state = State.from_cookies(session_cookies, user_agent=user_agent) self._uid = self._state.user_id except Exception as e: log.exception("Failed loading session") return False return True def login(self, email, password, max_tries=5, user_agent=None): """Login the user, using ``email`` and ``password``. If the user is already logged in, this will do a re-login. Args: email: Facebook ``email`` or ``id`` or ``phone number`` password: Facebook account password max_tries (int): Maximum number of times to try logging in Raises: FBchatException: On failed login """ self.onLoggingIn(email=email) if max_tries < 1: raise FBchatUserError("Cannot login: max_tries should be at least one") if not (email and password): raise FBchatUserError("Email and password not set") for i in range(1, max_tries + 1): try: self._state = State.login( email, password, on_2fa_callback=self.on2FACode, user_agent=user_agent, ) self._uid = self._state.user_id except Exception: if i >= max_tries: raise log.exception("Attempt #{} failed, retrying".format(i)) time.sleep(1) else: self.onLoggedIn(email=email) break def logout(self): """Safely log out the client. Returns: bool: True if the action was successful """ if self._state.logout(): self._state = None self._uid = None return True return False """ END LOGIN METHODS """ """ DEFAULT THREAD METHODS """ def _getThread(self, given_thread_id=None, given_thread_type=None): """Check if thread ID is given and if default is set, and return correct values. Returns: tuple: Thread ID and thread type Raises: ValueError: If thread ID is not given and there is no default """ if given_thread_id is None: if self._default_thread_id is not None: return self._default_thread_id, self._default_thread_type else: raise ValueError("Thread ID is not set") else: return given_thread_id, given_thread_type def setDefaultThread(self, thread_id, thread_type): """Set default thread to send messages to. Args: thread_id: User/Group ID to default to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` """ self._default_thread_id = thread_id self._default_thread_type = thread_type def resetDefaultThread(self): """Reset default thread.""" self.setDefaultThread(None, None) """ END DEFAULT THREAD METHODS """ """ FETCH METHODS """ def _forcedFetch(self, thread_id, mid): params = {"thread_and_message_id": {"thread_id": thread_id, "message_id": mid}} j, = self.graphql_requests(_graphql.from_doc_id("1768656253222505", params)) return j def fetchThreads(self, thread_location, before=None, after=None, limit=None): """Fetch all threads in ``thread_location``. Threads will be sorted from newest to oldest. Args: thread_location (ThreadLocation): INBOX, PENDING, ARCHIVED or OTHER before: Fetch only thread before this epoch (in ms) (default all threads) after: Fetch only thread after this epoch (in ms) (default all threads) limit: The max. amount of threads to fetch (default all threads) Returns: list: :class:`Thread` objects Raises: FBchatException: If request failed """ threads = [] last_thread_timestamp = None while True: # break if limit is exceeded if limit and len(threads) >= limit: break # fetchThreadList returns at max 20 threads before last_thread_timestamp (included) candidates = self.fetchThreadList( before=last_thread_timestamp, thread_location=thread_location ) if len(candidates) > 1: threads += candidates[1:] else: # End of threads break last_thread_timestamp = threads[-1].last_message_timestamp # FB returns a sorted list of threads if (before is not None and int(last_thread_timestamp) > before) or ( after is not None and int(last_thread_timestamp) < after ): break # Return only threads between before and after (if set) if before is not None or after is not None: for t in threads: last_message_timestamp = int(t.last_message_timestamp) if (before is not None and last_message_timestamp > before) or ( after is not None and last_message_timestamp < after ): threads.remove(t) if limit and len(threads) > limit: return threads[:limit] return threads def fetchAllUsersFromThreads(self, threads): """Fetch all users involved in given threads. Args: threads: Thread: List of threads to check for users Returns: list: :class:`User` objects Raises: FBchatException: If request failed """ users = [] users_to_fetch = [] # It's more efficient to fetch all users in one request for thread in threads: if thread.type == ThreadType.USER: if thread.uid not in [user.uid for user in users]: users.append(thread) elif thread.type == ThreadType.GROUP: for user_id in thread.participants: if ( user_id not in [user.uid for user in users] and user_id not in users_to_fetch ): users_to_fetch.append(user_id) for user_id, user in self.fetchUserInfo(*users_to_fetch).items(): users.append(user) return users def fetchAllUsers(self): """Fetch all users the client is currently chatting with. Returns: list: :class:`User` objects Raises: FBchatException: If request failed """ data = {"viewer": self._uid} j = self._payload_post("/chat/user_info_all", data) users = [] for data in j.values(): if data["type"] in ["user", "friend"]: if data["id"] in ["0", 0]: # Skip invalid users continue users.append(User._from_all_fetch(data)) return users def searchForUsers(self, name, limit=10): """Find and get users by their name. Args: name: Name of the user limit: The max. amount of users to fetch Returns: list: :class:`User` objects, ordered by relevance Raises: FBchatException: If request failed """ params = {"search": name, "limit": limit} j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_USER, params)) return [User._from_graphql(node) for node in j[name]["users"]["nodes"]] def searchForPages(self, name, limit=10): """Find and get pages by their name. Args: name: Name of the page Returns: list: :class:`Page` objects, ordered by relevance Raises: FBchatException: If request failed """ params = {"search": name, "limit": limit} j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_PAGE, params)) return [Page._from_graphql(node) for node in j[name]["pages"]["nodes"]] def searchForGroups(self, name, limit=10): """Find and get group threads by their name. Args: name: Name of the group thread limit: The max. amount of groups to fetch Returns: list: :class:`Group` objects, ordered by relevance Raises: FBchatException: If request failed """ params = {"search": name, "limit": limit} j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_GROUP, params)) return [Group._from_graphql(node) for node in j["viewer"]["groups"]["nodes"]] def searchForThreads(self, name, limit=10): """Find and get threads by their name. Args: name: Name of the thread limit: The max. amount of groups to fetch Returns: list: :class:`User`, :class:`Group` and :class:`Page` objects, ordered by relevance Raises: FBchatException: If request failed """ params = {"search": name, "limit": limit} j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_THREAD, params)) rtn = [] for node in j[name]["threads"]["nodes"]: if node["__typename"] == "User": rtn.append(User._from_graphql(node)) elif node["__typename"] == "MessageThread": # MessageThread => Group thread rtn.append(Group._from_graphql(node)) elif node["__typename"] == "Page": rtn.append(Page._from_graphql(node)) elif node["__typename"] == "Group": # We don't handle Facebook "Groups" pass else: log.warning( "Unknown type {} in {}".format(repr(node["__typename"]), node) ) return rtn def searchForMessageIDs(self, query, offset=0, limit=5, thread_id=None): """Find and get message IDs by query. Args: query: Text to search for offset (int): Number of messages to skip limit (int): Max. number of messages to retrieve thread_id: User/Group ID to search in. See :ref:`intro_threads` Returns: typing.Iterable: Found Message IDs Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = { "query": query, "snippetOffset": offset, "snippetLimit": limit, "identifier": "thread_fbid", "thread_fbid": thread_id, } j = self._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data) result = j["search_snippets"][query] snippets = result[thread_id]["snippets"] if result.get(thread_id) else [] for snippet in snippets: yield snippet["message_id"] def searchForMessages(self, query, offset=0, limit=5, thread_id=None): """Find and get `Message` objects by query. Warning: This method sends request for every found message ID. Args: query: Text to search for offset (int): Number of messages to skip limit (int): Max. number of messages to retrieve thread_id: User/Group ID to search in. See :ref:`intro_threads` Returns: typing.Iterable: Found :class:`Message` objects Raises: FBchatException: If request failed """ message_ids = self.searchForMessageIDs( query, offset=offset, limit=limit, thread_id=thread_id ) for mid in message_ids: yield self.fetchMessageInfo(mid, thread_id) def search(self, query, fetch_messages=False, thread_limit=5, message_limit=5): """Search for messages in all threads. Args: query: Text to search for fetch_messages: Whether to fetch :class:`Message` objects or IDs only thread_limit (int): Max. number of threads to retrieve message_limit (int): Max. number of messages to retrieve Returns: typing.Dict[str, typing.Iterable]: Dictionary with thread IDs as keys and iterables to get messages as values Raises: FBchatException: If request failed """ data = {"query": query, "snippetLimit": thread_limit} j = self._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data) result = j["search_snippets"][query] if not result: return {} if fetch_messages: search_method = self.searchForMessages else: search_method = self.searchForMessageIDs return { thread_id: search_method(query, limit=message_limit, thread_id=thread_id) for thread_id in result } def _fetchInfo(self, *ids): data = {"ids[{}]".format(i): _id for i, _id in enumerate(ids)} j = self._payload_post("/chat/user_info/", data) if j.get("profiles") is None: raise FBchatException("No users/pages returned: {}".format(j)) entries = {} for _id in j["profiles"]: k = j["profiles"][_id] if k["type"] in ["user", "friend"]: entries[_id] = { "id": _id, "type": ThreadType.USER, "url": k.get("uri"), "first_name": k.get("firstName"), "is_viewer_friend": k.get("is_friend"), "gender": k.get("gender"), "profile_picture": {"uri": k.get("thumbSrc")}, "name": k.get("name"), } elif k["type"] == "page": entries[_id] = { "id": _id, "type": ThreadType.PAGE, "url": k.get("uri"), "profile_picture": {"uri": k.get("thumbSrc")}, "name": k.get("name"), } else: raise FBchatException( "{} had an unknown thread type: {}".format(_id, k) ) log.debug(entries) return entries def fetchUserInfo(self, *user_ids): """Fetch users' info from IDs, unordered. Warning: Sends two requests, to fetch all available info! Args: user_ids: One or more user ID(s) to query Returns: dict: :class:`User` objects, labeled by their ID Raises: FBchatException: If request failed """ threads = self.fetchThreadInfo(*user_ids) users = {} for id_, thread in threads.items(): if thread.type == ThreadType.USER: users[id_] = thread else: raise FBchatUserError("Thread {} was not a user".format(thread)) return users def fetchPageInfo(self, *page_ids): """Fetch pages' info from IDs, unordered. Warning: Sends two requests, to fetch all available info! Args: page_ids: One or more page ID(s) to query Returns: dict: :class:`Page` objects, labeled by their ID Raises: FBchatException: If request failed """ threads = self.fetchThreadInfo(*page_ids) pages = {} for id_, thread in threads.items(): if thread.type == ThreadType.PAGE: pages[id_] = thread else: raise FBchatUserError("Thread {} was not a page".format(thread)) return pages def fetchGroupInfo(self, *group_ids): """Fetch groups' info from IDs, unordered. Args: group_ids: One or more group ID(s) to query Returns: dict: :class:`Group` objects, labeled by their ID Raises: FBchatException: If request failed """ threads = self.fetchThreadInfo(*group_ids) groups = {} for id_, thread in threads.items(): if thread.type == ThreadType.GROUP: groups[id_] = thread else: raise FBchatUserError("Thread {} was not a group".format(thread)) return groups def fetchThreadInfo(self, *thread_ids): """Fetch threads' info from IDs, unordered. Warning: Sends two requests if users or pages are present, to fetch all available info! Args: thread_ids: One or more thread ID(s) to query Returns: dict: :class:`Thread` objects, labeled by their ID Raises: FBchatException: If request failed """ queries = [] for thread_id in thread_ids: params = { "id": thread_id, "message_limit": 0, "load_messages": False, "load_read_receipts": False, "before": None, } queries.append(_graphql.from_doc_id("2147762685294928", params)) j = self.graphql_requests(*queries) for i, entry in enumerate(j): if entry.get("message_thread") is None: # If you don't have an existing thread with this person, attempt to retrieve user data anyways j[i]["message_thread"] = { "thread_key": {"other_user_id": thread_ids[i]}, "thread_type": "ONE_TO_ONE", } pages_and_user_ids = [ k["message_thread"]["thread_key"]["other_user_id"] for k in j if k["message_thread"].get("thread_type") == "ONE_TO_ONE" ] pages_and_users = {} if len(pages_and_user_ids) != 0: pages_and_users = self._fetchInfo(*pages_and_user_ids) rtn = {} for i, entry in enumerate(j): entry = entry["message_thread"] if entry.get("thread_type") == "GROUP": _id = entry["thread_key"]["thread_fbid"] rtn[_id] = Group._from_graphql(entry) elif entry.get("thread_type") == "ONE_TO_ONE": _id = entry["thread_key"]["other_user_id"] if pages_and_users.get(_id) is None: raise FBchatException("Could not fetch thread {}".format(_id)) entry.update(pages_and_users[_id]) if entry["type"] == ThreadType.USER: rtn[_id] = User._from_graphql(entry) else: rtn[_id] = Page._from_graphql(entry) else: raise FBchatException( "{} had an unknown thread type: {}".format(thread_ids[i], entry) ) return rtn def fetchThreadMessages(self, thread_id=None, limit=20, before=None): """Fetch messages in a thread, ordered by most recent. Args: thread_id: User/Group ID to get messages from. See :ref:`intro_threads` limit (int): Max. number of messages to retrieve before (int): A timestamp, indicating from which point to retrieve messages Returns: list: :class:`Message` objects Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) params = { "id": thread_id, "message_limit": limit, "load_messages": True, "load_read_receipts": True, "before": before, } j, = self.graphql_requests(_graphql.from_doc_id("1860982147341344", params)) if j.get("message_thread") is None: raise FBchatException("Could not fetch thread {}: {}".format(thread_id, j)) messages = [ Message._from_graphql(message) for message in j["message_thread"]["messages"]["nodes"] ] messages.reverse() read_receipts = j["message_thread"]["read_receipts"]["nodes"] for message in messages: for receipt in read_receipts: if int(receipt["watermark"]) >= int(message.timestamp): message.read_by.append(receipt["actor"]["id"]) return messages def fetchThreadList( self, offset=None, limit=20, thread_location=ThreadLocation.INBOX, before=None ): """Fetch the client's thread list. Args: offset: Deprecated. Do not use! limit (int): Max. number of threads to retrieve. Capped at 20 thread_location (ThreadLocation): INBOX, PENDING, ARCHIVED or OTHER before (int): A timestamp (in milliseconds), indicating from which point to retrieve threads Returns: list: :class:`Thread` objects Raises: FBchatException: If request failed """ if offset is not None: log.warning( "Using `offset` in `fetchThreadList` is no longer supported, " "since Facebook migrated to the use of GraphQL in this request. " "Use `before` instead." ) if limit > 20 or limit < 1: raise FBchatUserError("`limit` should be between 1 and 20") if thread_location in ThreadLocation: loc_str = thread_location.value else: raise FBchatUserError('"thread_location" must be a value of ThreadLocation') params = { "limit": limit, "tags": [loc_str], "before": before, "includeDeliveryReceipts": True, "includeSeqID": False, } j, = self.graphql_requests(_graphql.from_doc_id("1349387578499440", params)) rtn = [] for node in j["viewer"]["message_threads"]["nodes"]: _type = node.get("thread_type") if _type == "GROUP": rtn.append(Group._from_graphql(node)) elif _type == "ONE_TO_ONE": rtn.append(User._from_thread_fetch(node)) else: raise FBchatException( "Unknown thread type: {}, with data: {}".format(_type, node) ) return rtn def fetchUnread(self): """Fetch unread threads. Returns: list: List of unread thread ids Raises: FBchatException: If request failed """ form = { "folders[0]": "inbox", "client": "mercury", "last_action_timestamp": now() - 60 * 1000 # 'last_action_timestamp': 0 } j = self._payload_post("/ajax/mercury/unread_threads.php", form) result = j["unread_thread_fbids"][0] return result["thread_fbids"] + result["other_user_fbids"] def fetchUnseen(self): """Fetch unseen / new threads. Returns: list: List of unseen thread ids Raises: FBchatException: If request failed """ j = self._payload_post("/mercury/unseen_thread_ids/", {}) result = j["unseen_thread_fbids"][0] return result["thread_fbids"] + result["other_user_fbids"] def fetchImageUrl(self, image_id): """Fetch URL to download the original image from an image attachment ID. Args: image_id (str): The image you want to fetch Returns: str: An URL where you can download the original image Raises: FBchatException: If request failed """ image_id = str(image_id) data = {"photo_id": str(image_id)} j = self._post("/mercury/attachments/photo/", data) url = get_jsmods_require(j, 3) if url is None: raise FBchatException("Could not fetch image URL from: {}".format(j)) return url def fetchMessageInfo(self, mid, thread_id=None): """Fetch `Message` object from the given message id. Args: mid: Message ID to fetch from thread_id: User/Group ID to get message info from. See :ref:`intro_threads` Returns: Message: :class:`Message` object Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) message_info = self._forcedFetch(thread_id, mid).get("message") return Message._from_graphql(message_info) def fetchPollOptions(self, poll_id): """Fetch list of `PollOption` objects from the poll id. Args: poll_id: Poll ID to fetch from Returns: list Raises: FBchatException: If request failed """ data = {"question_id": poll_id} j = self._payload_post("/ajax/mercury/get_poll_options", data) return [PollOption._from_graphql(m) for m in j] def fetchPlanInfo(self, plan_id): """Fetch `Plan` object from the plan id. Args: plan_id: Plan ID to fetch from Returns: Plan: :class:`Plan` object Raises: FBchatException: If request failed """ data = {"event_reminder_id": plan_id} j = self._payload_post("/ajax/eventreminder", data) return Plan._from_fetch(j) def _getPrivateData(self): j, = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {})) return j["viewer"] def getPhoneNumbers(self): """Fetch list of user's phone numbers. Returns: list: List of phone numbers """ data = self._getPrivateData() return [ j["phone_number"]["universal_number"] for j in data["user"]["all_phones"] ] def getEmails(self): """Fetch list of user's emails. Returns: list: List of emails """ data = self._getPrivateData() return [j["display_email"] for j in data["all_emails"]] def getUserActiveStatus(self, user_id): """Fetch friend active status as an `ActiveStatus` object. Return ``None`` if status isn't known. Warning: Only works when listening. Args: user_id: ID of the user Returns: ActiveStatus: Given user active status """ return self._buddylist.get(str(user_id)) def fetchThreadImages(self, thread_id=None): """Fetch images posted in thread. Args: thread_id: ID of the thread Returns: typing.Iterable: :class:`ImageAttachment` or :class:`VideoAttachment` """ thread_id, thread_type = self._getThread(thread_id, None) data = {"id": thread_id, "first": 48} thread_id = str(thread_id) j, = self.graphql_requests(_graphql.from_query_id("515216185516880", data)) while True: try: i = j[thread_id]["message_shared_media"]["edges"][0] except IndexError: if j[thread_id]["message_shared_media"]["page_info"].get( "has_next_page" ): data["after"] = j[thread_id]["message_shared_media"][ "page_info" ].get("end_cursor") j, = self.graphql_requests( _graphql.from_query_id("515216185516880", data) ) continue else: break if i["node"].get("__typename") == "MessageImage": yield ImageAttachment._from_list(i) elif i["node"].get("__typename") == "MessageVideo": yield VideoAttachment._from_list(i) else: yield Attachment(uid=i["node"].get("legacy_attachment_id")) del j[thread_id]["message_shared_media"]["edges"][0] """ END FETCH METHODS """ """ SEND METHODS """ def _oldMessage(self, message): return message if isinstance(message, Message) else Message(text=message) def _doSendRequest(self, data, get_thread_id=False): """Send the data to `SendURL`, and returns the message ID or None on failure.""" mid, thread_id = self._state._do_send_request(data) if get_thread_id: return mid, thread_id else: return mid def send(self, message, thread_id=None, thread_type=ThreadType.USER): """Send message to a thread. Args: message (Message): Message to send thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent message Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, thread_type) thread = thread_type._to_class()(thread_id) data = thread._to_send_data() data.update(message._to_send_data()) return self._doSendRequest(data) def sendMessage(self, message, thread_id=None, thread_type=ThreadType.USER): """Deprecated. Use :func:`fbchat.Client.send` instead.""" return self.send( Message(text=message), thread_id=thread_id, thread_type=thread_type ) def sendEmoji( self, emoji=None, size=EmojiSize.SMALL, thread_id=None, thread_type=ThreadType.USER, ): """Deprecated. Use :func:`fbchat.Client.send` instead.""" return self.send( Message(text=emoji, emoji_size=size), thread_id=thread_id, thread_type=thread_type, ) def wave(self, wave_first=True, thread_id=None, thread_type=None): """Wave hello to a thread. Args: wave_first: Whether to wave first or wave back thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent message Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, thread_type) thread = thread_type._to_class()(thread_id) data = thread._to_send_data() data["action_type"] = "ma-type:user-generated-message" data["lightweight_action_attachment[lwa_state]"] = ( "INITIATED" if wave_first else "RECIPROCATED" ) data["lightweight_action_attachment[lwa_type]"] = "WAVE" if thread_type == ThreadType.USER: data["specific_to_list[0]"] = "fbid:{}".format(thread_id) return self._doSendRequest(data) def quickReply(self, quick_reply, payload=None, thread_id=None, thread_type=None): """Reply to chosen quick reply. Args: quick_reply (QuickReply): Quick reply to reply to payload: Optional answer to the quick reply thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent message Raises: FBchatException: If request failed """ quick_reply.is_response = True if isinstance(quick_reply, QuickReplyText): return self.send( Message(text=quick_reply.title, quick_replies=[quick_reply]) ) elif isinstance(quick_reply, QuickReplyLocation): if not isinstance(payload, LocationAttachment): raise ValueError( "Payload must be an instance of `fbchat.LocationAttachment`" ) return self.sendLocation( payload, thread_id=thread_id, thread_type=thread_type ) elif isinstance(quick_reply, QuickReplyEmail): if not payload: payload = self.getEmails()[0] quick_reply.external_payload = quick_reply.payload quick_reply.payload = payload return self.send(Message(text=payload, quick_replies=[quick_reply])) elif isinstance(quick_reply, QuickReplyPhoneNumber): if not payload: payload = self.getPhoneNumbers()[0] quick_reply.external_payload = quick_reply.payload quick_reply.payload = payload return self.send(Message(text=payload, quick_replies=[quick_reply])) def unsend(self, mid): """Unsend message by it's ID (removes it for everyone). Args: mid: :ref:`Message ID ` of the message to unsend """ data = {"message_id": mid} j = self._payload_post("/messaging/unsend_message/?dpr=1", data) def _sendLocation( self, location, current=True, message=None, thread_id=None, thread_type=None ): thread_id, thread_type = self._getThread(thread_id, thread_type) thread = thread_type._to_class()(thread_id) data = thread._to_send_data() if message is not None: data.update(message._to_send_data()) data["action_type"] = "ma-type:user-generated-message" data["location_attachment[coordinates][latitude]"] = location.latitude data["location_attachment[coordinates][longitude]"] = location.longitude data["location_attachment[is_current_location]"] = current return self._doSendRequest(data) def sendLocation(self, location, message=None, thread_id=None, thread_type=None): """Send a given location to a thread as the user's current location. Args: location (LocationAttachment): Location to send message (Message): Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent message Raises: FBchatException: If request failed """ self._sendLocation( location=location, current=True, message=message, thread_id=thread_id, thread_type=thread_type, ) def sendPinnedLocation( self, location, message=None, thread_id=None, thread_type=None ): """Send a given location to a thread as a pinned location. Args: location (LocationAttachment): Location to send message (Message): Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent message Raises: FBchatException: If request failed """ self._sendLocation( location=location, current=False, message=message, thread_id=thread_id, thread_type=thread_type, ) def _upload(self, files, voice_clip=False): return self._state._upload(files, voice_clip=voice_clip) def _sendFiles( self, files, message=None, thread_id=None, thread_type=ThreadType.USER ): """Send files from file IDs to a thread. `files` should be a list of tuples, with a file's ID and mimetype. """ thread_id, thread_type = self._getThread(thread_id, thread_type) thread = thread_type._to_class()(thread_id) data = thread._to_send_data() data.update(self._oldMessage(message)._to_send_data()) data["action_type"] = "ma-type:user-generated-message" data["has_attachment"] = True for i, (file_id, mimetype) in enumerate(files): data["{}s[{}]".format(mimetype_to_key(mimetype), i)] = file_id return self._doSendRequest(data) def sendRemoteFiles( self, file_urls, message=None, thread_id=None, thread_type=ThreadType.USER ): """Send files from URLs to a thread. Args: file_urls: URLs of files to upload and send message: Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent files Raises: FBchatException: If request failed """ file_urls = require_list(file_urls) files = self._upload(get_files_from_urls(file_urls)) return self._sendFiles( files=files, message=message, thread_id=thread_id, thread_type=thread_type ) def sendLocalFiles( self, file_paths, message=None, thread_id=None, thread_type=ThreadType.USER ): """Send local files to a thread. Args: file_paths: Paths of files to upload and send message: Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent files Raises: FBchatException: If request failed """ file_paths = require_list(file_paths) with get_files_from_paths(file_paths) as x: files = self._upload(x) return self._sendFiles( files=files, message=message, thread_id=thread_id, thread_type=thread_type ) def sendRemoteVoiceClips( self, clip_urls, message=None, thread_id=None, thread_type=ThreadType.USER ): """Send voice clips from URLs to a thread. Args: clip_urls: URLs of clips to upload and send message: Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent files Raises: FBchatException: If request failed """ clip_urls = require_list(clip_urls) files = self._upload(get_files_from_urls(clip_urls), voice_clip=True) return self._sendFiles( files=files, message=message, thread_id=thread_id, thread_type=thread_type ) def sendLocalVoiceClips( self, clip_paths, message=None, thread_id=None, thread_type=ThreadType.USER ): """Send local voice clips to a thread. Args: clip_paths: Paths of clips to upload and send message: Additional message thread_id: User/Group ID to send to. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Returns: :ref:`Message ID ` of the sent files Raises: FBchatException: If request failed """ clip_paths = require_list(clip_paths) with get_files_from_paths(clip_paths) as x: files = self._upload(x, voice_clip=True) return self._sendFiles( files=files, message=message, thread_id=thread_id, thread_type=thread_type ) def sendImage( self, image_id, message=None, thread_id=None, thread_type=ThreadType.USER, is_gif=False, ): """Deprecated.""" if is_gif: mimetype = "image/gif" else: mimetype = "image/png" return self._sendFiles( files=[(image_id, mimetype)], message=message, thread_id=thread_id, thread_type=thread_type, ) def sendRemoteImage( self, image_url, message=None, thread_id=None, thread_type=ThreadType.USER ): """Deprecated. Use :func:`fbchat.Client.sendRemoteFiles` instead.""" return self.sendRemoteFiles( file_urls=[image_url], message=message, thread_id=thread_id, thread_type=thread_type, ) def sendLocalImage( self, image_path, message=None, thread_id=None, thread_type=ThreadType.USER ): """Deprecated. Use :func:`fbchat.Client.sendLocalFiles` instead.""" return self.sendLocalFiles( file_paths=[image_path], message=message, thread_id=thread_id, thread_type=thread_type, ) def forwardAttachment(self, attachment_id, thread_id=None): """Forward an attachment. Args: attachment_id: Attachment ID to forward thread_id: User/Group ID to send to. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = { "attachment_id": attachment_id, "recipient_map[{}]".format(generateOfflineThreadingID()): thread_id, } j = self._payload_post("/mercury/attachments/forward/", data) if not j.get("success"): raise FBchatFacebookError( "Failed forwarding attachment: {}".format(j["error"]), fb_error_message=j["error"], ) def createGroup(self, message, user_ids): """Create a group with the given user ids. Args: message: The initial message user_ids: A list of users to create the group with. Returns: ID of the new group Raises: FBchatException: If request failed """ data = self._oldMessage(message)._to_send_data() if len(user_ids) < 2: raise FBchatUserError("Error when creating group: Not enough participants") for i, user_id in enumerate(user_ids + [self._uid]): data["specific_to_list[{}]".format(i)] = "fbid:{}".format(user_id) message_id, thread_id = self._doSendRequest(data, get_thread_id=True) if not thread_id: raise FBchatException( "Error when creating group: No thread_id could be found" ) return thread_id def addUsersToGroup(self, user_ids, thread_id=None): """Add users to a group. Args: user_ids (list): One or more user IDs to add thread_id: Group ID to add people to. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = Group(thread_id)._to_send_data() data["action_type"] = "ma-type:log-message" data["log_message_type"] = "log:subscribe" user_ids = require_list(user_ids) for i, user_id in enumerate(user_ids): if user_id == self._uid: raise FBchatUserError( "Error when adding users: Cannot add self to group thread" ) else: data[ "log_message_data[added_participants][{}]".format(i) ] = "fbid:{}".format(user_id) return self._doSendRequest(data) def removeUserFromGroup(self, user_id, thread_id=None): """Remove user from a group. Args: user_id: User ID to remove thread_id: Group ID to remove people from. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = {"uid": user_id, "tid": thread_id} j = self._payload_post("/chat/remove_participants/", data) def _adminStatus(self, admin_ids, admin, thread_id=None): thread_id, thread_type = self._getThread(thread_id, None) data = {"add": admin, "thread_fbid": thread_id} admin_ids = require_list(admin_ids) for i, admin_id in enumerate(admin_ids): data["admin_ids[{}]".format(i)] = str(admin_id) j = self._payload_post("/messaging/save_admins/?dpr=1", data) def addGroupAdmins(self, admin_ids, thread_id=None): """Set specified users as group admins. Args: admin_ids: One or more user IDs to set admin thread_id: Group ID to remove people from. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._adminStatus(admin_ids, True, thread_id) def removeGroupAdmins(self, admin_ids, thread_id=None): """Remove admin status from specified users. Args: admin_ids: One or more user IDs to remove admin thread_id: Group ID to remove people from. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._adminStatus(admin_ids, False, thread_id) def changeGroupApprovalMode(self, require_admin_approval, thread_id=None): """Change group's approval mode. Args: require_admin_approval: True or False thread_id: Group ID to remove people from. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = {"set_mode": int(require_admin_approval), "thread_fbid": thread_id} j = self._payload_post("/messaging/set_approval_mode/?dpr=1", data) def _usersApproval(self, user_ids, approve, thread_id=None): thread_id, thread_type = self._getThread(thread_id, None) user_ids = list(require_list(user_ids)) data = { "client_mutation_id": "0", "actor_id": self._uid, "thread_fbid": thread_id, "user_ids": user_ids, "response": "ACCEPT" if approve else "DENY", "surface": "ADMIN_MODEL_APPROVAL_CENTER", } j, = self.graphql_requests( _graphql.from_doc_id("1574519202665847", {"data": data}) ) def acceptUsersToGroup(self, user_ids, thread_id=None): """Accept users to the group from the group's approval. Args: user_ids: One or more user IDs to accept thread_id: Group ID to accept users to. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._usersApproval(user_ids, True, thread_id) def denyUsersFromGroup(self, user_ids, thread_id=None): """Deny users from joining the group. Args: user_ids: One or more user IDs to deny thread_id: Group ID to deny users from. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._usersApproval(user_ids, False, thread_id) def _changeGroupImage(self, image_id, thread_id=None): """Change a thread image from an image id. Args: image_id: ID of uploaded image thread_id: User/Group ID to change image. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = {"thread_image_id": image_id, "thread_id": thread_id} j = self._payload_post("/messaging/set_thread_image/?dpr=1", data) return image_id def changeGroupImageRemote(self, image_url, thread_id=None): """Change a thread image from a URL. Args: image_url: URL of an image to upload and change thread_id: User/Group ID to change image. See :ref:`intro_threads` Raises: FBchatException: If request failed """ (image_id, mimetype), = self._upload(get_files_from_urls([image_url])) return self._changeGroupImage(image_id, thread_id) def changeGroupImageLocal(self, image_path, thread_id=None): """Change a thread image from a local path. Args: image_path: Path of an image to upload and change thread_id: User/Group ID to change image. See :ref:`intro_threads` Raises: FBchatException: If request failed """ with get_files_from_paths([image_path]) as files: (image_id, mimetype), = self._upload(files) return self._changeGroupImage(image_id, thread_id) def changeThreadTitle(self, title, thread_id=None, thread_type=ThreadType.USER): """Change title of a thread. If this is executed on a user thread, this will change the nickname of that user, effectively changing the title. Args: title: New group thread title thread_id: Group ID to change title of. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, thread_type) if thread_type == ThreadType.USER: # The thread is a user, so we change the user's nickname return self.changeNickname( title, thread_id, thread_id=thread_id, thread_type=thread_type ) data = {"thread_name": title, "thread_id": thread_id} j = self._payload_post("/messaging/set_thread_name/?dpr=1", data) def changeNickname( self, nickname, user_id, thread_id=None, thread_type=ThreadType.USER ): """Change the nickname of a user in a thread. Args: nickname: New nickname user_id: User that will have their nickname changed thread_id: User/Group ID to change color of. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, thread_type) data = { "nickname": nickname, "participant_id": user_id, "thread_or_other_fbid": thread_id, } j = self._payload_post( "/messaging/save_thread_nickname/?source=thread_settings&dpr=1", data ) def changeThreadColor(self, color, thread_id=None): """Change thread color. Args: color (ThreadColor): New thread color thread_id: User/Group ID to change color of. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = { "color_choice": color.value if color != ThreadColor.MESSENGER_BLUE else "", "thread_or_other_fbid": thread_id, } j = self._payload_post( "/messaging/save_thread_color/?source=thread_settings&dpr=1", data ) def changeThreadEmoji(self, emoji, thread_id=None): """Change thread color. Note: While changing the emoji, the Facebook web client actually sends multiple different requests, though only this one is required to make the change. Args: color: New thread emoji thread_id: User/Group ID to change emoji of. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = {"emoji_choice": emoji, "thread_or_other_fbid": thread_id} j = self._payload_post( "/messaging/save_thread_emoji/?source=thread_settings&dpr=1", data ) def reactToMessage(self, message_id, reaction): """React to a message, or removes reaction. Args: message_id: :ref:`Message ID ` to react to reaction (MessageReaction): Reaction emoji to use, if None removes reaction Raises: FBchatException: If request failed """ data = { "action": "ADD_REACTION" if reaction else "REMOVE_REACTION", "client_mutation_id": "1", "actor_id": self._uid, "message_id": str(message_id), "reaction": reaction.value if reaction else None, } data = {"doc_id": 1491398900900362, "variables": json.dumps({"data": data})} j = self._payload_post("/webgraphql/mutation", data) handle_graphql_errors(j) def createPlan(self, plan, thread_id=None): """Set a plan. Args: plan (Plan): Plan to set thread_id: User/Group ID to send plan to. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) data = { "event_type": "EVENT", "event_time": plan.time, "title": plan.title, "thread_id": thread_id, "location_id": plan.location_id or "", "location_name": plan.location or "", "acontext": ACONTEXT, } j = self._payload_post("/ajax/eventreminder/create", data) if "error" in j: raise FBchatFacebookError( "Failed creating plan: {}".format(j["error"]), fb_error_message=j["error"], ) def editPlan(self, plan, new_plan): """Edit a plan. Args: plan (Plan): Plan to edit new_plan: New plan Raises: FBchatException: If request failed """ data = { "event_reminder_id": plan.uid, "delete": "false", "date": new_plan.time, "location_name": new_plan.location or "", "location_id": new_plan.location_id or "", "title": new_plan.title, "acontext": ACONTEXT, } j = self._payload_post("/ajax/eventreminder/submit", data) def deletePlan(self, plan): """Delete a plan. Args: plan: Plan to delete Raises: FBchatException: If request failed """ data = {"event_reminder_id": plan.uid, "delete": "true", "acontext": ACONTEXT} j = self._payload_post("/ajax/eventreminder/submit", data) def changePlanParticipation(self, plan, take_part=True): """Change participation in a plan. Args: plan: Plan to take part in or not take_part: Whether to take part in the plan Raises: FBchatException: If request failed """ data = { "event_reminder_id": plan.uid, "guest_state": "GOING" if take_part else "DECLINED", "acontext": ACONTEXT, } j = self._payload_post("/ajax/eventreminder/rsvp", data) def eventReminder(self, thread_id, time, title, location="", location_id=""): """Deprecated. Use :func:`fbchat.Client.createPlan` instead.""" plan = Plan(time=time, title=title, location=location, location_id=location_id) self.createPlan(plan=plan, thread_id=thread_id) def createPoll(self, poll, thread_id=None): """Create poll in a group thread. Args: poll (Poll): Poll to create thread_id: User/Group ID to create poll in. See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) # We're using ordered dictionaries, because the Facebook endpoint that parses # the POST parameters is badly implemented, and deals with ordering the options # wrongly. If you can find a way to fix this for the endpoint, or if you find # another endpoint, please do suggest it ;) data = OrderedDict([("question_text", poll.title), ("target_id", thread_id)]) for i, option in enumerate(poll.options): data["option_text_array[{}]".format(i)] = option.text data["option_is_selected_array[{}]".format(i)] = str(int(option.vote)) j = self._payload_post("/messaging/group_polling/create_poll/?dpr=1", data) if j.get("status") != "success": raise FBchatFacebookError( "Failed creating poll: {}".format(j.get("errorTitle")), fb_error_message=j.get("errorMessage"), ) def updatePollVote(self, poll_id, option_ids=[], new_options=[]): """Update a poll vote. Args: poll_id: ID of the poll to update vote option_ids: List of the option IDs to vote new_options: List of the new option names thread_id: User/Group ID to change status in. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Raises: FBchatException: If request failed """ data = {"question_id": poll_id} for i, option_id in enumerate(option_ids): data["selected_options[{}]".format(i)] = option_id for i, option_text in enumerate(new_options): data["new_options[{}]".format(i)] = option_text j = self._payload_post("/messaging/group_polling/update_vote/?dpr=1", data) if j.get("status") != "success": raise FBchatFacebookError( "Failed updating poll vote: {}".format(j.get("errorTitle")), fb_error_message=j.get("errorMessage"), ) def setTypingStatus(self, status, thread_id=None, thread_type=None): """Set users typing status in a thread. Args: status (TypingStatus): Specify the typing status thread_id: User/Group ID to change status in. See :ref:`intro_threads` thread_type (ThreadType): See :ref:`intro_threads` Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, thread_type) data = { "typ": status.value, "thread": thread_id, "to": thread_id if thread_type == ThreadType.USER else "", "source": "mercury-chat", } j = self._payload_post("/ajax/messaging/typ.php", data) """ END SEND METHODS """ def markAsDelivered(self, thread_id, message_id): """Mark a message as delivered. Args: thread_id: User/Group ID to which the message belongs. See :ref:`intro_threads` message_id: Message ID to set as delivered. See :ref:`intro_threads` Returns: True Raises: FBchatException: If request failed """ data = { "message_ids[0]": message_id, "thread_ids[%s][0]" % thread_id: message_id, } j = self._payload_post("/ajax/mercury/delivery_receipts.php", data) return True def _readStatus(self, read, thread_ids): thread_ids = require_list(thread_ids) data = {"watermarkTimestamp": now(), "shouldSendReadReceipt": "true"} for thread_id in thread_ids: data["ids[{}]".format(thread_id)] = "true" if read else "false" j = self._payload_post("/ajax/mercury/change_read_status.php", data) def markAsRead(self, thread_ids=None): """Mark threads as read. All messages inside the specified threads will be marked as read. Args: thread_ids: User/Group IDs to set as read. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._readStatus(True, thread_ids) def markAsUnread(self, thread_ids=None): """Mark threads as unread. All messages inside the specified threads will be marked as unread. Args: thread_ids: User/Group IDs to set as unread. See :ref:`intro_threads` Raises: FBchatException: If request failed """ self._readStatus(False, thread_ids) def markAsSeen(self): """ Todo: Documenting this """ j = self._payload_post("/ajax/mercury/mark_seen.php", {"seen_timestamp": now()}) def friendConnect(self, friend_id): """ Todo: Documenting this """ data = {"to_friend": friend_id, "action": "confirm"} j = self._payload_post("/ajax/add_friend/action.php?dpr=1", data) def removeFriend(self, friend_id=None): """Remove a specified friend from the client's friend list. Args: friend_id: The ID of the friend that you want to remove Returns: True Raises: FBchatException: If request failed """ data = {"uid": friend_id} j = self._payload_post("/ajax/profile/removefriendconfirm.php", data) return True def blockUser(self, user_id): """Block messages from a specified user. Args: user_id: The ID of the user that you want to block Returns: True Raises: FBchatException: If request failed """ data = {"fbid": user_id} j = self._payload_post("/messaging/block_messages/?dpr=1", data) return True def unblockUser(self, user_id): """Unblock a previously blocked user. Args: user_id: The ID of the user that you want to unblock Returns: Whether the request was successful Raises: FBchatException: If request failed """ data = {"fbid": user_id} j = self._payload_post("/messaging/unblock_messages/?dpr=1", data) return True def moveThreads(self, location, thread_ids): """Move threads to specified location. Args: location (ThreadLocation): INBOX, PENDING, ARCHIVED or OTHER thread_ids: Thread IDs to move. See :ref:`intro_threads` Returns: True Raises: FBchatException: If request failed """ thread_ids = require_list(thread_ids) if location == ThreadLocation.PENDING: location = ThreadLocation.OTHER if location == ThreadLocation.ARCHIVED: data_archive = dict() data_unpin = dict() for thread_id in thread_ids: data_archive["ids[{}]".format(thread_id)] = "true" data_unpin["ids[{}]".format(thread_id)] = "false" j_archive = self._payload_post( "/ajax/mercury/change_archived_status.php?dpr=1", data_archive ) j_unpin = self._payload_post( "/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin ) else: data = dict() for i, thread_id in enumerate(thread_ids): data["{}[{}]".format(location.name.lower(), i)] = thread_id j = self._payload_post("/ajax/mercury/move_thread.php", data) return True def deleteThreads(self, thread_ids): """Delete threads. Args: thread_ids: Thread IDs to delete. See :ref:`intro_threads` Returns: True Raises: FBchatException: If request failed """ thread_ids = require_list(thread_ids) data_unpin = dict() data_delete = dict() for i, thread_id in enumerate(thread_ids): data_unpin["ids[{}]".format(thread_id)] = "false" data_delete["ids[{}]".format(i)] = thread_id j_unpin = self._payload_post( "/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin ) j_delete = self._payload_post( "/ajax/mercury/delete_thread.php?dpr=1", data_delete ) return True def markAsSpam(self, thread_id=None): """Mark a thread as spam, and delete it. Args: thread_id: User/Group ID to mark as spam. See :ref:`intro_threads` Returns: True Raises: FBchatException: If request failed """ thread_id, thread_type = self._getThread(thread_id, None) j = self._payload_post("/ajax/mercury/mark_spam.php?dpr=1", {"id": thread_id}) return True def deleteMessages(self, message_ids): """Delete specified messages. Args: message_ids: Message IDs to delete Returns: True Raises: FBchatException: If request failed """ message_ids = require_list(message_ids) data = dict() for i, message_id in enumerate(message_ids): data["message_ids[{}]".format(i)] = message_id j = self._payload_post("/ajax/mercury/delete_messages.php?dpr=1", data) return True def muteThread(self, mute_time=-1, thread_id=None): """Mute thread. Args: mute_time: Mute time in seconds, leave blank to mute forever thread_id: User/Group ID to mute. See :ref:`intro_threads` """ thread_id, thread_type = self._getThread(thread_id, None) data = {"mute_settings": str(mute_time), "thread_fbid": thread_id} j = self._payload_post("/ajax/mercury/change_mute_thread.php?dpr=1", data) def unmuteThread(self, thread_id=None): """Unmute thread. Args: thread_id: User/Group ID to unmute. See :ref:`intro_threads` """ return self.muteThread(0, thread_id) def muteThreadReactions(self, mute=True, thread_id=None): """Mute thread reactions. Args: mute: Boolean. True to mute, False to unmute thread_id: User/Group ID to mute. See :ref:`intro_threads` """ thread_id, thread_type = self._getThread(thread_id, None) data = {"reactions_mute_mode": int(mute), "thread_fbid": thread_id} j = self._payload_post( "/ajax/mercury/change_reactions_mute_thread/?dpr=1", data ) def unmuteThreadReactions(self, thread_id=None): """Unmute thread reactions. Args: thread_id: User/Group ID to unmute. See :ref:`intro_threads` """ return self.muteThreadReactions(False, thread_id) def muteThreadMentions(self, mute=True, thread_id=None): """Mute thread mentions. Args: mute: Boolean. True to mute, False to unmute thread_id: User/Group ID to mute. See :ref:`intro_threads` """ thread_id, thread_type = self._getThread(thread_id, None) data = {"mentions_mute_mode": int(mute), "thread_fbid": thread_id} j = self._payload_post("/ajax/mercury/change_mentions_mute_thread/?dpr=1", data) def unmuteThreadMentions(self, thread_id=None): """Unmute thread mentions. Args: thread_id: User/Group ID to unmute. See :ref:`intro_threads` """ return self.muteThreadMentions(False, thread_id) """ LISTEN METHODS """ def _ping(self): data = { "seq": self._seq, "channel": "p_" + self._uid, "clientid": self._state._client_id, "partition": -2, "cap": 0, "uid": self._uid, "sticky_token": self._sticky, "sticky_pool": self._pool, "viewer_uid": self._uid, "state": "active", } j = self._get( "https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel), data, ) def _pullMessage(self): """Call pull api to fetch message data.""" data = { "seq": self._seq, "msgs_recv": 0, "sticky_token": self._sticky, "sticky_pool": self._pool, "clientid": self._state._client_id, "state": "active" if self._markAlive else "offline", } return self._get( "https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data ) def _parseDelta(self, m): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" id_thread = None type_thread = None if "threadFbId" in msg_metadata["threadKey"]: id_thread = str(msg_metadata["threadKey"]["threadFbId"]) type_thread = ThreadType.GROUP elif "otherUserFbId" in msg_metadata["threadKey"]: id_thread = str(msg_metadata["threadKey"]["otherUserFbId"]) type_thread = ThreadType.USER return id_thread, type_thread delta = m["delta"] delta_type = delta.get("type") delta_class = delta.get("class") metadata = delta.get("messageMetadata") if metadata: mid = metadata["messageId"] author_id = str(metadata["actorFbId"]) ts = int(metadata.get("timestamp")) # Added participants if "addedParticipants" in delta: added_ids = [str(x["userFbId"]) for x in delta["addedParticipants"]] thread_id = str(metadata["threadKey"]["threadFbId"]) self.onPeopleAdded( mid=mid, added_ids=added_ids, author_id=author_id, thread_id=thread_id, ts=ts, msg=m, ) # Left/removed participants elif "leftParticipantFbId" in delta: removed_id = str(delta["leftParticipantFbId"]) thread_id = str(metadata["threadKey"]["threadFbId"]) self.onPersonRemoved( mid=mid, removed_id=removed_id, author_id=author_id, thread_id=thread_id, ts=ts, msg=m, ) # Color change elif delta_type == "change_thread_theme": new_color = ThreadColor._from_graphql(delta["untypedData"]["theme_color"]) thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onColorChange( mid=mid, author_id=author_id, new_color=new_color, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Emoji change elif delta_type == "change_thread_icon": new_emoji = delta["untypedData"]["thread_icon"] thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onEmojiChange( mid=mid, author_id=author_id, new_emoji=new_emoji, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Thread title change elif delta_class == "ThreadName": new_title = delta["name"] thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onTitleChange( mid=mid, author_id=author_id, new_title=new_title, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Forced fetch elif delta_class == "ForcedFetch": mid = delta.get("messageId") if mid is None: self.onUnknownMesssageType(msg=m) else: thread_id = str(delta["threadKey"]["threadFbId"]) fetch_info = self._forcedFetch(thread_id, mid) fetch_data = fetch_info["message"] author_id = fetch_data["message_sender"]["id"] ts = fetch_data["timestamp_precise"] if fetch_data.get("__typename") == "ThreadImageMessage": # Thread image change image_metadata = fetch_data.get("image_with_metadata") image_id = ( int(image_metadata["legacy_attachment_id"]) if image_metadata else None ) self.onImageChange( mid=mid, author_id=author_id, new_image=image_id, thread_id=thread_id, thread_type=ThreadType.GROUP, ts=ts, msg=m, ) # Nickname change elif delta_type == "change_thread_nickname": changed_for = str(delta["untypedData"]["participant_id"]) new_nickname = delta["untypedData"]["nickname"] thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onNicknameChange( mid=mid, author_id=author_id, changed_for=changed_for, new_nickname=new_nickname, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Admin added or removed in a group thread elif delta_type == "change_thread_admins": thread_id, thread_type = getThreadIdAndThreadType(metadata) target_id = delta["untypedData"]["TARGET_ID"] admin_event = delta["untypedData"]["ADMIN_EVENT"] if admin_event == "add_admin": self.onAdminAdded( mid=mid, added_id=target_id, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) elif admin_event == "remove_admin": self.onAdminRemoved( mid=mid, removed_id=target_id, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) # Group approval mode change elif delta_type == "change_thread_approval_mode": thread_id, thread_type = getThreadIdAndThreadType(metadata) approval_mode = bool(int(delta["untypedData"]["APPROVAL_MODE"])) self.onApprovalModeChange( mid=mid, approval_mode=approval_mode, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) # Message delivered elif delta_class == "DeliveryReceipt": message_ids = delta["messageIds"] delivered_for = str( delta.get("actorFbId") or delta["threadKey"]["otherUserFbId"] ) ts = int(delta["deliveredWatermarkTimestampMs"]) thread_id, thread_type = getThreadIdAndThreadType(delta) self.onMessageDelivered( msg_ids=message_ids, delivered_for=delivered_for, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Message seen elif delta_class == "ReadReceipt": seen_by = str(delta.get("actorFbId") or delta["threadKey"]["otherUserFbId"]) seen_ts = int(delta["actionTimestampMs"]) delivered_ts = int(delta["watermarkTimestampMs"]) thread_id, thread_type = getThreadIdAndThreadType(delta) self.onMessageSeen( seen_by=seen_by, thread_id=thread_id, thread_type=thread_type, seen_ts=seen_ts, ts=delivered_ts, metadata=metadata, msg=m, ) # Messages marked as seen elif delta_class == "MarkRead": seen_ts = int( delta.get("actionTimestampMs") or delta.get("actionTimestamp") ) delivered_ts = int( delta.get("watermarkTimestampMs") or delta.get("watermarkTimestamp") ) threads = [] if "folders" not in delta: threads = [ getThreadIdAndThreadType({"threadKey": thr}) for thr in delta.get("threadKeys") ] # thread_id, thread_type = getThreadIdAndThreadType(delta) self.onMarkedSeen( threads=threads, seen_ts=seen_ts, ts=delivered_ts, metadata=delta, msg=m ) # Game played elif delta_type == "instant_game_update": game_id = delta["untypedData"]["game_id"] game_name = delta["untypedData"]["game_name"] score = delta["untypedData"].get("score") if score is not None: score = int(score) leaderboard = delta["untypedData"].get("leaderboard") if leaderboard is not None: leaderboard = json.loads(leaderboard)["scores"] thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onGamePlayed( mid=mid, author_id=author_id, game_id=game_id, game_name=game_name, score=score, leaderboard=leaderboard, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Group call started/ended elif delta_type == "rtc_call_log": thread_id, thread_type = getThreadIdAndThreadType(metadata) call_status = delta["untypedData"]["event"] call_duration = int(delta["untypedData"]["call_duration"]) is_video_call = bool(int(delta["untypedData"]["is_video_call"])) if call_status == "call_started": self.onCallStarted( mid=mid, caller_id=author_id, is_video_call=is_video_call, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) elif call_status == "call_ended": self.onCallEnded( mid=mid, caller_id=author_id, is_video_call=is_video_call, call_duration=call_duration, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # User joined to group call elif delta_type == "participant_joined_group_call": thread_id, thread_type = getThreadIdAndThreadType(metadata) is_video_call = bool(int(delta["untypedData"]["group_call_type"])) self.onUserJoinedCall( mid=mid, joined_id=author_id, is_video_call=is_video_call, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Group poll event elif delta_type == "group_poll": thread_id, thread_type = getThreadIdAndThreadType(metadata) event_type = delta["untypedData"]["event_type"] poll_json = json.loads(delta["untypedData"]["question_json"]) poll = Poll._from_graphql(poll_json) if event_type == "question_creation": # User created group poll self.onPollCreated( mid=mid, poll=poll, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) elif event_type == "update_vote": # User voted on group poll added_options = json.loads(delta["untypedData"]["added_option_ids"]) removed_options = json.loads(delta["untypedData"]["removed_option_ids"]) self.onPollVoted( mid=mid, poll=poll, added_options=added_options, removed_options=removed_options, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Plan created elif delta_type == "lightweight_event_create": thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onPlanCreated( mid=mid, plan=Plan._from_pull(delta["untypedData"]), author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Plan ended elif delta_type == "lightweight_event_notify": thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onPlanEnded( mid=mid, plan=Plan._from_pull(delta["untypedData"]), thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Plan edited elif delta_type == "lightweight_event_update": thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onPlanEdited( mid=mid, plan=Plan._from_pull(delta["untypedData"]), author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Plan deleted elif delta_type == "lightweight_event_delete": thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onPlanDeleted( mid=mid, plan=Plan._from_pull(delta["untypedData"]), author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Plan participation change elif delta_type == "lightweight_event_rsvp": thread_id, thread_type = getThreadIdAndThreadType(metadata) take_part = delta["untypedData"]["guest_status"] == "GOING" self.onPlanParticipation( mid=mid, plan=Plan._from_pull(delta["untypedData"]), take_part=take_part, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Client payload (that weird numbers) elif delta_class == "ClientPayload": payload = json.loads("".join(chr(z) for z in delta["payload"])) ts = m.get("ofd_ts") for d in payload.get("deltas", []): # Message reaction if d.get("deltaMessageReaction"): i = d["deltaMessageReaction"] thread_id, thread_type = getThreadIdAndThreadType(i) mid = i["messageId"] author_id = str(i["userId"]) reaction = ( MessageReaction(i["reaction"]) if i.get("reaction") else None ) add_reaction = not bool(i["action"]) if add_reaction: self.onReactionAdded( mid=mid, reaction=reaction, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) else: self.onReactionRemoved( mid=mid, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) # Viewer status change elif d.get("deltaChangeViewerStatus"): i = d["deltaChangeViewerStatus"] thread_id, thread_type = getThreadIdAndThreadType(i) author_id = str(i["actorFbid"]) reason = i["reason"] can_reply = i["canViewerReply"] if reason == 2: if can_reply: self.onUnblock( author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) else: self.onBlock( author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) # Live location info elif d.get("liveLocationData"): i = d["liveLocationData"] thread_id, thread_type = getThreadIdAndThreadType(i) for l in i["messageLiveLocations"]: mid = l["messageId"] author_id = str(l["senderId"]) location = LiveLocationAttachment._from_pull(l) self.onLiveLocation( mid=mid, location=location, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) # Message deletion elif d.get("deltaRecallMessageData"): i = d["deltaRecallMessageData"] thread_id, thread_type = getThreadIdAndThreadType(i) mid = i["messageID"] ts = i["deletionTimestamp"] author_id = str(i["senderID"]) self.onMessageUnsent( mid=mid, author_id=author_id, thread_id=thread_id, thread_type=thread_type, ts=ts, msg=m, ) elif d.get("deltaMessageReply"): i = d["deltaMessageReply"] metadata = i["message"]["messageMetadata"] thread_id, thread_type = getThreadIdAndThreadType(metadata) message = Message._from_reply(i["message"]) message.replied_to = Message._from_reply(i["repliedToMessage"]) message.reply_to_id = message.replied_to.uid self.onMessage( mid=message.uid, author_id=message.author, message=message.text, message_object=message, thread_id=thread_id, thread_type=thread_type, ts=message.timestamp, metadata=metadata, msg=m, ) # New message elif delta.get("class") == "NewMessage": thread_id, thread_type = getThreadIdAndThreadType(metadata) self.onMessage( mid=mid, author_id=author_id, message=delta.get("body", ""), message_object=Message._from_pull( delta, mid=mid, tags=metadata.get("tags"), author=author_id, timestamp=ts, ), thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m, ) # Unknown message type else: self.onUnknownMesssageType(msg=m) def _parseMessage(self, content): """Get message and author name from content. May contain multiple messages in the content. """ self._seq = content.get("seq", "0") if "lb_info" in content: self._sticky = content["lb_info"]["sticky"] self._pool = content["lb_info"]["pool"] if "batches" in content: for batch in content["batches"]: self._parseMessage(batch) if "ms" not in content: return for m in content["ms"]: mtype = m.get("type") try: # Things that directly change chat if mtype == "delta": self._parseDelta(m) # Inbox elif mtype == "inbox": self.onInbox( unseen=m["unseen"], unread=m["unread"], recent_unread=m["recent_unread"], msg=m, ) # Typing elif mtype == "typ" or mtype == "ttyp": author_id = str(m.get("from")) thread_id = m.get("thread_fbid") if thread_id: thread_type = ThreadType.GROUP thread_id = str(thread_id) else: thread_type = ThreadType.USER if author_id == self._uid: thread_id = m.get("to") else: thread_id = author_id typing_status = TypingStatus(m.get("st")) self.onTyping( author_id=author_id, status=typing_status, thread_id=thread_id, thread_type=thread_type, msg=m, ) # Delivered # Seen # elif mtype == "m_read_receipt": # # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) elif mtype in ["jewel_requests_add"]: from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) # Happens on every login elif mtype == "qprimer": self.onQprimer(ts=m.get("made"), msg=m) # Is sent before any other message elif mtype == "deltaflow": pass # Chat timestamp elif mtype == "chatproxy-presence": statuses = dict() for id_, data in m.get("buddyList", {}).items(): statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) self._buddylist[id_] = statuses[id_] self.onChatTimestamp(buddylist=statuses, msg=m) # Buddylist overlay elif mtype == "buddylist_overlay": statuses = dict() for id_, data in m.get("overlay", {}).items(): old_in_game = None if id_ in self._buddylist: old_in_game = self._buddylist[id_].in_game statuses[id_] = ActiveStatus._from_buddylist_overlay( data, old_in_game ) self._buddylist[id_] = statuses[id_] self.onBuddylistOverlay(statuses=statuses, msg=m) # Unknown message type else: self.onUnknownMesssageType(msg=m) except Exception as e: self.onMessageError(exception=e, msg=m) def startListening(self): """Start listening from an external event loop. Raises: FBchatException: If request failed """ self.listening = True def doOneListen(self, markAlive=None): """Do one cycle of the listening loop. This method is useful if you want to control the client from an external event loop. Warning: ``markAlive`` parameter is deprecated, use :func:`Client.setActiveStatus` or ``markAlive`` parameter in :func:`Client.listen` instead. Returns: bool: Whether the loop should keep running """ if markAlive is not None: self._markAlive = markAlive try: if self._markAlive: self._ping() content = self._pullMessage() if content: self._parseMessage(content) except KeyboardInterrupt: return False except requests.Timeout: pass except requests.ConnectionError: # If the client has lost their internet connection, keep trying every 30 seconds time.sleep(30) except FBchatFacebookError as e: # Fix 502 and 503 pull errors if e.request_status_code in [502, 503]: # Bump pull channel, while contraining withing 0-4 self._pull_channel = (self._pull_channel + 1) % 5 self.startListening() else: raise e except Exception as e: return self.onListenError(exception=e) return True def stopListening(self): """Clean up the variables from `Client.startListening`.""" self.listening = False self._sticky, self._pool = (None, None) def listen(self, markAlive=None): """Initialize and runs the listening loop continually. Args: markAlive (bool): Whether this should ping the Facebook server each time the loop runs """ if markAlive is not None: self.setActiveStatus(markAlive) self.startListening() self.onListening() while self.listening and self.doOneListen(): pass self.stopListening() def setActiveStatus(self, markAlive): """Change active status while listening. Args: markAlive (bool): Whether to show if client is active """ self._markAlive = markAlive """ END LISTEN METHODS """ """ EVENTS """ def onLoggingIn(self, email=None): """Called when the client is logging in. Args: email: The email of the client """ log.info("Logging in {}...".format(email)) def on2FACode(self): """Called when a 2FA code is needed to progress.""" return input("Please enter your 2FA code --> ") def onLoggedIn(self, email=None): """Called when the client is successfully logged in. Args: email: The email of the client """ log.info("Login of {} successful.".format(email)) def onListening(self): """Called when the client is listening.""" log.info("Listening...") def onListenError(self, exception=None): """Called when an error was encountered while listening. Args: exception: The exception that was encountered Returns: Whether the loop should keep running """ log.exception("Got exception while listening") return True def onMessage( self, mid=None, author_id=None, message=None, message_object=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody sends a message. Args: mid: The message ID author_id: The ID of the author message: (deprecated. Use ``message_object.text`` instead) message_object (Message): The message (As a `Message` object) thread_id: Thread ID that the message was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the message was sent to. See :ref:`intro_threads` ts: The timestamp of the message metadata: Extra metadata about the message msg: A full set of the data received """ log.info("{} from {} in {}".format(message_object, thread_id, thread_type.name)) def onColorChange( self, mid=None, author_id=None, new_color=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody changes a thread's color. Args: mid: The action ID author_id: The ID of the person who changed the color new_color (ThreadColor): The new color thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Color change from {} in {} ({}): {}".format( author_id, thread_id, thread_type.name, new_color ) ) def onEmojiChange( self, mid=None, author_id=None, new_emoji=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody changes a thread's emoji. Args: mid: The action ID author_id: The ID of the person who changed the emoji new_emoji: The new emoji thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Emoji change from {} in {} ({}): {}".format( author_id, thread_id, thread_type.name, new_emoji ) ) def onTitleChange( self, mid=None, author_id=None, new_title=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody changes a thread's title. Args: mid: The action ID author_id: The ID of the person who changed the title new_title: The new title thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Title change from {} in {} ({}): {}".format( author_id, thread_id, thread_type.name, new_title ) ) def onImageChange( self, mid=None, author_id=None, new_image=None, thread_id=None, thread_type=ThreadType.GROUP, ts=None, msg=None, ): """Called when the client is listening, and somebody changes a thread's image. Args: mid: The action ID author_id: The ID of the person who changed the image new_image: The ID of the new image thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info("{} changed thread image in {}".format(author_id, thread_id)) def onNicknameChange( self, mid=None, author_id=None, changed_for=None, new_nickname=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody changes a nickname. Args: mid: The action ID author_id: The ID of the person who changed the nickname changed_for: The ID of the person whom got their nickname changed new_nickname: The new nickname thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Nickname change from {} in {} ({}) for {}: {}".format( author_id, thread_id, thread_type.name, changed_for, new_nickname ) ) def onAdminAdded( self, mid=None, added_id=None, author_id=None, thread_id=None, thread_type=ThreadType.GROUP, ts=None, msg=None, ): """Called when the client is listening, and somebody adds an admin to a group. Args: mid: The action ID added_id: The ID of the admin who got added author_id: The ID of the person who added the admins thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info("{} added admin: {} in {}".format(author_id, added_id, thread_id)) def onAdminRemoved( self, mid=None, removed_id=None, author_id=None, thread_id=None, thread_type=ThreadType.GROUP, ts=None, msg=None, ): """Called when the client is listening, and somebody is removed as an admin in a group. Args: mid: The action ID removed_id: The ID of the admin who got removed author_id: The ID of the person who removed the admins thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info("{} removed admin: {} in {}".format(author_id, removed_id, thread_id)) def onApprovalModeChange( self, mid=None, approval_mode=None, author_id=None, thread_id=None, thread_type=ThreadType.GROUP, ts=None, msg=None, ): """Called when the client is listening, and somebody changes approval mode in a group. Args: mid: The action ID approval_mode: True if approval mode is activated author_id: The ID of the person who changed approval mode thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ if approval_mode: log.info("{} activated approval mode in {}".format(author_id, thread_id)) else: log.info("{} disabled approval mode in {}".format(author_id, thread_id)) def onMessageSeen( self, seen_by=None, thread_id=None, thread_type=ThreadType.USER, seen_ts=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody marks a message as seen. Args: seen_by: The ID of the person who marked the message as seen thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` seen_ts: A timestamp of when the person saw the message ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Messages seen by {} in {} ({}) at {}s".format( seen_by, thread_id, thread_type.name, seen_ts / 1000 ) ) def onMessageDelivered( self, msg_ids=None, delivered_for=None, thread_id=None, thread_type=ThreadType.USER, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody marks messages as delivered. Args: msg_ids: The messages that are marked as delivered delivered_for: The person that marked the messages as delivered thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Messages {} delivered to {} in {} ({}) at {}s".format( msg_ids, delivered_for, thread_id, thread_type.name, ts / 1000 ) ) def onMarkedSeen( self, threads=None, seen_ts=None, ts=None, metadata=None, msg=None ): """Called when the client is listening, and the client has successfully marked threads as seen. Args: threads: The threads that were marked author_id: The ID of the person who changed the emoji seen_ts: A timestamp of when the threads were seen ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Marked messages as seen in threads {} at {}s".format( [(x[0], x[1].name) for x in threads], seen_ts / 1000 ) ) def onMessageUnsent( self, mid=None, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None, ): """Called when the client is listening, and someone unsends (deletes for everyone) a message. Args: mid: ID of the unsent message author_id: The ID of the person who unsent the message thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} unsent the message {} in {} ({}) at {}s".format( author_id, repr(mid), thread_id, thread_type.name, ts / 1000 ) ) def onPeopleAdded( self, mid=None, added_ids=None, author_id=None, thread_id=None, ts=None, msg=None, ): """Called when the client is listening, and somebody adds people to a group thread. Args: mid: The action ID added_ids: The IDs of the people who got added author_id: The ID of the person who added the people thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} added: {} in {}".format(author_id, ", ".join(added_ids), thread_id) ) def onPersonRemoved( self, mid=None, removed_id=None, author_id=None, thread_id=None, ts=None, msg=None, ): """Called when the client is listening, and somebody removes a person from a group thread. Args: mid: The action ID removed_id: The ID of the person who got removed author_id: The ID of the person who removed the person thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info("{} removed: {} in {}".format(author_id, removed_id, thread_id)) def onFriendRequest(self, from_id=None, msg=None): """Called when the client is listening, and somebody sends a friend request. Args: from_id: The ID of the person that sent the request msg: A full set of the data received """ log.info("Friend request from {}".format(from_id)) def onInbox(self, unseen=None, unread=None, recent_unread=None, msg=None): """ Todo: Documenting this Args: unseen: -- unread: -- recent_unread: -- msg: A full set of the data received """ log.info("Inbox event: {}, {}, {}".format(unseen, unread, recent_unread)) def onTyping( self, author_id=None, status=None, thread_id=None, thread_type=None, msg=None ): """Called when the client is listening, and somebody starts or stops typing into a chat. Args: author_id: The ID of the person who sent the action status (TypingStatus): The typing status thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` msg: A full set of the data received """ pass def onGamePlayed( self, mid=None, author_id=None, game_id=None, game_name=None, score=None, leaderboard=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody plays a game. Args: mid: The action ID author_id: The ID of the person who played the game game_id: The ID of the game game_name: Name of the game score: Score obtained in the game leaderboard: Actual leader board of the game in the thread thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( '{} played "{}" in {} ({})'.format( author_id, game_name, thread_id, thread_type.name ) ) def onReactionAdded( self, mid=None, reaction=None, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None, ): """Called when the client is listening, and somebody reacts to a message. Args: mid: Message ID, that user reacted to reaction (MessageReaction): Reaction add_reaction: Whether user added or removed reaction author_id: The ID of the person who reacted to the message thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} reacted to message {} with {} in {} ({})".format( author_id, mid, reaction.name, thread_id, thread_type.name ) ) def onReactionRemoved( self, mid=None, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None, ): """Called when the client is listening, and somebody removes reaction from a message. Args: mid: Message ID, that user reacted to author_id: The ID of the person who removed reaction thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} removed reaction from {} message in {} ({})".format( author_id, mid, thread_id, thread_type ) ) def onBlock( self, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None ): """Called when the client is listening, and somebody blocks client. Args: author_id: The ID of the person who blocked thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} blocked {} ({}) thread".format(author_id, thread_id, thread_type.name) ) def onUnblock( self, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None ): """Called when the client is listening, and somebody blocks client. Args: author_id: The ID of the person who unblocked thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} unblocked {} ({}) thread".format(author_id, thread_id, thread_type.name) ) def onLiveLocation( self, mid=None, location=None, author_id=None, thread_id=None, thread_type=None, ts=None, msg=None, ): """Called when the client is listening and somebody sends live location info. Args: mid: The action ID location (LiveLocationAttachment): Sent location info author_id: The ID of the person who sent location info thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action msg: A full set of the data received """ log.info( "{} sent live location info in {} ({}) with latitude {} and longitude {}".format( author_id, thread_id, thread_type, location.latitude, location.longitude ) ) def onCallStarted( self, mid=None, caller_id=None, is_video_call=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody starts a call in a group. Todo: Make this work with private calls. Args: mid: The action ID caller_id: The ID of the person who started the call is_video_call: True if it's video call thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} started call in {} ({})".format(caller_id, thread_id, thread_type.name) ) def onCallEnded( self, mid=None, caller_id=None, is_video_call=None, call_duration=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody ends a call in a group. Todo: Make this work with private calls. Args: mid: The action ID caller_id: The ID of the person who ended the call is_video_call: True if it was video call call_duration: Call duration in seconds thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} ended call in {} ({})".format(caller_id, thread_id, thread_type.name) ) def onUserJoinedCall( self, mid=None, joined_id=None, is_video_call=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody joins a group call. Args: mid: The action ID joined_id: The ID of the person who joined the call is_video_call: True if it's video call thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} joined call in {} ({})".format(joined_id, thread_id, thread_type.name) ) def onPollCreated( self, mid=None, poll=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody creates a group poll. Args: mid: The action ID poll (Poll): Created poll author_id: The ID of the person who created the poll thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} created poll {} in {} ({})".format( author_id, poll, thread_id, thread_type.name ) ) def onPollVoted( self, mid=None, poll=None, added_options=None, removed_options=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody votes in a group poll. Args: mid: The action ID poll (Poll): Poll, that user voted in author_id: The ID of the person who voted in the poll thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} voted in poll {} in {} ({})".format( author_id, poll, thread_id, thread_type.name ) ) def onPlanCreated( self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody creates a plan. Args: mid: The action ID plan (Plan): Created plan author_id: The ID of the person who created the plan thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} created plan {} in {} ({})".format( author_id, plan, thread_id, thread_type.name ) ) def onPlanEnded( self, mid=None, plan=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and a plan ends. Args: mid: The action ID plan (Plan): Ended plan thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "Plan {} has ended in {} ({})".format(plan, thread_id, thread_type.name) ) def onPlanEdited( self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody edits a plan. Args: mid: The action ID plan (Plan): Edited plan author_id: The ID of the person who edited the plan thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} edited plan {} in {} ({})".format( author_id, plan, thread_id, thread_type.name ) ) def onPlanDeleted( self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody deletes a plan. Args: mid: The action ID plan (Plan): Deleted plan author_id: The ID of the person who deleted the plan thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ log.info( "{} deleted plan {} in {} ({})".format( author_id, plan, thread_id, thread_type.name ) ) def onPlanParticipation( self, mid=None, plan=None, take_part=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None, ): """Called when the client is listening, and somebody takes part in a plan or not. Args: mid: The action ID plan (Plan): Plan take_part (bool): Whether the person takes part in the plan or not author_id: The ID of the person who will participate in the plan or not thread_id: Thread ID that the action was sent to. See :ref:`intro_threads` thread_type (ThreadType): Type of thread that the action was sent to. See :ref:`intro_threads` ts: A timestamp of the action metadata: Extra metadata about the action msg: A full set of the data received """ if take_part: log.info( "{} will take part in {} in {} ({})".format( author_id, plan, thread_id, thread_type.name ) ) else: log.info( "{} won't take part in {} in {} ({})".format( author_id, plan, thread_id, thread_type.name ) ) def onQprimer(self, ts=None, msg=None): """Called when the client just started listening. Args: ts: A timestamp of the action msg: A full set of the data received """ pass def onChatTimestamp(self, buddylist=None, msg=None): """Called when the client receives chat online presence update. Args: buddylist: A list of dictionaries with friend id and last seen timestamp msg: A full set of the data received """ log.debug("Chat Timestamps received: {}".format(buddylist)) def onBuddylistOverlay(self, statuses=None, msg=None): """Called when the client is listening and client receives information about friend active status. Args: statuses (dict): Dictionary with user IDs as keys and :class:`ActiveStatus` as values msg: A full set of the data received """ log.debug("Buddylist overlay received: {}".format(statuses)) def onUnknownMesssageType(self, msg=None): """Called when the client is listening, and some unknown data was received. Args: msg: A full set of the data received """ log.debug("Unknown message received: {}".format(msg)) def onMessageError(self, exception=None, msg=None): """Called when an error was encountered while parsing received data. Args: exception: The exception that was encountered msg: A full set of the data received """ log.exception("Exception in parsing of {}".format(msg)) """ END EVENTS """ PKt%OHEfbchat/_core.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import logging import aenum log = logging.getLogger("client") class Enum(aenum.Enum): """Used internally by ``fbchat`` to support enumerations""" def __repr__(self): # For documentation: return "{}.{}".format(type(self).__name__, self.name) @classmethod def _extend_if_invalid(cls, value): try: return cls(value) except ValueError: log.warning( "Failed parsing {.__name__}({!r}). Extending enum.".format(cls, value) ) aenum.extend_enum(cls, "UNKNOWN_{}".format(value).upper(), value) return cls(value) PKt%Ofwfbchat/_exception.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals class FBchatException(Exception): """Custom exception thrown by ``fbchat``. All exceptions in the ``fbchat`` module inherits this. """ class FBchatFacebookError(FBchatException): #: The error code that Facebook returned fb_error_code = None #: The error message that Facebook returned (In the user's own language) fb_error_message = None #: The status code that was sent in the HTTP response (e.g. 404) (Usually only set if not successful, aka. not 200) request_status_code = None def __init__( self, message, fb_error_code=None, fb_error_message=None, request_status_code=None, ): super(FBchatFacebookError, self).__init__(message) """Thrown by ``fbchat`` when Facebook returns an error""" self.fb_error_code = str(fb_error_code) self.fb_error_message = fb_error_message self.request_status_code = request_status_code class FBchatInvalidParameters(FBchatFacebookError): """Raised by Facebook if: - Some function supplied invalid parameters. - Some content is not found. - Some content is no longer available. """ class FBchatNotLoggedIn(FBchatFacebookError): """Raised by Facebook if the client has been logged out.""" fb_error_code = "1357001" class FBchatPleaseRefresh(FBchatFacebookError): """Raised by Facebook if the client has been inactive for too long. This error usually happens after 1-2 days of inactivity. """ fb_error_code = "1357004" fb_error_message = "Please try closing and re-opening your browser window." class FBchatUserError(FBchatException): """Thrown by ``fbchat`` when wrong values are entered.""" PKt%O6&&fbchat/_file.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._attachment import Attachment @attr.s(cmp=False) class FileAttachment(Attachment): """Represents a file that has been sent as a Facebook attachment.""" #: URL where you can download the file url = attr.ib(None) #: Size of the file in bytes size = attr.ib(None) #: Name of the file name = attr.ib(None) #: Whether Facebook determines that this file may be harmful is_malicious = attr.ib(None) # Put here for backwards compatibility, so that the init argument order is preserved uid = attr.ib(None) @classmethod def _from_graphql(cls, data): return cls( url=data.get("url"), name=data.get("filename"), is_malicious=data.get("is_malicious"), uid=data.get("message_file_fbid"), ) @attr.s(cmp=False) class AudioAttachment(Attachment): """Represents an audio file that has been sent as a Facebook attachment.""" #: Name of the file filename = attr.ib(None) #: URL of the audio file url = attr.ib(None) #: Duration of the audio clip in milliseconds duration = attr.ib(None) #: Audio type audio_type = attr.ib(None) # Put here for backwards compatibility, so that the init argument order is preserved uid = attr.ib(None) @classmethod def _from_graphql(cls, data): return cls( filename=data.get("filename"), url=data.get("playable_url"), duration=data.get("playable_duration_in_ms"), audio_type=data.get("audio_type"), ) @attr.s(cmp=False, init=False) class ImageAttachment(Attachment): """Represents an image that has been sent as a Facebook attachment. To retrieve the full image URL, use: `Client.fetchImageUrl`, and pass it the id of the image attachment. """ #: The extension of the original image (e.g. ``png``) original_extension = attr.ib(None) #: Width of original image width = attr.ib(None, converter=lambda x: None if x is None else int(x)) #: Height of original image height = attr.ib(None, converter=lambda x: None if x is None else int(x)) #: Whether the image is animated is_animated = attr.ib(None) #: URL to a thumbnail of the image thumbnail_url = attr.ib(None) #: URL to a medium preview of the image preview_url = attr.ib(None) #: Width of the medium preview image preview_width = attr.ib(None) #: Height of the medium preview image preview_height = attr.ib(None) #: URL to a large preview of the image large_preview_url = attr.ib(None) #: Width of the large preview image large_preview_width = attr.ib(None) #: Height of the large preview image large_preview_height = attr.ib(None) #: URL to an animated preview of the image (e.g. for GIFs) animated_preview_url = attr.ib(None) #: Width of the animated preview image animated_preview_width = attr.ib(None) #: Height of the animated preview image animated_preview_height = attr.ib(None) def __init__( self, original_extension=None, width=None, height=None, is_animated=None, thumbnail_url=None, preview=None, large_preview=None, animated_preview=None, **kwargs ): super(ImageAttachment, self).__init__(**kwargs) self.original_extension = original_extension if width is not None: width = int(width) self.width = width if height is not None: height = int(height) self.height = height self.is_animated = is_animated self.thumbnail_url = thumbnail_url if preview is None: preview = {} self.preview_url = preview.get("uri") self.preview_width = preview.get("width") self.preview_height = preview.get("height") if large_preview is None: large_preview = {} self.large_preview_url = large_preview.get("uri") self.large_preview_width = large_preview.get("width") self.large_preview_height = large_preview.get("height") if animated_preview is None: animated_preview = {} self.animated_preview_url = animated_preview.get("uri") self.animated_preview_width = animated_preview.get("width") self.animated_preview_height = animated_preview.get("height") @classmethod def _from_graphql(cls, data): return cls( original_extension=data.get("original_extension") or (data["filename"].split("-")[0] if data.get("filename") else None), width=data.get("original_dimensions", {}).get("width"), height=data.get("original_dimensions", {}).get("height"), is_animated=data["__typename"] == "MessageAnimatedImage", thumbnail_url=data.get("thumbnail", {}).get("uri"), preview=data.get("preview") or data.get("preview_image"), large_preview=data.get("large_preview"), animated_preview=data.get("animated_image"), uid=data.get("legacy_attachment_id"), ) @classmethod def _from_list(cls, data): data = data["node"] return cls( width=data["original_dimensions"].get("x"), height=data["original_dimensions"].get("y"), thumbnail_url=data["image"].get("uri"), large_preview=data["image2"], preview=data["image1"], uid=data["legacy_attachment_id"], ) @attr.s(cmp=False, init=False) class VideoAttachment(Attachment): """Represents a video that has been sent as a Facebook attachment.""" #: Size of the original video in bytes size = attr.ib(None) #: Width of original video width = attr.ib(None) #: Height of original video height = attr.ib(None) #: Length of video in milliseconds duration = attr.ib(None) #: URL to very compressed preview video preview_url = attr.ib(None) #: URL to a small preview image of the video small_image_url = attr.ib(None) #: Width of the small preview image small_image_width = attr.ib(None) #: Height of the small preview image small_image_height = attr.ib(None) #: URL to a medium preview image of the video medium_image_url = attr.ib(None) #: Width of the medium preview image medium_image_width = attr.ib(None) #: Height of the medium preview image medium_image_height = attr.ib(None) #: URL to a large preview image of the video large_image_url = attr.ib(None) #: Width of the large preview image large_image_width = attr.ib(None) #: Height of the large preview image large_image_height = attr.ib(None) def __init__( self, size=None, width=None, height=None, duration=None, preview_url=None, small_image=None, medium_image=None, large_image=None, **kwargs ): super(VideoAttachment, self).__init__(**kwargs) self.size = size self.width = width self.height = height self.duration = duration self.preview_url = preview_url if small_image is None: small_image = {} self.small_image_url = small_image.get("uri") self.small_image_width = small_image.get("width") self.small_image_height = small_image.get("height") if medium_image is None: medium_image = {} self.medium_image_url = medium_image.get("uri") self.medium_image_width = medium_image.get("width") self.medium_image_height = medium_image.get("height") if large_image is None: large_image = {} self.large_image_url = large_image.get("uri") self.large_image_width = large_image.get("width") self.large_image_height = large_image.get("height") @classmethod def _from_graphql(cls, data): return cls( width=data.get("original_dimensions", {}).get("width"), height=data.get("original_dimensions", {}).get("height"), duration=data.get("playable_duration_in_ms"), preview_url=data.get("playable_url"), small_image=data.get("chat_image"), medium_image=data.get("inbox_image"), large_image=data.get("large_image"), uid=data.get("legacy_attachment_id"), ) @classmethod def _from_subattachment(cls, data): media = data["media"] return cls( duration=media.get("playable_duration_in_ms"), preview_url=media.get("playable_url"), medium_image=media.get("image"), uid=data["target"].get("video_id"), ) @classmethod def _from_list(cls, data): data = data["node"] return cls( width=data["original_dimensions"].get("x"), height=data["original_dimensions"].get("y"), small_image=data["image"], medium_image=data["image1"], large_image=data["image2"], uid=data["legacy_attachment_id"], ) def graphql_to_attachment(data): _type = data["__typename"] if _type in ["MessageImage", "MessageAnimatedImage"]: return ImageAttachment._from_graphql(data) elif _type == "MessageVideo": return VideoAttachment._from_graphql(data) elif _type == "MessageAudio": return AudioAttachment._from_graphql(data) elif _type == "MessageFile": return FileAttachment._from_graphql(data) return Attachment(uid=data.get("legacy_attachment_id")) def graphql_to_subattachment(data): target = data.get("target") type_ = target.get("__typename") if target else None if type_ == "Video": return VideoAttachment._from_subattachment(data) return None PKt%O>V=GGfbchat/_graphql.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import json import re from . import _util from ._exception import FBchatException # Shameless copy from https://stackoverflow.com/a/8730674 FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL WHITESPACE = re.compile(r"[ \t\n\r]*", FLAGS) class ConcatJSONDecoder(json.JSONDecoder): def decode(self, s, _w=WHITESPACE.match): s_len = len(s) objs = [] end = 0 while end != s_len: obj, end = self.raw_decode(s, idx=_w(s, end).end()) end = _w(s, end).end() objs.append(obj) return objs # End shameless copy def queries_to_json(*queries): """ Queries should be a list of GraphQL objects """ rtn = {} for i, query in enumerate(queries): rtn["q{}".format(i)] = query return json.dumps(rtn) def response_to_json(content): content = _util.strip_json_cruft(content) # Usually only needed in some error cases try: j = json.loads(content, cls=ConcatJSONDecoder) except Exception: raise FBchatException("Error while parsing JSON: {}".format(repr(content))) rtn = [None] * (len(j)) for x in j: if "error_results" in x: del rtn[-1] continue _util.handle_payload_error(x) [(key, value)] = x.items() _util.handle_graphql_errors(value) if "response" in value: rtn[int(key[1:])] = value["response"] else: rtn[int(key[1:])] = value["data"] _util.log.debug(rtn) return rtn def from_query(query, params): return {"priority": 0, "q": query, "query_params": params} def from_query_id(query_id, params): return {"query_id": query_id, "query_params": params} def from_doc(doc, params): return {"doc": doc, "query_params": params} def from_doc_id(doc_id, params): return {"doc_id": doc_id, "query_params": params} FRAGMENT_USER = """ QueryFragment User: User { id, name, first_name, last_name, profile_picture.width().height() { uri }, is_viewer_friend, url, gender, viewer_affinity } """ FRAGMENT_GROUP = """ QueryFragment Group: MessageThread { name, thread_key { thread_fbid }, image { uri }, is_group_thread, all_participants { nodes { messaging_actor { id } } }, customization_info { participant_customizations { participant_id, nickname }, outgoing_bubble_color, emoji }, thread_admins { id }, group_approval_queue { nodes { requester { id } } }, approval_mode, joinable_mode { mode, link }, event_reminders { nodes { id, lightweight_event_creator { id }, time, location_name, event_title, event_reminder_members { edges { node { id }, guest_list_state } } } } } """ FRAGMENT_PAGE = """ QueryFragment Page: Page { id, name, profile_picture.width(32).height(32) { uri }, url, category_type, city { name } } """ SEARCH_USER = ( """ Query SearchUser( = '', = 10) { entities_named() { search_results.of_type(user).first() as users { nodes { @User } } } } """ + FRAGMENT_USER ) SEARCH_GROUP = ( """ Query SearchGroup( = '', = 10, = 32) { viewer() { message_threads.with_thread_name().last() as groups { nodes { @Group } } } } """ + FRAGMENT_GROUP ) SEARCH_PAGE = ( """ Query SearchPage( = '', = 10) { entities_named() { search_results.of_type(page).first() as pages { nodes { @Page } } } } """ + FRAGMENT_PAGE ) SEARCH_THREAD = ( """ Query SearchThread( = '', = 10) { entities_named() { search_results.first() as threads { nodes { __typename, @User, @Group, @Page } } } } """ + FRAGMENT_USER + FRAGMENT_GROUP + FRAGMENT_PAGE ) PKt%O  fbchat/_group.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from . import _plan from ._thread import ThreadType, Thread @attr.s(cmp=False, init=False) class Group(Thread): """Represents a Facebook group. Inherits `Thread`.""" #: Unique list (set) of the group thread's participant user IDs participants = attr.ib(factory=set, converter=lambda x: set() if x is None else x) #: A dictionary, containing user nicknames mapped to their IDs nicknames = attr.ib(factory=dict, converter=lambda x: {} if x is None else x) #: A :class:`ThreadColor`. The groups's message color color = attr.ib(None) #: The groups's default emoji emoji = attr.ib(None) # Set containing user IDs of thread admins admins = attr.ib(factory=set, converter=lambda x: set() if x is None else x) # True if users need approval to join approval_mode = attr.ib(None) # Set containing user IDs requesting to join approval_requests = attr.ib( factory=set, converter=lambda x: set() if x is None else x ) # Link for joining group join_link = attr.ib(None) def __init__( self, uid, participants=None, nicknames=None, color=None, emoji=None, admins=None, approval_mode=None, approval_requests=None, join_link=None, privacy_mode=None, **kwargs ): super(Group, self).__init__(ThreadType.GROUP, uid, **kwargs) if participants is None: participants = set() self.participants = participants if nicknames is None: nicknames = [] self.nicknames = nicknames self.color = color self.emoji = emoji if admins is None: admins = set() self.admins = admins self.approval_mode = approval_mode if approval_requests is None: approval_requests = set() self.approval_requests = approval_requests self.join_link = join_link @classmethod def _from_graphql(cls, data): if data.get("image") is None: data["image"] = {} c_info = cls._parse_customization_info(data) last_message_timestamp = None if "last_message" in data: last_message_timestamp = data["last_message"]["nodes"][0][ "timestamp_precise" ] plan = None if data.get("event_reminders") and data["event_reminders"].get("nodes"): plan = _plan.Plan._from_graphql(data["event_reminders"]["nodes"][0]) return cls( data["thread_key"]["thread_fbid"], participants=set( [ node["messaging_actor"]["id"] for node in data["all_participants"]["nodes"] ] ), nicknames=c_info.get("nicknames"), color=c_info.get("color"), emoji=c_info.get("emoji"), admins=set([node.get("id") for node in data.get("thread_admins")]), approval_mode=bool(data.get("approval_mode")) if data.get("approval_mode") is not None else None, approval_requests=set( node["requester"]["id"] for node in data["group_approval_queue"]["nodes"] ) if data.get("group_approval_queue") else None, join_link=data["joinable_mode"].get("link"), photo=data["image"].get("uri"), name=data.get("name"), message_count=data.get("messages_count"), last_message_timestamp=last_message_timestamp, plan=plan, ) def _to_send_data(self): return {"thread_fbid": self.uid} @attr.s(cmp=False, init=False) class Room(Group): """Deprecated. Use `Group` instead.""" # True is room is not discoverable privacy_mode = attr.ib(None) def __init__(self, uid, privacy_mode=None, **kwargs): super(Room, self).__init__(uid, **kwargs) self.type = ThreadType.ROOM self.privacy_mode = privacy_mode PKt%OQU&^^fbchat/_location.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._attachment import Attachment from . import _util @attr.s(cmp=False) class LocationAttachment(Attachment): """Represents a user location. Latitude and longitude OR address is provided by Facebook. """ #: Latitude of the location latitude = attr.ib(None) #: Longitude of the location longitude = attr.ib(None) #: URL of image showing the map of the location image_url = attr.ib(None, init=False) #: Width of the image image_width = attr.ib(None, init=False) #: Height of the image image_height = attr.ib(None, init=False) #: URL to Bing maps with the location url = attr.ib(None, init=False) # Address of the location address = attr.ib(None) # Put here for backwards compatibility, so that the init argument order is preserved uid = attr.ib(None) @classmethod def _from_graphql(cls, data): url = data.get("url") address = _util.get_url_parameter(_util.get_url_parameter(url, "u"), "where1") try: latitude, longitude = [float(x) for x in address.split(", ")] address = None except ValueError: latitude, longitude = None, None rtn = cls( uid=int(data["deduplication_key"]), latitude=latitude, longitude=longitude, address=address, ) media = data.get("media") if media and media.get("image"): image = media["image"] rtn.image_url = image.get("uri") rtn.image_width = image.get("width") rtn.image_height = image.get("height") rtn.url = url return rtn @attr.s(cmp=False, init=False) class LiveLocationAttachment(LocationAttachment): """Represents a live user location.""" #: Name of the location name = attr.ib(None) #: Timestamp when live location expires expiration_time = attr.ib(None) #: True if live location is expired is_expired = attr.ib(None) def __init__(self, name=None, expiration_time=None, is_expired=None, **kwargs): super(LiveLocationAttachment, self).__init__(**kwargs) self.expiration_time = expiration_time self.is_expired = is_expired @classmethod def _from_pull(cls, data): return cls( uid=data["id"], latitude=data["coordinate"]["latitude"] / (10 ** 8) if not data.get("stopReason") else None, longitude=data["coordinate"]["longitude"] / (10 ** 8) if not data.get("stopReason") else None, name=data.get("locationTitle"), expiration_time=data["expirationTime"], is_expired=bool(data.get("stopReason")), ) @classmethod def _from_graphql(cls, data): target = data["target"] rtn = cls( uid=int(target["live_location_id"]), latitude=target["coordinate"]["latitude"] if target.get("coordinate") else None, longitude=target["coordinate"]["longitude"] if target.get("coordinate") else None, name=data["title_with_entities"]["text"], expiration_time=target.get("expiration_time"), is_expired=target.get("is_expired"), ) media = data.get("media") if media and media.get("image"): image = media["image"] rtn.image_url = image.get("uri") rtn.image_width = image.get("width") rtn.image_height = image.get("height") rtn.url = data.get("url") return rtn PKt%OE;E;fbchat/_message.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr import json from string import Formatter from . import _util, _attachment, _location, _file, _quick_reply, _sticker from ._core import Enum class EmojiSize(Enum): """Used to specify the size of a sent emoji.""" LARGE = "369239383222810" MEDIUM = "369239343222814" SMALL = "369239263222822" @classmethod def _from_tags(cls, tags): string_to_emojisize = { "large": cls.LARGE, "medium": cls.MEDIUM, "small": cls.SMALL, "l": cls.LARGE, "m": cls.MEDIUM, "s": cls.SMALL, } for tag in tags or (): data = tag.split(":", 1) if len(data) > 1 and data[0] == "hot_emoji_size": return string_to_emojisize.get(data[1]) return None class MessageReaction(Enum): """Used to specify a message reaction.""" HEART = "❤" LOVE = "😍" SMILE = "😆" WOW = "😮" SAD = "😢" ANGRY = "😠" YES = "👍" NO = "👎" @attr.s(cmp=False) class Mention(object): """Represents a ``@mention``.""" #: The thread ID the mention is pointing at thread_id = attr.ib() #: The character where the mention starts offset = attr.ib(0) #: The length of the mention length = attr.ib(10) @attr.s(cmp=False) class Message(object): """Represents a Facebook message.""" #: The actual message text = attr.ib(None) #: A list of :class:`Mention` objects mentions = attr.ib(factory=list, converter=lambda x: [] if x is None else x) #: A :class:`EmojiSize`. Size of a sent emoji emoji_size = attr.ib(None) #: The message ID uid = attr.ib(None, init=False) #: ID of the sender author = attr.ib(None, init=False) #: Timestamp of when the message was sent timestamp = attr.ib(None, init=False) #: Whether the message is read is_read = attr.ib(None, init=False) #: A list of people IDs who read the message, works only with :func:`fbchat.Client.fetchThreadMessages` read_by = attr.ib(factory=list, init=False) #: A dictionary with user's IDs as keys, and their :class:`MessageReaction` as values reactions = attr.ib(factory=dict, init=False) #: A :class:`Sticker` sticker = attr.ib(None) #: A list of attachments attachments = attr.ib(factory=list, converter=lambda x: [] if x is None else x) #: A list of :class:`QuickReply` quick_replies = attr.ib(factory=list, converter=lambda x: [] if x is None else x) #: Whether the message is unsent (deleted for everyone) unsent = attr.ib(False, init=False) #: Message ID you want to reply to reply_to_id = attr.ib(None) #: Replied message replied_to = attr.ib(None, init=False) #: Whether the message was forwarded forwarded = attr.ib(False, init=False) @classmethod def formatMentions(cls, text, *args, **kwargs): """Like `str.format`, but takes tuples with a thread id and text instead. Return a `Message` object, with the formatted string and relevant mentions. >>> Message.formatMentions("Hey {!r}! My name is {}", ("1234", "Peter"), ("4321", "Michael")) , ] emoji_size=None attachments=[]> >>> Message.formatMentions("Hey {p}! My name is {}", ("1234", "Michael"), p=("4321", "Peter")) , ] emoji_size=None attachments=[]> """ result = "" mentions = list() offset = 0 f = Formatter() field_names = [field_name[1] for field_name in f.parse(text)] automatic = "" in field_names i = 0 for (literal_text, field_name, format_spec, conversion) in f.parse(text): offset += len(literal_text) result += literal_text if field_name is None: continue if field_name == "": field_name = str(i) i += 1 elif automatic and field_name.isdigit(): raise ValueError( "cannot switch from automatic field numbering to manual field specification" ) thread_id, name = f.get_field(field_name, args, kwargs)[0] if format_spec: name = f.format_field(name, format_spec) if conversion: name = f.convert_field(name, conversion) result += name mentions.append( Mention(thread_id=thread_id, offset=offset, length=len(name)) ) offset += len(name) message = cls(text=result, mentions=mentions) return message @staticmethod def _get_forwarded_from_tags(tags): if tags is None: return False return any(map(lambda tag: "forward" in tag or "copy" in tag, tags)) def _to_send_data(self): data = {} if self.text or self.sticker or self.emoji_size: data["action_type"] = "ma-type:user-generated-message" if self.text: data["body"] = self.text for i, mention in enumerate(self.mentions): data["profile_xmd[{}][id]".format(i)] = mention.thread_id data["profile_xmd[{}][offset]".format(i)] = mention.offset data["profile_xmd[{}][length]".format(i)] = mention.length data["profile_xmd[{}][type]".format(i)] = "p" if self.emoji_size: if self.text: data["tags[0]"] = "hot_emoji_size:" + self.emoji_size.name.lower() else: data["sticker_id"] = self.emoji_size.value if self.sticker: data["sticker_id"] = self.sticker.uid if self.quick_replies: xmd = {"quick_replies": []} for quick_reply in self.quick_replies: # TODO: Move this to `_quick_reply.py` q = dict() q["content_type"] = quick_reply._type q["payload"] = quick_reply.payload q["external_payload"] = quick_reply.external_payload q["data"] = quick_reply.data if quick_reply.is_response: q["ignore_for_webhook"] = False if isinstance(quick_reply, _quick_reply.QuickReplyText): q["title"] = quick_reply.title if not isinstance(quick_reply, _quick_reply.QuickReplyLocation): q["image_url"] = quick_reply.image_url xmd["quick_replies"].append(q) if len(self.quick_replies) == 1 and self.quick_replies[0].is_response: xmd["quick_replies"] = xmd["quick_replies"][0] data["platform_xmd"] = json.dumps(xmd) if self.reply_to_id: data["replied_to_message_id"] = self.reply_to_id return data @classmethod def _from_graphql(cls, data): if data.get("message_sender") is None: data["message_sender"] = {} if data.get("message") is None: data["message"] = {} tags = data.get("tags_list") rtn = cls( text=data["message"].get("text"), mentions=[ Mention( m.get("entity", {}).get("id"), offset=m.get("offset"), length=m.get("length"), ) for m in data["message"].get("ranges") or () ], emoji_size=EmojiSize._from_tags(tags), sticker=_sticker.Sticker._from_graphql(data.get("sticker")), ) rtn.forwarded = cls._get_forwarded_from_tags(tags) rtn.uid = str(data["message_id"]) rtn.author = str(data["message_sender"]["id"]) rtn.timestamp = data.get("timestamp_precise") rtn.unsent = False if data.get("unread") is not None: rtn.is_read = not data["unread"] rtn.reactions = { str(r["user"]["id"]): MessageReaction._extend_if_invalid(r["reaction"]) for r in data["message_reactions"] } if data.get("blob_attachments") is not None: rtn.attachments = [ _file.graphql_to_attachment(attachment) for attachment in data["blob_attachments"] ] if data.get("platform_xmd_encoded"): quick_replies = json.loads(data["platform_xmd_encoded"]).get( "quick_replies" ) if isinstance(quick_replies, list): rtn.quick_replies = [ _quick_reply.graphql_to_quick_reply(q) for q in quick_replies ] elif isinstance(quick_replies, dict): rtn.quick_replies = [ _quick_reply.graphql_to_quick_reply(quick_replies, is_response=True) ] if data.get("extensible_attachment") is not None: attachment = graphql_to_extensible_attachment(data["extensible_attachment"]) if isinstance(attachment, _attachment.UnsentMessage): rtn.unsent = True elif attachment: rtn.attachments.append(attachment) if data.get("replied_to_message") is not None: rtn.replied_to = cls._from_graphql(data["replied_to_message"]["message"]) rtn.reply_to_id = rtn.replied_to.uid return rtn @classmethod def _from_reply(cls, data): tags = data["messageMetadata"].get("tags") rtn = cls( text=data.get("body"), mentions=[ Mention(m.get("i"), offset=m.get("o"), length=m.get("l")) for m in json.loads(data.get("data", {}).get("prng", "[]")) ], emoji_size=EmojiSize._from_tags(tags), ) metadata = data.get("messageMetadata", {}) rtn.forwarded = cls._get_forwarded_from_tags(tags) rtn.uid = metadata.get("messageId") rtn.author = str(metadata.get("actorFbId")) rtn.timestamp = metadata.get("timestamp") rtn.unsent = False if data.get("data", {}).get("platform_xmd"): quick_replies = json.loads(data["data"]["platform_xmd"]).get( "quick_replies" ) if isinstance(quick_replies, list): rtn.quick_replies = [ _quick_reply.graphql_to_quick_reply(q) for q in quick_replies ] elif isinstance(quick_replies, dict): rtn.quick_replies = [ _quick_reply.graphql_to_quick_reply(quick_replies, is_response=True) ] if data.get("attachments") is not None: for attachment in data["attachments"]: attachment = json.loads(attachment["mercuryJSON"]) if attachment.get("blob_attachment"): rtn.attachments.append( _file.graphql_to_attachment(attachment["blob_attachment"]) ) if attachment.get("extensible_attachment"): extensible_attachment = graphql_to_extensible_attachment( attachment["extensible_attachment"] ) if isinstance(extensible_attachment, _attachment.UnsentMessage): rtn.unsent = True else: rtn.attachments.append(extensible_attachment) if attachment.get("sticker_attachment"): rtn.sticker = _sticker.Sticker._from_graphql( attachment["sticker_attachment"] ) return rtn @classmethod def _from_pull(cls, data, mid=None, tags=None, author=None, timestamp=None): rtn = cls(text=data.get("body")) rtn.uid = mid rtn.author = author rtn.timestamp = timestamp if data.get("data") and data["data"].get("prng"): try: rtn.mentions = [ Mention( str(mention.get("i")), offset=mention.get("o"), length=mention.get("l"), ) for mention in _util.parse_json(data["data"]["prng"]) ] except Exception: _util.log.exception("An exception occured while reading attachments") if data.get("attachments"): try: for a in data["attachments"]: mercury = a["mercury"] if mercury.get("blob_attachment"): image_metadata = a.get("imageMetadata", {}) attach_type = mercury["blob_attachment"]["__typename"] attachment = _file.graphql_to_attachment( mercury["blob_attachment"] ) if attach_type in [ "MessageFile", "MessageVideo", "MessageAudio", ]: # TODO: Add more data here for audio files attachment.size = int(a["fileSize"]) rtn.attachments.append(attachment) elif mercury.get("sticker_attachment"): rtn.sticker = _sticker.Sticker._from_graphql( mercury["sticker_attachment"] ) elif mercury.get("extensible_attachment"): attachment = graphql_to_extensible_attachment( mercury["extensible_attachment"] ) if isinstance(attachment, _attachment.UnsentMessage): rtn.unsent = True elif attachment: rtn.attachments.append(attachment) except Exception: _util.log.exception( "An exception occured while reading attachments: {}".format( data["attachments"] ) ) rtn.emoji_size = EmojiSize._from_tags(tags) rtn.forwarded = cls._get_forwarded_from_tags(tags) return rtn def graphql_to_extensible_attachment(data): story = data.get("story_attachment") if not story: return None target = story.get("target") if not target: return _attachment.UnsentMessage(uid=data.get("legacy_attachment_id")) _type = target["__typename"] if _type == "MessageLocation": return _location.LocationAttachment._from_graphql(story) elif _type == "MessageLiveLocation": return _location.LiveLocationAttachment._from_graphql(story) elif _type in ["ExternalUrl", "Story"]: return _attachment.ShareAttachment._from_graphql(story) return None PKt%O"Gfbchat/_page.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from . import _plan from ._thread import ThreadType, Thread @attr.s(cmp=False, init=False) class Page(Thread): """Represents a Facebook page. Inherits `Thread`.""" #: The page's custom URL url = attr.ib(None) #: The name of the page's location city city = attr.ib(None) #: Amount of likes the page has likes = attr.ib(None) #: Some extra information about the page sub_title = attr.ib(None) #: The page's category category = attr.ib(None) def __init__( self, uid, url=None, city=None, likes=None, sub_title=None, category=None, **kwargs ): super(Page, self).__init__(ThreadType.PAGE, uid, **kwargs) self.url = url self.city = city self.likes = likes self.sub_title = sub_title self.category = category @classmethod def _from_graphql(cls, data): if data.get("profile_picture") is None: data["profile_picture"] = {} if data.get("city") is None: data["city"] = {} plan = None if data.get("event_reminders") and data["event_reminders"].get("nodes"): plan = _plan.Plan._from_graphql(data["event_reminders"]["nodes"][0]) return cls( data["id"], url=data.get("url"), city=data.get("city").get("name"), category=data.get("category_type"), photo=data["profile_picture"].get("uri"), name=data.get("name"), message_count=data.get("messages_count"), plan=plan, ) PKt%Oat fbchat/_plan.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr import json from ._core import Enum class GuestStatus(Enum): INVITED = 1 GOING = 2 DECLINED = 3 @attr.s(cmp=False) class Plan(object): """Represents a plan.""" #: ID of the plan uid = attr.ib(None, init=False) #: Plan time (timestamp), only precise down to the minute time = attr.ib(converter=int) #: Plan title title = attr.ib() #: Plan location name location = attr.ib(None, converter=lambda x: x or "") #: Plan location ID location_id = attr.ib(None, converter=lambda x: x or "") #: ID of the plan creator author_id = attr.ib(None, init=False) #: Dictionary of `User` IDs mapped to their `GuestStatus` guests = attr.ib(None, init=False) @property def going(self): """List of the `User` IDs who will take part in the plan.""" return [ id_ for id_, status in (self.guests or {}).items() if status is GuestStatus.GOING ] @property def declined(self): """List of the `User` IDs who won't take part in the plan.""" return [ id_ for id_, status in (self.guests or {}).items() if status is GuestStatus.DECLINED ] @property def invited(self): """List of the `User` IDs who are invited to the plan.""" return [ id_ for id_, status in (self.guests or {}).items() if status is GuestStatus.INVITED ] @classmethod def _from_pull(cls, data): rtn = cls( time=data.get("event_time"), title=data.get("event_title"), location=data.get("event_location_name"), location_id=data.get("event_location_id"), ) rtn.uid = data.get("event_id") rtn.author_id = data.get("event_creator_id") rtn.guests = { x["node"]["id"]: GuestStatus[x["guest_list_state"]] for x in json.loads(data["guest_state_list"]) } return rtn @classmethod def _from_fetch(cls, data): rtn = cls( time=data.get("event_time"), title=data.get("title"), location=data.get("location_name"), location_id=str(data["location_id"]) if data.get("location_id") else None, ) rtn.uid = data.get("oid") rtn.author_id = data.get("creator_id") rtn.guests = {id_: GuestStatus[s] for id_, s in data["event_members"].items()} return rtn @classmethod def _from_graphql(cls, data): rtn = cls( time=data.get("time"), title=data.get("event_title"), location=data.get("location_name"), ) rtn.uid = data.get("id") rtn.author_id = data["lightweight_event_creator"].get("id") rtn.guests = { x["node"]["id"]: GuestStatus[x["guest_list_state"]] for x in data["event_reminder_members"]["edges"] } return rtn PKt%OSOfbchat/_poll.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr @attr.s(cmp=False) class Poll(object): """Represents a poll.""" #: Title of the poll title = attr.ib() #: List of :class:`PollOption`, can be fetched with :func:`fbchat.Client.fetchPollOptions` options = attr.ib() #: Options count options_count = attr.ib(None) #: ID of the poll uid = attr.ib(None) @classmethod def _from_graphql(cls, data): return cls( uid=int(data["id"]), title=data.get("title") if data.get("title") else data.get("text"), options=[PollOption._from_graphql(m) for m in data.get("options")], options_count=data.get("total_count"), ) @attr.s(cmp=False) class PollOption(object): """Represents a poll option.""" #: Text of the poll option text = attr.ib() #: Whether vote when creating or client voted vote = attr.ib(False) #: ID of the users who voted for this poll option voters = attr.ib(None) #: Votes count votes_count = attr.ib(None) #: ID of the poll option uid = attr.ib(None) @classmethod def _from_graphql(cls, data): if data.get("viewer_has_voted") is None: vote = None elif isinstance(data["viewer_has_voted"], bool): vote = data["viewer_has_voted"] else: vote = data["viewer_has_voted"] == "true" return cls( uid=int(data["id"]), text=data.get("text"), vote=vote, voters=( [m.get("node").get("id") for m in data.get("voters").get("edges")] if isinstance(data.get("voters"), dict) else data.get("voters") ), votes_count=( data.get("voters").get("count") if isinstance(data.get("voters"), dict) else data.get("total_count") ), ) PKt%OטSn n fbchat/_quick_reply.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._attachment import Attachment @attr.s(cmp=False) class QuickReply(object): """Represents a quick reply.""" #: Payload of the quick reply payload = attr.ib(None) #: External payload for responses external_payload = attr.ib(None, init=False) #: Additional data data = attr.ib(None) #: Whether it's a response for a quick reply is_response = attr.ib(False) @attr.s(cmp=False, init=False) class QuickReplyText(QuickReply): """Represents a text quick reply.""" #: Title of the quick reply title = attr.ib(None) #: URL of the quick reply image (optional) image_url = attr.ib(None) #: Type of the quick reply _type = "text" def __init__(self, title=None, image_url=None, **kwargs): super(QuickReplyText, self).__init__(**kwargs) self.title = title self.image_url = image_url @attr.s(cmp=False, init=False) class QuickReplyLocation(QuickReply): """Represents a location quick reply (Doesn't work on mobile).""" #: Type of the quick reply _type = "location" def __init__(self, **kwargs): super(QuickReplyLocation, self).__init__(**kwargs) self.is_response = False @attr.s(cmp=False, init=False) class QuickReplyPhoneNumber(QuickReply): """Represents a phone number quick reply (Doesn't work on mobile).""" #: URL of the quick reply image (optional) image_url = attr.ib(None) #: Type of the quick reply _type = "user_phone_number" def __init__(self, image_url=None, **kwargs): super(QuickReplyPhoneNumber, self).__init__(**kwargs) self.image_url = image_url @attr.s(cmp=False, init=False) class QuickReplyEmail(QuickReply): """Represents an email quick reply (Doesn't work on mobile).""" #: URL of the quick reply image (optional) image_url = attr.ib(None) #: Type of the quick reply _type = "user_email" def __init__(self, image_url=None, **kwargs): super(QuickReplyEmail, self).__init__(**kwargs) self.image_url = image_url def graphql_to_quick_reply(q, is_response=False): data = dict() _type = q.get("content_type").lower() if q.get("payload"): data["payload"] = q["payload"] if q.get("data"): data["data"] = q["data"] if q.get("image_url") and _type is not QuickReplyLocation._type: data["image_url"] = q["image_url"] data["is_response"] = is_response if _type == QuickReplyText._type: if q.get("title") is not None: data["title"] = q["title"] rtn = QuickReplyText(**data) elif _type == QuickReplyLocation._type: rtn = QuickReplyLocation(**data) elif _type == QuickReplyPhoneNumber._type: rtn = QuickReplyPhoneNumber(**data) elif _type == QuickReplyEmail._type: rtn = QuickReplyEmail(**data) return rtn PKt%O--fbchat/_state.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr import bs4 import re import requests import random from . import _graphql, _util, _exception FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"') def get_user_id(session): # TODO: Optimize this `.get_dict()` call! rtn = session.cookies.get_dict().get("c_user") if rtn is None: raise _exception.FBchatException("Could not find user id") return str(rtn) def find_input_fields(html): return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input")) def session_factory(user_agent=None): session = requests.session() session.headers["Referer"] = "https://www.facebook.com" # TODO: Deprecate setting the user agent manually session.headers["User-Agent"] = user_agent or random.choice(_util.USER_AGENTS) return session def client_id_factory(): return hex(int(random.random() * 2 ** 31))[2:] def is_home(url): parts = _util.urlparse(url) # Check the urls `/home.php` and `/` return "home" in parts.path or "/" == parts.path def _2fa_helper(session, code, r): soup = find_input_fields(r.text) data = dict() url = "https://m.facebook.com/login/checkpoint/" data["approvals_code"] = code data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] data["nh"] = soup.find("input", {"name": "nh"})["value"] data["submit[Submit Code]"] = "Submit Code" data["codes_submitted"] = 0 _util.log.info("Submitting 2FA code.") r = session.post(url, data=data) if is_home(r.url): return r del data["approvals_code"] del data["submit[Submit Code]"] del data["codes_submitted"] data["name_action_selected"] = "save_device" data["submit[Continue]"] = "Continue" _util.log.info("Saving browser.") # At this stage, we have dtsg, nh, name_action_selected, submit[Continue] r = session.post(url, data=data) if is_home(r.url): return r del data["name_action_selected"] _util.log.info("Starting Facebook checkup flow.") # At this stage, we have dtsg, nh, submit[Continue] r = session.post(url, data=data) if is_home(r.url): return r del data["submit[Continue]"] data["submit[This was me]"] = "This Was Me" _util.log.info("Verifying login attempt.") # At this stage, we have dtsg, nh, submit[This was me] r = session.post(url, data=data) if is_home(r.url): return r del data["submit[This was me]"] data["submit[Continue]"] = "Continue" data["name_action_selected"] = "save_device" _util.log.info("Saving device again.") # At this stage, we have dtsg, nh, submit[Continue], name_action_selected r = session.post(url, data=data) return r @attr.s(slots=True) # TODO i Python 3: Add kw_only=True class State(object): """Stores and manages state required for most Facebook requests.""" user_id = attr.ib() _fb_dtsg = attr.ib() _revision = attr.ib() _session = attr.ib(factory=session_factory) _counter = attr.ib(0) _client_id = attr.ib(factory=client_id_factory) _logout_h = attr.ib(None) def get_params(self): self._counter += 1 # TODO: Make this operation atomic / thread-safe return { "__a": 1, "__req": _util.str_base(self._counter, 36), "__rev": self._revision, "fb_dtsg": self._fb_dtsg, } @classmethod def login(cls, email, password, on_2fa_callback, user_agent=None): session = session_factory(user_agent=user_agent) soup = find_input_fields(session.get("https://m.facebook.com/").text) data = dict( (elem["name"], elem["value"]) for elem in soup if elem.has_attr("value") and elem.has_attr("name") ) data["email"] = email data["pass"] = password data["login"] = "Log In" r = session.post("https://m.facebook.com/login.php?login_attempt=1", data=data) # Usually, 'Checkpoint' will refer to 2FA if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()): code = on_2fa_callback() r = _2fa_helper(session, code, r) # Sometimes Facebook tries to show the user a "Save Device" dialog if "save-device" in r.url: r = session.get("https://m.facebook.com/login/save-device/cancel/") if is_home(r.url): return cls.from_session(session=session) else: raise _exception.FBchatUserError( "Login failed. Check email/password. " "(Failed on url: {})".format(r.url) ) def is_logged_in(self): # Send a request to the login url, to see if we're directed to the home page url = "https://m.facebook.com/login.php?login_attempt=1" r = self._session.get(url, allow_redirects=False) return "Location" in r.headers and is_home(r.headers["Location"]) def logout(self): logout_h = self._logout_h if not logout_h: url = _util.prefix_url("/bluebar/modern_settings_menu/") h_r = self._session.post(url, data={"pmid": "4"}) logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1) url = _util.prefix_url("/logout.php") return self._session.get(url, params={"ref": "mb", "h": logout_h}).ok @classmethod def from_session(cls, session): # TODO: Automatically set user_id when the cookie changes in the session user_id = get_user_id(session) r = session.get(_util.prefix_url("/")) soup = find_input_fields(r.text) fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) if fb_dtsg_element: fb_dtsg = fb_dtsg_element["value"] else: # Fall back to searching with a regex fb_dtsg = FB_DTSG_REGEX.search(r.text).group(1) revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0]) logout_h_element = soup.find("input", {"name": "h"}) logout_h = logout_h_element["value"] if logout_h_element else None return cls( user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session, logout_h=logout_h, ) def get_cookies(self): return self._session.cookies.get_dict() @classmethod def from_cookies(cls, cookies, user_agent=None): session = session_factory(user_agent=user_agent) session.cookies = requests.cookies.merge_cookies(session.cookies, cookies) return cls.from_session(session=session) def _do_refresh(self): # TODO: Raise the error instead, and make the user do the refresh manually # It may be a bad idea to do this in an exception handler, if you have a better method, please suggest it! _util.log.warning("Refreshing state and resending request") new = State.from_session(session=self._session) self.user_id = new.user_id self._fb_dtsg = new._fb_dtsg self._revision = new._revision self._counter = new._counter self._logout_h = new._logout_h or self._logout_h def _get(self, url, params, error_retries=3): params.update(self.get_params()) r = self._session.get(_util.prefix_url(url), params=params) content = _util.check_request(r) j = _util.to_json(content) try: _util.handle_payload_error(j) except _exception.FBchatPleaseRefresh: if error_retries > 0: self._do_refresh() return self._get(url, params, error_retries=error_retries - 1) raise return j def _post(self, url, data, files=None, as_graphql=False, error_retries=3): data.update(self.get_params()) r = self._session.post(_util.prefix_url(url), data=data, files=files) content = _util.check_request(r) try: if as_graphql: return _graphql.response_to_json(content) else: j = _util.to_json(content) # TODO: Remove this, and move it to _payload_post instead # We can't yet, since errors raised in here need to be caught below _util.handle_payload_error(j) return j except _exception.FBchatPleaseRefresh: if error_retries > 0: self._do_refresh() return self._post( url, data, files=files, as_graphql=as_graphql, error_retries=error_retries - 1, ) raise def _payload_post(self, url, data, files=None): j = self._post(url, data, files=files) try: return j["payload"] except (KeyError, TypeError): raise _exception.FBchatException("Missing payload: {}".format(j)) def _graphql_requests(self, *queries): data = { "method": "GET", "response_format": "json", "queries": _graphql.queries_to_json(*queries), } return self._post("/api/graphqlbatch/", data, as_graphql=True) def _upload(self, files, voice_clip=False): """Upload files to Facebook. `files` should be a list of files that requests can upload, see `requests.request `_. Return a list of tuples with a file's ID and mimetype. """ file_dict = {"upload_{}".format(i): f for i, f in enumerate(files)} data = {"voice_clip": voice_clip} j = self._payload_post( "https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict ) if len(j["metadata"]) != len(files): raise _exception.FBchatException( "Some files could not be uploaded: {}, {}".format(j, files) ) return [ (data[_util.mimetype_to_key(data["filetype"])], data["filetype"]) for data in j["metadata"] ] def _do_send_request(self, data): offline_threading_id = _util.generateOfflineThreadingID() data["client"] = "mercury" data["author"] = "fbid:{}".format(self.user_id) data["timestamp"] = _util.now() data["source"] = "source:chat:web" data["offline_threading_id"] = offline_threading_id data["message_id"] = offline_threading_id data["threading_id"] = _util.generateMessageID(self._client_id) data["ephemeral_ttl_mode:"] = "0" j = self._post("/messaging/send/", data) # update JS token if received in response fb_dtsg = _util.get_jsmods_require(j, 2) if fb_dtsg is not None: self._fb_dtsg = fb_dtsg try: message_ids = [ (action["message_id"], action["thread_fbid"]) for action in j["payload"]["actions"] if "message_id" in action ] if len(message_ids) != 1: log.warning("Got multiple message ids' back: {}".format(message_ids)) return message_ids[0] except (KeyError, IndexError, TypeError) as e: raise _exception.FBchatException( "Error when sending message: " "No message IDs could be found: {}".format(j) ) PKt%Olfbchat/_sticker.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._attachment import Attachment @attr.s(cmp=False, init=False) class Sticker(Attachment): """Represents a Facebook sticker that has been sent to a thread as an attachment.""" #: The sticker-pack's ID pack = attr.ib(None) #: Whether the sticker is animated is_animated = attr.ib(False) # If the sticker is animated, the following should be present #: URL to a medium spritemap medium_sprite_image = attr.ib(None) #: URL to a large spritemap large_sprite_image = attr.ib(None) #: The amount of frames present in the spritemap pr. row frames_per_row = attr.ib(None) #: The amount of frames present in the spritemap pr. column frames_per_col = attr.ib(None) #: The frame rate the spritemap is intended to be played in frame_rate = attr.ib(None) #: URL to the sticker's image url = attr.ib(None) #: Width of the sticker width = attr.ib(None) #: Height of the sticker height = attr.ib(None) #: The sticker's label/name label = attr.ib(None) def __init__(self, uid=None): super(Sticker, self).__init__(uid=uid) @classmethod def _from_graphql(cls, data): if not data: return None self = cls(uid=data["id"]) if data.get("pack"): self.pack = data["pack"].get("id") if data.get("sprite_image"): self.is_animated = True self.medium_sprite_image = data["sprite_image"].get("uri") self.large_sprite_image = data["sprite_image_2x"].get("uri") self.frames_per_row = data.get("frames_per_row") self.frames_per_col = data.get("frames_per_column") self.frame_rate = data.get("frame_rate") self.url = data.get("url") self.width = data.get("width") self.height = data.get("height") if data.get("label"): self.label = data["label"] return self PKt%OU5fbchat/_thread.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._core import Enum class ThreadType(Enum): """Used to specify what type of Facebook thread is being used. See :ref:`intro_threads` for more info. """ USER = 1 GROUP = 2 ROOM = 2 PAGE = 3 def _to_class(self): """Convert this enum value to the corresponding class.""" from . import _user, _group, _page return { ThreadType.USER: _user.User, ThreadType.GROUP: _group.Group, ThreadType.ROOM: _group.Room, ThreadType.PAGE: _page.Page, }[self] class ThreadLocation(Enum): """Used to specify where a thread is located (inbox, pending, archived, other).""" INBOX = "INBOX" PENDING = "PENDING" ARCHIVED = "ARCHIVED" OTHER = "OTHER" class ThreadColor(Enum): """Used to specify a thread colors.""" MESSENGER_BLUE = "#0084ff" VIKING = "#44bec7" GOLDEN_POPPY = "#ffc300" RADICAL_RED = "#fa3c4c" SHOCKING = "#d696bb" PICTON_BLUE = "#6699cc" FREE_SPEECH_GREEN = "#13cf13" PUMPKIN = "#ff7e29" LIGHT_CORAL = "#e68585" MEDIUM_SLATE_BLUE = "#7646ff" DEEP_SKY_BLUE = "#20cef5" FERN = "#67b868" CAMEO = "#d4a88c" BRILLIANT_ROSE = "#ff5ca1" BILOBA_FLOWER = "#a695c7" TICKLE_ME_PINK = "#ff7ca8" MALACHITE = "#1adb5b" RUBY = "#f01d6a" DARK_TANGERINE = "#ff9c19" BRIGHT_TURQUOISE = "#0edcde" @classmethod def _from_graphql(cls, color): if color is None: return None if not color: return cls.MESSENGER_BLUE color = color[2:] # Strip the alpha value value = "#{}".format(color.lower()) return cls._extend_if_invalid(value) @attr.s(cmp=False, init=False) class Thread(object): """Represents a Facebook thread.""" #: The unique identifier of the thread. Can be used a ``thread_id``. See :ref:`intro_threads` for more info uid = attr.ib(converter=str) #: Specifies the type of thread. Can be used a ``thread_type``. See :ref:`intro_threads` for more info type = attr.ib() #: A URL to the thread's picture photo = attr.ib(None) #: The name of the thread name = attr.ib(None) #: Timestamp of last message last_message_timestamp = attr.ib(None) #: Number of messages in the thread message_count = attr.ib(None) #: Set :class:`Plan` plan = attr.ib(None) def __init__( self, _type, uid, photo=None, name=None, last_message_timestamp=None, message_count=None, plan=None, ): self.uid = str(uid) self.type = _type self.photo = photo self.name = name self.last_message_timestamp = last_message_timestamp self.message_count = message_count self.plan = plan @staticmethod def _parse_customization_info(data): if data is None or data.get("customization_info") is None: return {} info = data["customization_info"] rtn = { "emoji": info.get("emoji"), "color": ThreadColor._from_graphql(info.get("outgoing_bubble_color")), } if ( data.get("thread_type") == "GROUP" or data.get("is_group_thread") or data.get("thread_key", {}).get("thread_fbid") ): rtn["nicknames"] = {} for k in info.get("participant_customizations", []): rtn["nicknames"][k["participant_id"]] = k.get("nickname") elif info.get("participant_customizations"): uid = data.get("thread_key", {}).get("other_user_id") or data.get("id") pc = info["participant_customizations"] if len(pc) > 0: if pc[0].get("participant_id") == uid: rtn["nickname"] = pc[0].get("nickname") else: rtn["own_nickname"] = pc[0].get("nickname") if len(pc) > 1: if pc[1].get("participant_id") == uid: rtn["nickname"] = pc[1].get("nickname") else: rtn["own_nickname"] = pc[1].get("nickname") return rtn def _to_send_data(self): # TODO: Only implement this in subclasses return {"other_user_fbid": self.uid} PKt%O{fbchat/_user.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import attr from ._core import Enum from . import _plan from ._thread import ThreadType, Thread GENDERS = { # For standard requests 0: "unknown", 1: "female_singular", 2: "male_singular", 3: "female_singular_guess", 4: "male_singular_guess", 5: "mixed", 6: "neuter_singular", 7: "unknown_singular", 8: "female_plural", 9: "male_plural", 10: "neuter_plural", 11: "unknown_plural", # For graphql requests "UNKNOWN": "unknown", "FEMALE": "female_singular", "MALE": "male_singular", # '': 'female_singular_guess', # '': 'male_singular_guess', # '': 'mixed', "NEUTER": "neuter_singular", # '': 'unknown_singular', # '': 'female_plural', # '': 'male_plural', # '': 'neuter_plural', # '': 'unknown_plural', } class TypingStatus(Enum): """Used to specify whether the user is typing or has stopped typing.""" STOPPED = 0 TYPING = 1 @attr.s(cmp=False, init=False) class User(Thread): """Represents a Facebook user. Inherits `Thread`.""" #: The profile URL url = attr.ib(None) #: The users first name first_name = attr.ib(None) #: The users last name last_name = attr.ib(None) #: Whether the user and the client are friends is_friend = attr.ib(None) #: The user's gender gender = attr.ib(None) #: From 0 to 1. How close the client is to the user affinity = attr.ib(None) #: The user's nickname nickname = attr.ib(None) #: The clients nickname, as seen by the user own_nickname = attr.ib(None) #: A :class:`ThreadColor`. The message color color = attr.ib(None) #: The default emoji emoji = attr.ib(None) def __init__( self, uid, url=None, first_name=None, last_name=None, is_friend=None, gender=None, affinity=None, nickname=None, own_nickname=None, color=None, emoji=None, **kwargs ): super(User, self).__init__(ThreadType.USER, uid, **kwargs) self.url = url self.first_name = first_name self.last_name = last_name self.is_friend = is_friend self.gender = gender self.affinity = affinity self.nickname = nickname self.own_nickname = own_nickname self.color = color self.emoji = emoji @classmethod def _from_graphql(cls, data): if data.get("profile_picture") is None: data["profile_picture"] = {} c_info = cls._parse_customization_info(data) plan = None if data.get("event_reminders") and data["event_reminders"].get("nodes"): plan = _plan.Plan._from_graphql(data["event_reminders"]["nodes"][0]) return cls( data["id"], url=data.get("url"), first_name=data.get("first_name"), last_name=data.get("last_name"), is_friend=data.get("is_viewer_friend"), gender=GENDERS.get(data.get("gender")), affinity=data.get("affinity"), nickname=c_info.get("nickname"), color=c_info.get("color"), emoji=c_info.get("emoji"), own_nickname=c_info.get("own_nickname"), photo=data["profile_picture"].get("uri"), name=data.get("name"), message_count=data.get("messages_count"), plan=plan, ) @classmethod def _from_thread_fetch(cls, data): if data.get("big_image_src") is None: data["big_image_src"] = {} c_info = cls._parse_customization_info(data) participants = [ node["messaging_actor"] for node in data["all_participants"]["nodes"] ] user = next( p for p in participants if p["id"] == data["thread_key"]["other_user_id"] ) last_message_timestamp = None if "last_message" in data: last_message_timestamp = data["last_message"]["nodes"][0][ "timestamp_precise" ] first_name = user.get("short_name") if first_name is None: last_name = None else: last_name = user.get("name").split(first_name, 1).pop().strip() plan = None if data.get("event_reminders") and data["event_reminders"].get("nodes"): plan = _plan.Plan._from_graphql(data["event_reminders"]["nodes"][0]) return cls( user["id"], url=user.get("url"), name=user.get("name"), first_name=first_name, last_name=last_name, is_friend=user.get("is_viewer_friend"), gender=GENDERS.get(user.get("gender")), affinity=user.get("affinity"), nickname=c_info.get("nickname"), color=c_info.get("color"), emoji=c_info.get("emoji"), own_nickname=c_info.get("own_nickname"), photo=user["big_image_src"].get("uri"), message_count=data.get("messages_count"), last_message_timestamp=last_message_timestamp, plan=plan, ) @classmethod def _from_all_fetch(cls, data): return cls( data["id"], first_name=data.get("firstName"), url=data.get("uri"), photo=data.get("thumbSrc"), name=data.get("name"), is_friend=data.get("is_friend"), gender=GENDERS.get(data.get("gender")), ) @attr.s(cmp=False) class ActiveStatus(object): #: Whether the user is active now active = attr.ib(None) #: Timestamp when the user was last active last_active = attr.ib(None) #: Whether the user is playing Messenger game now in_game = attr.ib(None) @classmethod def _from_chatproxy_presence(cls, id_, data): return cls( active=data["p"] in [2, 3] if "p" in data else None, last_active=data.get("lat"), in_game=int(id_) in data.get("gamers", {}), ) @classmethod def _from_buddylist_overlay(cls, data, in_game=None): return cls( active=data["a"] in [2, 3] if "a" in data else None, last_active=data.get("la"), in_game=None, ) PKt%OIfbchat/_util.py# -*- coding: UTF-8 -*- from __future__ import unicode_literals import re import json from time import time from random import random from contextlib import contextmanager from mimetypes import guess_type from os.path import basename import warnings import logging import requests from ._exception import ( FBchatException, FBchatFacebookError, FBchatInvalidParameters, FBchatNotLoggedIn, FBchatPleaseRefresh, ) try: from urllib.parse import urlencode, parse_qs, urlparse basestring = (str, bytes) except ImportError: from urllib import urlencode from urlparse import parse_qs, urlparse basestring = basestring # Python 2's `input` executes the input, whereas `raw_input` just returns the input try: input = raw_input except NameError: pass # Log settings log = logging.getLogger("client") log.setLevel(logging.DEBUG) # Creates the console handler handler = logging.StreamHandler() log.addHandler(handler) #: Default list of user agents USER_AGENTS = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10", "Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1", "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6", ] def now(): return int(time() * 1000) def strip_json_cruft(text): """Removes `for(;;);` (and other cruft) that preceeds JSON responses.""" try: return text[text.index("{") :] except ValueError: raise FBchatException("No JSON object found: {!r}".format(text)) def get_decoded_r(r): return get_decoded(r._content) def get_decoded(content): return content.decode("utf-8") def parse_json(content): try: return json.loads(content) except ValueError: raise FBchatFacebookError("Error while parsing JSON: {!r}".format(content)) def digitToChar(digit): if digit < 10: return str(digit) return chr(ord("a") + digit - 10) def str_base(number, base): if number < 0: return "-" + str_base(-number, base) (d, m) = divmod(number, base) if d > 0: return str_base(d, base) + digitToChar(m) return digitToChar(m) def generateMessageID(client_id=None): k = now() l = int(random() * 4294967295) return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id) def getSignatureID(): return hex(int(random() * 2147483648)) def generateOfflineThreadingID(): ret = now() value = int(random() * 4294967295) string = ("0000000000000000000000" + format(value, "b"))[-22:] msgs = format(ret, "b") + string return str(int(msgs, 2)) def handle_payload_error(j): if "error" not in j: return error = j["error"] if j["error"] == 1357001: error_cls = FBchatNotLoggedIn elif j["error"] == 1357004: error_cls = FBchatPleaseRefresh elif j["error"] in (1357031, 1545010, 1545003): error_cls = FBchatInvalidParameters else: error_cls = FBchatFacebookError # TODO: Use j["errorSummary"] # "errorDescription" is in the users own language! raise error_cls( "Error #{} when sending request: {}".format(error, j["errorDescription"]), fb_error_code=error, fb_error_message=j["errorDescription"], ) def handle_graphql_errors(j): errors = [] if j.get("error"): errors = [j["error"]] if "errors" in j: errors = j["errors"] if errors: error = errors[0] # TODO: Handle multiple errors # TODO: Use `summary`, `severity` and `description` raise FBchatFacebookError( "GraphQL error #{}: {} / {!r}".format( error.get("code"), error.get("message"), error.get("debug_info") ), fb_error_code=error.get("code"), fb_error_message=error.get("message"), ) def check_request(r): check_http_code(r.status_code) content = get_decoded_r(r) check_content(content) return content def check_http_code(code): msg = "Error when sending request: Got {} response.".format(code) if code == 404: raise FBchatFacebookError( msg + " This is either because you specified an invalid URL, or because" " you provided an invalid id (Facebook usually requires integer ids).", request_status_code=code, ) if 400 <= code < 600: raise FBchatFacebookError(msg, request_status_code=code) def check_content(content, as_json=True): if content is None or len(content) == 0: raise FBchatFacebookError("Error when sending request: Got empty response") def to_json(content): content = strip_json_cruft(content) j = parse_json(content) log.debug(j) return j def get_jsmods_require(j, index): if j.get("jsmods") and j["jsmods"].get("require"): try: return j["jsmods"]["require"][0][index][0] except (KeyError, IndexError) as e: log.warning( "Error when getting jsmods_require: " "{}. Facebook might have changed protocol".format(j) ) return None def require_list(list_): if isinstance(list_, list): return set(list_) else: return set([list_]) def mimetype_to_key(mimetype): if not mimetype: return "file_id" if mimetype == "image/gif": return "gif_id" x = mimetype.split("/") if x[0] in ["video", "image", "audio"]: return "%s_id" % x[0] return "file_id" def get_files_from_urls(file_urls): files = [] for file_url in file_urls: r = requests.get(file_url) # We could possibly use r.headers.get('Content-Disposition'), see # https://stackoverflow.com/a/37060758 files.append( ( basename(file_url).split("?")[0].split("#")[0], r.content, r.headers.get("Content-Type") or guess_type(file_url)[0], ) ) return files @contextmanager def get_files_from_paths(filenames): files = [] for filename in filenames: files.append( (basename(filename), open(filename, "rb"), guess_type(filename)[0]) ) yield files for fn, fp, ft in files: fp.close() def get_url_parameters(url, *args): params = parse_qs(urlparse(url).query) return [params[arg][0] for arg in args if params.get(arg)] def get_url_parameter(url, param): return get_url_parameters(url, param)[0] def prefix_url(url): if url.startswith("/"): return "https://www.facebook.com" + url return url PKt%Oo6llfbchat/models.py# -*- coding: UTF-8 -*- """This file is here to maintain backwards compatability, and to re-export our models into the global module (see `__init__.py`). A common pattern was to use `from fbchat.models import *`, hence we need this while transitioning to a better code structure. """ from __future__ import unicode_literals from ._core import Enum from ._exception import FBchatException, FBchatFacebookError, FBchatUserError from ._thread import ThreadType, ThreadLocation, ThreadColor, Thread from ._user import TypingStatus, User, ActiveStatus from ._group import Group, Room from ._page import Page from ._message import EmojiSize, MessageReaction, Mention, Message from ._attachment import Attachment, UnsentMessage, ShareAttachment from ._sticker import Sticker from ._location import LocationAttachment, LiveLocationAttachment from ._file import FileAttachment, AudioAttachment, ImageAttachment, VideoAttachment from ._quick_reply import ( QuickReply, QuickReplyText, QuickReplyLocation, QuickReplyPhoneNumber, QuickReplyEmail, ) from ._poll import Poll, PollOption from ._plan import GuestStatus, Plan PKt%O6rfbchat-1.8.2.dist-info/LICENSEBSD 3-Clause License Copyright (c) 2015, Taehoon Kim All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. PK!HMuSafbchat-1.8.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UD"PK!H-Cfbchat-1.8.2.dist-info/METADATAWms6 _ֻ-IVMt[O5-QTIʎdG֮S51ܿQi.ޑs0 3j:B޲RAP5jYWI$2sInbRcnbR!S9Fk[sC4GpSVhtc*-oaeTY\ *1D sѬ(OXKɹf<3$y"M`ʴv-[RpڭU=?z^;_/˱}yyu ԡ⹡}̌;.s {߻J5q\FBC1p [#8+"Y($;(Yv00d*rMIE (sR)mD>qҙ0*AQ˒s![T,+U_fqu*+&U6;ԾQ8-qij%{ m9eB+a9\yXYB+hul'؎24Ǐx;:1eA]x$gf hZGb ]˨HIѪ\pm`+Ǝ1J6hKѿrG/NϊT"?;ԭg߁ZbHvlHS|ӔR]i»6D*/ y.nDhyw^OïwHFʔReи^QȰXjS"_\jn*w$=j7m6ǙLjɎq<W;iTEfʢ.wyTf+Q`4h5_ތ.k%,53żKiۡ|9]<e^~pIW7X<ИTס-qj9̂ro¬օ1}X饬l- Uk7{M1 ^}jO'^QX0 <9%LZ\]Σ0_xks \@φpHʃ,:]PBAuƢ퓒?$!RU-gkiYVH#V"4R [m!Fqb}[MKafsYJZF3ȴaiZ1ҲZdxЂMϐܦ"_ο/ M߇M%4Vt13BH(!0 /uC>@ wi}_qW?q׿ɛik0*\n\eIt5J<,_G?PK!H!Cfbchat-1.8.2.dist-info/RECORDuɲH}= ܂LE/A@DaC2)OvDݒZy(gu1WD<$:NprraXtxG1(S ;~-uxTfaik |Qj$>HzdiWI\垮=:/IsTÏP,I8iǢ7V`IkL;&n&d.0/4[4i+-m{< β!o^`{kl'6 ņ,fvY&I9Gu%,Y BP4@d&!v K~4,Pjv:KOG0pc&ד,R8bB'P *o>K,d%()mU0ٶ*ᩯv; XxU&|fWqNg?!qSvZ!J[=A4!8N~c~4/̪JdUhτ]شNy6a粋;3Hّ:Fdpmn N7[0 $n䂷ĕ*r!$i^ӫ֍IA(isj6K]xX5wޟMv斔v \o0TSӕWGS=:A]7 b׭F槦~Syv]viTt P% 7֕EQF-i&g ym1Rn uQymNl[5;=.^pf㥌 fٶM޶C  ?PKt%Ofbchat/__init__.pyPKt%O{^ fbchat/_attachment.pyPKt%OTB   fbchat/_client.pyPKt%OHEfbchat/_core.pyPKt%Ofwqfbchat/_exception.pyPKt%O6&&!fbchat/_file.pyPKt%O>V=GGHfbchat/_graphql.pyPKt%O  Zfbchat/_group.pyPKt%OQU&^^5kfbchat/_location.pyPKt%OE;E;yfbchat/_message.pyPKt%O"G9fbchat/_page.pyPKt%Oat fbchat/_plan.pyPKt%OSOfbchat/_poll.pyPKt%OטSn n fbchat/_quick_reply.pyPKt%O--fbchat/_state.pyPKt%Olfbchat/_sticker.pyPKt%OU5fbchat/_thread.pyPKt%O{"fbchat/_user.pyPKt%OI:fbchat/_util.pyPKt%Oo6llVfbchat/models.pyPKt%O6rp[fbchat-1.8.2.dist-info/LICENSEPK!HMuSaafbchat-1.8.2.dist-info/WHEELPK!H-C bfbchat-1.8.2.dist-info/METADATAPK!H!Cgfbchat-1.8.2.dist-info/RECORDPKl