PK!kmc-c-aws_request_signer/__init__.pyimport base64 import datetime import hashlib import hmac import json from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit from typing import Dict, Optional, Tuple, List, Mapping, Any __all__ = ["AwsRequestSigner", "UNSIGNED_PAYLOAD"] UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD" CredentialScope = Tuple[str, str, str, str] class AwsRequestSigner: algorithm = "AWS4-HMAC-SHA256" def __init__( self, region: str, access_key_id: str, secret_access_key: str, service: str ) -> None: """ Create a new instance of the AwsRequestSigner. Use the sign_with_headers method to sign a request and get the authenication headers returned. Use the presign_url method to add the authentication query arguments to an existing URL. :param region: The AWS region to connect to. :param access_key_id: The AWS access key id to use for authentication. :param secret_access_key: The AWS secret access key to use for authentication. :param service: The AWS service to generate signatures for. """ self.region = region self.access_key_id = access_key_id self.secret_access_key = secret_access_key self.service = service def _get_credential_scope(self, timestamp: str) -> CredentialScope: """ Internal method. Generates a credential scope containing a datestamp, the region, the service and a marker. :param timestamp: The timestamp on which day the credential is valid. Should be in RFC1123Z format (20190122T170000Z) or just yyyymmdd. :return: A tuple containing the aforementioned credential scope. """ return timestamp[:8], self.region, self.service, "aws4_request" def _get_credential(self, credential_scope: CredentialScope) -> str: return "/".join((self.access_key_id,) + credential_scope) def _sign(self, credential_scope: CredentialScope, string_to_sign: str) -> str: """ Sign a string using the credentials and given scope. :param credential_scope: The credential scope as returned by _get_credential_scope. :param string_to_sign: The string to sign. :return: The hex-encoded signature. """ signing_key = ("AWS4" + self.secret_access_key).encode("utf-8") for element in credential_scope: signing_key = hmac.new( signing_key, element.encode("utf-8"), hashlib.sha256 ).digest() return hmac.new( signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 ).hexdigest() def _get_canonical_headers( self, host: str, headers: Mapping[str, str] ) -> List[Tuple[str, str]]: """ Get the canonical header representation for a host and a set of headers. This inserts the host header, lowercases all the provided header names and sorts them by codepoint. :param host: The host you will connect to. Used to generate a Host header. :param headers: A dictionary of headers. :return: The canonical header represenation as a list of key, value tuples. """ return sorted( { "host": host, **{key.lower(): value for key, value in headers.items()}, }.items() ) def _get_signed_headers(self, headers: List[Tuple[str, str]]) -> str: """ Get the signed headers representation of a set of canonical headers. :param headers: The canonical headers as returned by _get_canonical_headers. :return: The signed headers value. """ return ";".join([key for key, _ in headers]) def _get_request_signature( self, method: str, path: str, query: List[Tuple[str, str]], headers: List[Tuple[str, str]], signed_headers: str, content_hash: str, timestamp: str, credential_scope: CredentialScope, ) -> str: """ Generate a signature for a given request. :param method: The request method. :param path: The request path. :param query: The request's query string. :param headers: The request's canonical headers. :param signed_headers: The request's signed header list. :param content_hash: The has of the request's body or UNSIGNED_PAYLOAD. :param timestamp: The timestamp to apply to the signature. :param credential_scope: The credential scope as returned by _get_credential_scope. :return: The signature for the request. """ canonical_query = urlencode(sorted(query)) canonical_request = "\n".join( ( method, path, canonical_query, "\n".join("{}:{}".format(key, value) for key, value in headers), "", # Extra newline after canonical headers. signed_headers, content_hash, ) ) string_to_sign = "\n".join( ( self.algorithm, timestamp, "/".join(credential_scope), hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(), ) ) return self._sign(credential_scope, string_to_sign) def sign_with_headers( self, method: str, url: str, headers: Optional[Mapping[str, str]] = None, content_hash: Optional[str] = None, ) -> Dict[str, str]: """ Get the required signature headers to perform a signed request. :param method: The request method to use. :param url: The full URL to access. :param headers: Any request headers you want to sign as well. :param content_hash: The SHA256 hash of the request body or `UNSIGNED_PAYLOAD`. Can be `None` if performing a GET request. :return: A dictionary containing the headers required to sign this request. """ parsed_url = urlsplit(url) if headers is None: headers = {} if content_hash is None: if method == "GET": content_hash = hashlib.sha256(b"").hexdigest() else: raise ValueError( "content_hash must be specified for {} request".format(method) ) timestamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") extra_headers = {"x-amz-content-sha256": content_hash, "x-amz-date": timestamp} canonical_headers = self._get_canonical_headers( parsed_url.netloc, {**headers, **extra_headers} ) signed_headers = self._get_signed_headers(canonical_headers) credential_scope = self._get_credential_scope(timestamp) signature = self._get_request_signature( method, parsed_url.path, parse_qsl(parsed_url.query), canonical_headers, signed_headers, content_hash, timestamp, credential_scope, ) credential = self._get_credential(credential_scope) authorization_header = ( "{algorithm} " "Credential={credential}, " "SignedHeaders={signed_headers}, " "Signature={signature}" ).format( algorithm=self.algorithm, credential=credential, signed_headers=signed_headers, signature=signature, ) return {**extra_headers, "Authorization": authorization_header} def presign_url( self, method: str, url: str, headers: Optional[Mapping[str, str]] = None, content_hash: Optional[str] = None, expires: int = 86400, ) -> str: """ Generate a pre-signed URL. These URLs contain all the required signature parameters in the query string and have controlled expiration. :param method: The request method to use. :param url: The full URL to access. :param headers: Any request headers you want to sign as well. :param content_hash: The SHA256 hash of the request body or `UNSIGNED_PAYLOAD`. Can be `None` if performing a GET request. :param expires: The duration (in seconds) the URL should be valid. At least 1, at most 604800. :return: The URL with the required signature query arguments appended. """ parsed_url = urlsplit(url) if headers is None: headers = {} if content_hash is None: if method == "GET": content_hash = hashlib.sha256(b"").hexdigest() else: raise ValueError( "content_hash must be specified for {} request".format(method) ) timestamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") canonical_headers = self._get_canonical_headers(parsed_url.netloc, headers) signed_headers = self._get_signed_headers(canonical_headers) credential_scope = self._get_credential_scope(timestamp) credential = self._get_credential(credential_scope) query = parse_qsl(parsed_url.query, True) query.extend( ( ("X-Amz-Algorithm", self.algorithm), ("X-Amz-Content-Sha256", content_hash), ("X-Amz-Credential", credential), ("X-Amz-Date", timestamp), ("X-Amz-Expires", str(expires)), ("X-Amz-SignedHeaders", signed_headers), ) ) signature = self._get_request_signature( method, parsed_url.path, query, canonical_headers, signed_headers, content_hash, timestamp, credential_scope, ) query.append(("X-Amz-Signature", signature)) return urlunsplit( ( parsed_url.scheme, parsed_url.netloc, parsed_url.path, urlencode(query), parsed_url.fragment, ) ) def sign_s3_post_policy(self, policy: Dict[str, Any]) -> Dict[str, str]: """ Sign an S3 POST policy as used by browser based uploads. :param policy: The POST policy to sign. :return: All the required POST field to use the policy. """ assert ( self.service == "s3" ), "Signing POST policies only applies to the S3 service." required_keys = {"expiration", "conditions"} if policy.keys() & required_keys != required_keys: raise ValueError( "POST policy should contain expiration and conditions keys" ) timestamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") credential_scope = self._get_credential_scope(timestamp) credential = self._get_credential(credential_scope) policy_json = json.dumps(policy).encode("utf-8") encoded_policy = base64.b64encode(policy_json).decode("utf-8") signature = self._sign(credential_scope, encoded_policy) return { "policy": encoded_policy, "x-amz-algorithm": self.algorithm, "x-amz-credential": credential, "x-amz-date": timestamp, "x-amz-signature": signature, } PK!o&aws_request_signer/requests.pyimport hashlib import requests.auth from aws_request_signer import UNSIGNED_PAYLOAD, AwsRequestSigner __all__ = ["AwsAuth"] class AwsAuth(requests.auth.AuthBase): def __init__( self, region: str, access_key_id: str, secret_access_key: str, service: str ) -> None: """ Intialize the authentication helper for requests. Use this with the auth argument of the requests methods, or assign it to a session's auth property. :param region: The AWS region to connect to. :param access_key_id: The AWS access key id to use for authentication. :param secret_access_key: The AWS secret access key to use for authentication. :param service: The service to connect to (f.e. `'s3'`). """ self.request_signer = AwsRequestSigner( region, access_key_id, secret_access_key, service ) def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: if isinstance(request.body, bytes): content_hash = hashlib.sha256(request.body).hexdigest() else: content_hash = UNSIGNED_PAYLOAD assert isinstance(request.method, str) assert isinstance(request.url, str) auth_headers = self.request_signer.sign_with_headers( request.method, request.url, request.headers, content_hash ) request.headers.update(auth_headers) return request PK!QQ*aws_request_signer-1.0.0.dist-info/LICENSEThe MIT License (MIT) Copyright © 2019 Ingmar Steen Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU(aws_request_signer-1.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hb+aws_request_signer-1.0.0.dist-info/METADATAWn?O1U~D2l'{Q ^[[RvQ5FHwfhY- }~gHJrnkԀmr.\x\H/{?)ɇD2SC.7gկr*WNIen[zn½t< KUX`O*.Mz\Ag}f+r.byWLֱ\MY +_+'t^LtkQVJj\60GmxxJR+5\񳢰A-'WnYYeW%&z:}y7yoN=@?z+|2l ~ߏNqݩt~]o5'Z؃eO"oL:W't _HD_T{ﮇNior)v\lu޹ɽ}o-:}pm0W_˚g~iwL/8e&O(-M*-H֔,y o~֖Bz=OU'HDti@2Pħk.1ecו_J繜sKDz5,}jJyr Ŧw# ݤbx;xk- D<5LRSU*0S}W8Wc^;۸jcR!W>rԓVx?;?O&.h9]Mwi1PV]x{|-Nq<~0\ADȡt!oQjM ]RfsK?9o55:%7z#+8sl"1~~mǰ˟W-=[xC4QU'F澡`͹[8sJ ٽV'w޶S-OgPF'U9A"J>n~H(u*oP&  g EQNqfک̉;_:D8$y#Ȕr0-)ґ% 'bOk穙/bksTj 4,#u+WMݎS74ݼ[(olޅB/`m-ˮƿd*TP9܇,(W_=d2ċU &q5˨5*}4^a!$^g1lNݭ.a 3*E籱[Hjil}GA"B'f,ShjX9` z6JK̰^|_Cߏ%%> KedV5;U%%FU(8#m U S7 ??q؃iȠdCj d h[D i "Dd<0C"% ͌GCtƎ@Хe6w؉h:PȌ}8YH.{u)TR+ B]߬vz2㯩;27ߖhm`\^