PK!YWtdameritrade_client/__init__.pyfrom tdameritrade_client import auth from tdameritrade_client import client from tdameritrade_client import utils __version__ = '0.1.1' __doc__ = \ """ python client for the TDA API """ PK![y""tdameritrade_client/auth.pyimport json import os import socket import ssl import webbrowser from contextlib import contextmanager from typing import Dict, Tuple from urllib.parse import quote, unquote import requests from tdameritrade_client.utils import urls class TDAuthenticator(object): def __init__(self, host: str, port: int, oauth_user_id: str, token_path: str): """ TDAuthenticator object retrieves a valid access token. This class is a member of the TDClient class and is meant to be called as a member of a TDClient object. Args: host: The redirect URI host port: The redirect URI port oauth_user_id: The oauth user id (without @AMER.OAUTHAP) token_path: Path to the current token json file """ self._redirect_uri = 'http://{}:{}'.format(host, port) self._oauth_user_id = oauth_user_id + '@AMER.OAUTHAP' self._host = host self._port = port if os.path.dirname(token_path) == '': raise ValueError('Provided token_path {} is invalid.'.format(token_path)) self._token_path = token_path def authenticate(self) -> str: """ Runs the OAUTH flow for the TDA API. Returns: token: A string containing the decoded access token to authorize API requests. """ # Check if token already exists if os.path.isfile(self._token_path): print('Loading existing token...') with open(self._token_path) as token_pointer: token_json = json.load(token_pointer) # Attempt to refresh the token refresh_token = token_json['refresh_token'] successfully_refreshed, token = self.refresh_auth_token(refresh_token) print('Token successfully refreshed.') if not successfully_refreshed: print('Failed to refresh token. Requesting new token.') token = self.run_full_flow() else: token = self.run_full_flow() return token def refresh_auth_token(self, refresh_token: str) -> Tuple[bool, str]: """ Request a new access token via a refresh token. Args: refresh_token: A refresh token as a decoded string. Returns: refreshed: True if token was refreshed successfully. token: The valid access token as a string. """ refreshed = False refresh_request = self._get_url('token_request') headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token, 'client_id': self._oauth_user_id } response = requests.post(refresh_request, data=data, headers=headers) token = '' if response.status_code == 200: refreshed = True token = response.json()['access_token'] return refreshed, token def run_full_flow(self) -> str: """ Request an access token and a request token by authorizing through TDA online. Should be run if the auth-token.json does not exist or if the refresh token fails to authorize a new access token. Returns: access_token: A decoded access token as a string. """ # Check if SSL certs exists cert_path = os.path.join(os.path.dirname(self._token_path), 'certificate.pem') key_path = os.path.join(os.path.dirname(self._token_path), 'key.pem') if not os.path.isfile(cert_path): print('SSL certificate not found. Creating new certificate.') self.create_ssl_cert(cert_path, key_path) # Login online to receive an auth code auth_request = self._get_url('auth_request') webbrowser.open(url=auth_request) # Setup server to receive auth code with self.start_server(key_path, cert_path) as ssock: code_received = False while not code_received: conn, addr = ssock.accept() print('Accepting connection from {}'.format(addr)) with conn: data = conn.recv(1024) code, code_received = self.extract_code(data) # Send response with auth code to receive auth token headers, data = self._get_token_request(code) token = requests.post(self._get_url('token_request'), headers=headers, data=data) with open(self._token_path, 'w') as outfile: json.dump(token.json(), outfile) access_token = token.json()['access_token'] return access_token def _get_token_request(self, code: str) -> Tuple[Dict, Dict]: """ Helper that builds the headers and data for a token request. Args: code: The auth code sent by TDA after a successful authentication on the website. Returns: headers: The headers for a token request. data: The data for a token request. """ headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = { 'grant_type': 'authorization_code', 'access_type': 'offline', 'code': code, 'client_id': self._oauth_user_id, 'redirect_uri': self._redirect_uri } return headers, data def _get_url(self, request: str): """ Get the correct request URL for a given task. Args: request: A string specifying what type of request can be made. Supports: auth_request: Build the URL to initiate full flow auth by authenticating online. token_request: Build the URL used to ask for a token. Returns: url: The requested url """ url = urls.BASE_URL if request == 'auth_request': url = urls.AUTH_URL + urls.AUTH_QUERIES + \ quote(self._redirect_uri) + urls.CLIENT_ID_QUERY + \ quote(self._oauth_user_id) elif request == 'token_request': url += urls.TOKEN_REQUEST else: raise NotImplementedError('The requested url type {} is not supported.'.format(request)) return url @contextmanager def start_server(self, key_path: str, cert_path: str): """ Context manager that builds secure socket listening on the callback address. Args: key_path: path to key.pem file. cert_path: path to certificate.pem file. Yields: ssock: A secure socket listening on (self._host, self._port). """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self._host, self._port)) sock.listen() # Wrap the socket with SSL context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(cert_path, key_path) ssock = context.wrap_socket(sock, server_side=True) # Yield the secure socket yield ssock # Close the wrapper and the original socket when context finishes print('Shutting down server.') ssock.close() sock.close() @staticmethod def create_ssl_cert(cert_path: str, key_path: str) -> None: """ Creates .pem files used to encrypt a secure socket. Args: cert_path: Where to write a new certificate.pem. key_path: Where to write a new key.pem. """ try: os.makedirs(os.path.dirname(cert_path)) except FileExistsError: pass subj_flag = '/C=US/ST=CO/L=CO/OU=IT' ssl_cmd = 'openssl req -newkey rsa:2048 -nodes -keyout {} -x509 ' \ '-days 365 -out {} -subj {}'.format(key_path, cert_path, subj_flag) os.system(ssl_cmd) @staticmethod def extract_code(data: bytes) -> Tuple[str, bool]: """ Extracts the code passed by TDA from the response. If the response has the wrong form, returns nothing. Args: data: bytes from ssock.accept(). Returns: code: the auth code as a string. code_received: True if code was extracted successfully. """ code = '' code_received = False if data != b'': try: code = data.decode('utf-8').split('code=')[1] code = code.split(' ')[0] code = unquote(code) code_received = True except IndexError: pass return code, code_received PK!m9`\ \ tdameritrade_client/client.pyfrom typing import Dict, Type, TypeVar import requests from tdameritrade_client.auth import TDAuthenticator from tdameritrade_client.utils import urls from tdameritrade_client.utils.tools import check_auth # For typehint of the classmethod T = TypeVar('T', bound='TrivialClass') class TDClient(object): def __init__(self, acct_number: int, oauth_user_id: str, redirect_uri: str = 'http://127.0.0.1:8080', token_path: str = urls.DEFAULT_TOKEN_PATH): """ An object which executes requests on the TDA API. Args: acct_number: The account number to authenticate against. oauth_user_id: The oauth user ID of the TD developer app this client is authenticating against. redirect_uri: The redirect URI of the TD developer app this client is authenticating against. token_path: Path where the auth-token.json should be written. Defaults to $HOME/.tda_certs/auth-token.json. """ self._acct_number = acct_number self._redirect_uri = redirect_uri self._oauth_user_id = oauth_user_id.upper() self._token_path = token_path self.token = None ip = redirect_uri.split('/')[-1] host, port = ip.split(':') self._authenticator = TDAuthenticator(host, int(port), self._oauth_user_id, self._token_path) @classmethod def from_dict(cls: Type[T], acct_info: Dict) -> T: """ Create an instance of this class from a dictionary. Args: acct_info: A dictionary of init parameters Returns: An instance of this class """ return cls(**acct_info) def run_auth(self) -> None: """ Runs the authentication flow. See the TDAuthenticator class for details. """ self.token = self._authenticator.authenticate() @check_auth def get_positions(self) -> Dict: """ Requests the positions information of self._acct_number Returns: A json object containing the account positions information. """ reply = requests.get(self._get_url('positions'), headers=self._build_header()) # TODO handle exception on error return reply.json() def _get_url(self, type: str) -> str: """ Build the correct url to perform an API action. Args: type: What type of url to build. Supports: positions: Return account positions. Returns: The requested url. """ url = urls.BASE_URL if type == 'positions': url += urls.ACCOUNT_URL + str(self._acct_number) + urls.FIELDS + type else: raise NotImplementedError('URL type {} not supported.'.format(type)) return url @check_auth def _build_header(self) -> Dict: """ Builds auth header to include with all requests. Returns: The header object to use with requests """ return {'Authorization': 'Bearer ' + self.token} PK!`f;22%tdameritrade_client/utils/__init__.pyfrom tdameritrade_client.utils import tools, urls PK!>K"tdameritrade_client/utils/tools.pyimport functools from typing import Callable def check_auth(func: Callable) -> Callable: """ Decorator that ensures auth has been run before calling func Args: func: The decorated function """ @functools.wraps(func) def wrapper_check_auth(*args, **kwargs): if args[0].token is None: raise AssertionError('Cannot run {} before performing auth flow'.format(func.__name__)) value = func(*args, **kwargs) return value return wrapper_check_auth PK!I!tdameritrade_client/utils/urls.pyimport os from environs import Env env = Env() # Package urls PACKAGE_BASE = os.path.dirname(os.path.dirname(__file__)) DEFAULT_TOKEN_PATH = os.path.join(env('HOME'), '.tda_certs', 'auth-token.json') # TDAmeritrade urls AUTH_URL = 'https://auth.tdameritrade.com/auth' AUTH_QUERIES = '?response_type=code&redirect_uri=' CLIENT_ID_QUERY = '&client_id=' BASE_URL = 'https://api.tdameritrade.com' TOKEN_REQUEST = '/v1/oauth2/token' ACCOUNT_URL = '/v1/accounts/' FIELDS = '?fields=' REFRESH_FIELD = 'grant_type=refresh_token&refresh_token=' PK!*.22+tdameritrade_client-0.1.1.dist-info/LICENSECopyright (c) 2018 The Python Packaging Authority Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU)tdameritrade_client-0.1.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H;! ,tdameritrade_client-0.1.1.dist-info/METADATAUMo6WLK R>[@5CzvAa-s-\c?3AR %j͛gy=)Z%p^|Vz3O )gmqt,&0:ڂ_ ^ axw~ӥ#{8Υ/"JuZ.~-HB96J~.?Dɥ Ikc|˽T9L΋bp0z::,iÈB–@}|_' ԣZ98|Nz?GݗA_!4:Ũ3„_DQW:J,JU-+&ө ?9 n{+R+ _jq?x1Zzb#(^ !< pbh)}$} =L^0W[<6C:X735 H4ŞJǩhucnm\֞bC ~u}+e>Y{ UN:VhӢuOظR0G_sH 8`(ñޡl OTm4TN8g7 ;rq4a Bo`LJ͈QZj9K:kQX`) O}uF Of%Hb![ 9%*l}7׆ZPAehF$LAz%2[w)hJs>uRZ4py{݃B7+cHĬ!-4{PBll{F7 ^gG=/Qj7d?PK!H%f*tdameritrade_client-0.1.1.dist-info/RECORDK@}R,f!X ȦMA PO''M:8M>Fe %*PQ<үAP ";BگUEb牽z4֊*`/d-{fLCW%nNPps7<`^ȿ%?i^Q>(ɕ5S4ZT!̜I&S3"Eg5*Kx ) 0lx`Ӻ6c~R8^\SBoQl9FF}e w5ңʏ%wd!2;uiA([Tcmhpτ j'HF&s;ķ#uEC9yrog*|ʶBSڸ-&Ѳ,3AYe8 ˅x&ڻ'~6;ɽ6۵b֢ KwV%*8#03?#.lw0oPK!YWtdameritrade_client/__init__.pyPK![y""tdameritrade_client/auth.pyPK!m9`\ \ #tdameritrade_client/client.pyPK!`f;22%N0tdameritrade_client/utils/__init__.pyPK!>K"0tdameritrade_client/utils/tools.pyPK!I!3tdameritrade_client/utils/urls.pyPK!*.22+`5tdameritrade_client-0.1.1.dist-info/LICENSEPK!HڽTU)9tdameritrade_client-0.1.1.dist-info/WHEELPK!H;! ,v:tdameritrade_client-0.1.1.dist-info/METADATAPK!H%f*>tdameritrade_client-0.1.1.dist-info/RECORDPK 5A