PK!v{!!snucovery/__init__.pyimport pyjq from snucovery.cli import Arguments from snucovery.aws import AwsServices from snucovery.excel import ExcelWorkbooks def main(): # Get arguments passed from the command line args = Arguments().get_args() # Instantiate an Aws session based on the profile name aws = AwsServices(args.profile) # Scan all services that are specified in aws.service_mappings items = aws.scan_services() # Create a new workbook based on the workbook name excel = ExcelWorkbooks(args.workbook_name) service_mappings = aws.get_service_mappings() # Iterate through the aws service mappings for service in service_mappings: for service_attr, jq_filter in service_mappings[service].items(): # Format the service_attr into something more appealing since this # is used for setting the worksheets name service_name = service_attr.replace('describe_', '').replace('_', ' ') excel.create_worksheet(service_name) # Leverage `pyjq` to iterate through a valid aws service response # and parse the details into what we need. `jq_filter` should be # a valid `jq` filter string filtered_items = pyjq.all(jq_filter, items[service][service_attr]) try: excel.create_headers_from_dict(service_name, filtered_items[0]) excel.add_rows_to_worksheet_from_json(service_name, filtered_items) except IndexError: pass excel.close() if __name__ == '__main__': main() PK!p`11snucovery/aws.pyimport json import boto3 from datetime import date, datetime from snucovery.errors import ( UnknownServiceCall, InvalidServiceMappings, ) def json_serial(obj): """JSON serializer for objects not serializable by default json code""" if isinstance(obj, (datetime, date)): return obj.isoformat() raise TypeError("Type %s not serializable" % type(obj)) class AwsServices: def __init__(self, profile): """Object for collecting Aws Assets Describe multiple services within an Aws profile. Args: profile (str): Aws profile name found in `~/.aws/credentials` Self Params: profile (str): profile name session (obj): Aws Session active_service (str): default('ec2') client (obj): Aws Service Client Examples: >>> from aws import AwsService >>> aws = AwsServices(profile='test') >>> print(aws.scan_services()) """ self.profile = profile self.session = self.get_session() self.active_service = 'ec2' self.client = self.get_service_client() self.service_mappings = { 'ec2': { 'describe_instances': ".Reservations[].Instances[] | {Name: (.Tags[]|select(.Key==\"Name\")|.Value), InstanceId, InstanceType, Region: .Placement.AvailabilityZone, LaunchTime, PrivateDnsName, PrivateIpAddresses: [.NetworkInterfaces[].PrivateIpAddresses[].PrivateIpAddress], PublicIpAddress}", 'describe_vpcs': ".Vpcs[] | {Name: (.Tags[]|select(.Key==\"Name\")|.Value), VpcId, CidrBlock}" }, 'elb': { 'describe_load_balancers': ".LoadBalancerDescriptions[] | {LoadBalancerName, DNSName}" }, 'rds': { 'describe_db_instances': ".DBInstances[] | {DBName, AvailabilityZone, DBInstanceIdentifier, DBInstanceClass, Engine}" }, 'elasticache': { 'describe_cache_clusters': ".CacheClusters[] | {CacheClusterId, CacheNodeType, Engine, PreferredAvailabilityZone}" } } def set_session(self, profile_name): """Set Aws Session Instantiates an Aws Session based on the `profile_name`. The `profile_name` must exist within the `~/.aws/credentials` file. Args: profile_name (str): Aws profile name Returns: Object :: Boto3.session ( profile_name ) """ self.session = boto3.session.Session(profile_name=profile_name) if self.session: return self.session def set_service_client(self, service): """Instantiate a boto3.client based on the current session Args: service (str): Valid boto3.client() parameters. eg: ['ec2', 'elb', 'rds'] Returns: Object :: boto3.client object """ self.client = self.get_session().client(service) return self.client def get_session(self): """Get the current boto3 session Attempts to return the current session, if it fails, it will set a new session based on the current `profile` Returns: Object :: boto3.session object """ try: return self.session except AttributeError: return self.set_session(self.profile) def get_service_client(self): """Get the current Aws service client object Attempts to return the current boto3 client. If it's not set, it will create a new service client based on the `active_service` Returns: Object :: boto3.client object """ try: return self.client except AttributeError: return self.set_service_client(self.active_service) def get_service_mappings(self): """Get the aws service mappings THIS IS NOT THE FINAL FORM Need to think about this but it attempts to return the current service mappings. Raises an error if invalid. The mapping works as follows: boto3.client('ec2').describe_instances() service_mappings = { : { } } boto3.client().() Examples: >>> from aws import AwsService >>> aws = AwsServices(profile='test') >>> aws.service_mappings = { ... 'ec2': { ... 'describe_instances': ".Reservations[].Instances[] | {Name: (.Tags[]|select(.Key==\"Name\")|.Value), InstanceId, InstanceType, Region: .Placement.AvailabilityZone, LaunchTime, PrivateDnsName, PrivateIpAddresses: .NetworkInterfaces[].PrivateIpAddresses[].PrivateIpAddress, PublicIpAddress}", ... 'describe_vpcs': ".Vpcs[] | {Name: (.Tags[]|select(.Key==\"Name\")|.Value), VpcId, CidrBlock}" ... } ... } ... >>> aws.get_service_mappings() """ if self.service_mappings: return self.service_mappings raise InvalidServiceMappings def scan_services(self): """Scan all aws services based on service_mappings Returns a ordered dictionary of each mapped service Returns: Dict :: Dictionary of all mapped aws services """ service_response = dict() for service in self.get_service_mappings(): self.set_service_client(service) for service_attr in self.service_mappings[service]: service_items = self.scan_service(service_attr) if service_items: try: service_response[service].update( { service_attr: service_items } ) except KeyError: service_response[service] = dict() service_response[service].update( { service_attr: service_items } ) return service_response def scan_service(self, service_attr, service=None): """Scan a specific aws service and service_attr Scan service based on the service_attr. The service attr is the service clients method. eg: boto3.client('ec2').describe_instances() Args: Required: service_attr (str): String version of a boto3.client('ec2') method Optional: service (str): default(None): If passed, will set a new session client """ if service: self.set_service_client(service) try: return json.loads( json.dumps( getattr(self.get_service_client(), service_attr)(), default=json_serial ) ) except AttributeError: raise UnknownServiceCall PK!3Zw77snucovery/cli.pyimport argparse class Arguments: def __init__(self): """Aws Asset Discovery Discovery command line arguments. """ self.parser = argparse.ArgumentParser( description="Aws Asset Discovery tool that exports to an Excel Workbook", ) self.command_line_arguments() def command_line_arguments(self): """Command line arguments setter Note: Returns nothing since it only modifies the argparse object """ self.parser.add_argument( '-p', '--profile', help='An Aws Profile Name that can be set or found in `~/.aws/credentials`', required=True ) self.parser.add_argument( '-w', '--workbook-name', help='Name of the Excel Workbook that will be created', required=True ) def get_args(self): """Get arguments passed from command line execution Returns: Arguments passed from command line """ return self.parser.parse_args() PK!snucovery/errors.py class UnknownServiceCall(Exception): def __init__(self): Exception.__init__(self, "Boto3 service function does not exist.") class InvalidServiceMappings(Exception): def __init__(self): Exception.__init__(self, "AwsService is missing a valid service mappings.") class EmptyServiceMappings(Exception): def __init__(self): Exception.__init__(self, "AwsService service mappings are empty.") class InvalidWorksheetName(Exception): def __init__(self): Exception.__init__(self, "Worksheet name is not a valid worksheet name.") class WorksheetNotExists(Exception): def __init__(self): Exception.__init__(self, "Specified worksheet name does not exist.") PK!""snucovery/excel.pyimport xlsxwriter from snucovery.errors import InvalidWorksheetName, WorksheetNotExists class ExcelWorkbooks: def __init__(self, workbook_name): """Create Excel Workbooks with tabbed sheets Full Excel workbook with formatted sheets similar subject data Args: workbook_name (str): Desired filename of the workbook to be created Self: workbook_name (str): Valid xlsx filename, `workbook_name`.xlsx workbook (obj): xlsxwriter.Workbook() object. worksheets (dict): Dict of worksheets to be created in the workbook """ self.workbook_name = self.get_workbook_name(workbook_name) self.workbook = self.get_workbook() self.worksheets = dict() def create_workbook(self, workbook_name=None): """Instantiate a new Workbook to work on This creates the parent workbook that will be used for generating the workbook object. Args: workbook_name (str) Optional: Specify a new workbook name or fall back on self.workbook_name Returns: Object :: xlsxwriter.Workbook() """ if not workbook_name: workbook_name = self.workbook_name self.workbook = xlsxwriter.Workbook(self.get_workbook_name(workbook_name)) return self.workbook def create_worksheet(self, worksheet_name): """Create a new worksheet with a specified worksheet_name When a new worksheet is created, it automatically adds it to the worksheets dict by calling self.add_new_worksheet(). Args: worksheet_name (str): name of the worksheet to be created and added to the workbook Returns: Object """ if not worksheet_name: raise InvalidWorksheetName worksheet = self.get_workbook().add_worksheet(worksheet_name.title()) self.add_new_worksheet(worksheet_name, worksheet) return worksheet def add_new_worksheet(self, worksheet_name, worksheet): """Update self.worksheets dict with the worksheet name and object Args: worksheet_name (str): name of the worksheet worksheet (obj): worksheet object to associate with the name Returns: Object :: worksheet object """ self.worksheets.update({ worksheet_name: worksheet }) return worksheet def set_workbook_name(self, workbook_name): """Returns a valid workbook name with the proper extension This will take `filename` or `filename.xlsx` and return a valid filename that contains the `.xlsx` file extension Args: workbook_name (str): workbook name Returns: String :: Valid filename for the excel workbook """ if not '.xlsx' in workbook_name[-5:]: workbook_name = f"{workbook_name}.xlsx" return workbook_name def get_workbook_name(self, workbook_name): """Reurns a valid workbook filename This might need to be reworked. Attempts to return self.workbook_name but if it doesnt exist, it will set the name based on the passed `workbook_name` Args: workbook_name (str): workbook name Returns: String :: Valid filename for the excel workbook """ try: if not workbook_name: return self.workbook_name except AttributeError: pass self.workbook_name = self.set_workbook_name(workbook_name) return self.workbook_name def get_workbook(self): """Returns the current workbook or creates a new one if not exists Try and return the current instantiated workbook, if it doesnt exist, it will create a new workbook based on the instantiated `workbook_name` Returns: Object :: workbook object """ try: return self.workbook except AttributeError: return self.create_workbook() def get_worksheets(self): """Return worksheets dict Returns: Dict :: dict of all created worksheets """ return self.worksheets def get_worksheet(self, worksheet_name): """Return a worksheet by name if it exists or raise exception Args: worksheet_name (str): worksheet name to return Returns: Try to return the worksheet if it exists or raise exception for WorksheetNotExists """ worksheets = self.get_worksheets() if worksheet_name in worksheets: return worksheets[worksheet_name] raise WorksheetNotExists def set_header_formatting(self): """Returns bold formatting for headers Bolden the headers of the workbook Returns: Object :: workbook.add_format() """ return self.get_workbook().add_format({'bold': True}) def create_header(self, worksheet_name, header, col, width, sort=True): """Create a header in a specific column on row 0 and set col width This creates a header on the first row of a worksheet. It also sets the width of the column and if `sort` is True, make the columns sortable. Args: worksheet_name (str): name of the worksheet to add the header to header (str): string to set the cell text to for the header col (int): which column to add the header to width (int): width of the column sort (bool) default=True: Make columns sortable """ worksheet = self.get_worksheet(worksheet_name) worksheet.set_column(col, col, width) worksheet.write(0, col, header, self.set_header_formatting()) if sort: worksheet.autofilter(0, 0, 0, col) def create_headers_from_dict(self, worksheet_name, header_dict): """Create all headers for a worksheet based on a dict This iterates through the dictionary and sets a key for the cell header and creates a width based on the key or value, depending on what the value is. Args: worksheet_name (str): name of the worksheet to create the headers on header_dict (dict): key: value dict where the key is the header """ col = 0 for key, value in header_dict.items(): width = self.get_header_width(key, value) self.create_header(worksheet_name, key, col, width) col += 1 def get_header_width(self, key, value): """Get the width of the header based on the key or the value The width of the header gets set based on key if a value isnt set. If a value exists, it will set the width to +10 of the value width. If value is a list(), it will set the width to twice the width of a string plus 10. Args: key (str): key of the headers value (str or list): value Returns: Int :: width to set the header to """ width = len(key) + 10 if value: width = len(value) + 10 if isinstance(value, list): value = ', '.join(value) width = len(value) + (len(value) * 2) + 10 return width def add_row_to_worksheet(self, worksheet, row_data, row_counter): """Add a new row to the worksheet Iterate through a row of data in dict format and starting at col == 0, increment through the items and place the value in the specified cell Args: worksheet (str): name of a valid worksheet row_data (dict): dictionary of key: values where the key is the header and the value is contents """ col_counter = 0 for value in row_data.values(): if isinstance(value, list): value = ", ".join(value) worksheet.write(row_counter, col_counter, value) col_counter += 1 def add_rows_to_worksheet_from_json(self, worksheet_name, worksheet_data): """Take a JSON doc and fill a worksheet with the data Pass a dictionary and iterate through it to fill the desired worksheet Args: worksheet_name (str): valid worksheet name worksheet_data (dict): dictionary or json doc to iterate through """ worksheet = self.get_worksheet(worksheet_name) row_counter = 1 for row_data in worksheet_data: self.add_row_to_worksheet(worksheet, row_data, row_counter) row_counter += 1 def close(self): """Close the workbook to save the file""" self.get_workbook().close() PK!H0BP*snucovery-0.1.2.dist-info/entry_points.txtN+I/N.,()*,I-.\xWq^ir~YjQ-YoPK!h8,,$snucovery-0.1.2.dist-info/LICENSE.mdCopyright [2018] [Johnny Martin] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!H\TTsnucovery-0.1.2.dist-info/WHEEL 1 0 нR \I$ơ7.ZON `h6oi14m,b4>4ɛpK>X;baP>PK!Hݰw "snucovery-0.1.2.dist-info/METADATAOK@sTh&K-Vmc5-ݝwSx6fHb#H輴Cβ9xV]H!X[ f6f iZJRPY"