PK!ldnsdb/__init__.py# -*- coding: utf-8 -*- __title__ = 'dnsdb' __version__ = '0.2.3' __author__ = 'Gabriel Iovino' __license__ = 'MIT' __copyright__ = 'Copyright (C) 2019 Gabriel Iovino' from .dnsdb import Dnsdb from .dnsdb import Result PK!A8gf'f'dnsdb/dnsdb.py# -*- coding: utf-8 -*- """ Python client for Farsight Security's DNSDB API Farsight Security DNSDB is a database that stores and indexes both the passive DNS data available via Farsight Security's Security Information Exchange as well as the authoritative DNS data that various zone operators make available. DNSDB makes it easy to search for individual DNS RRsets and provides additional metadata for search results such as first seen and last seen timestamps as well as the DNS bailiwick associated with an RRset. DNSDB also has the ability to perform inverse or rdata searches Farsight DNSDB API Documentation https://api.dnsdb.info/ INITIALIZE EXAMPLE:: from dnsdb import Dnsdb api_key="12345" dnsdb = Dnsdb(api_key) SIMPLE USAGE EXAMPLES::: result = dnsdb.search(name="fsi.io") result = dnsdb.search(name="mail.fsi.io", inverse=True) result = dnsdb.search(ip="104.244.14.108") result = dnsdb.search(ip="104.244.14.0/24") result = dnsdb.search(ip="2620:11c:f008::108") result = dnsdb.search(hexadecimal="36757a35") result = dnsdb.search(name="fsi.io", type="A") result = dnsdb.search(name="farsightsecurity.com", bailiwick="com.") result = dnsdb.search(name="fsi.io", wildcard_left=True) result = dnsdb.search(name="fsi", wildcard_right=True) result = dnsdb.search(name="fsi.io", sort=False) result = dnsdb.search(name="fsi.io", remote_limit=150000, return_limit=1000) result = dnsdb.search(name="fsi.io", time_last_after=1514764800) result = dnsdb.search(name="fsi.io", epoch=True) result = dnsdb.search(name="fsi.io", cache=True) result = dnsdb.search(name="fsi.io", cache=True, cache_timeout=900) result = dnsdb.search(name="fsi.io", cache=True, cache_location="/tmp/dnsdb-cache") result = dnsdb.quota() print(result.records) print(result.status_code) print(result.error) print(result.quota) print(result.cached) """ import json import gzip import requests from diskcache import Cache from dnsdb import utils class Dnsdb: """ A dnsdb object for the Farsight Security DNSDB API """ def __init__( self, api_key=None, server="https://api.dnsdb.info", cache=False, cache_location="/tmp/dnsdb-cache", cache_timeout=900, ): """ :param api_key: string (required) :param server: string (optional: default='https://api.dnsdb.info') :param cache: boolean (optional) enable caching of dnsdb results to disk :param cache_location: string (optional: default='/tmp/dnsdb-cache') directory to store cached results :param cache_timeout: integer (optional: default=900) seconds until the cached result expires :return: object EXAMPLE USAGE::: client = dnsdb.Client(api_key) """ self.api_key = api_key self.server = server self.cache = cache self.cache_location = cache_location self.cache_timeout = cache_timeout if api_key is None: raise Exception("You must supply a DNSDB API key.") def search( self, name=None, ip=None, hexadecimal=None, type="ANY", bailiwick=None, wildcard_left=None, wildcard_right=None, inverse=False, sort=True, return_limit=10000, remote_limit=50000, epoch=False, time_first_before=None, time_first_after=None, time_last_before=None, time_last_after=None, ): """ A method of the DNSDB Class to search the DNSDB API. :param name: string (required) fully qualified domain name :param ip: string IPv4 or IPv6 address, CIDR notation is valid :param hexadecimal: string hexadecimal digits specifying a raw octet string :param type: string (optional: default="ANY") dns resource record types (ANY, A, MX, SIG, etc) :param bailiwick: string (optional: default=None) a label in a fqdn, not valid for inverse queries :param wildcard_left: Boolean (optional: default=None) wildcard search to the left of a dot in a domain name :param wildcard_right: Boolean (optional: default=None) wildcard search to the right of a dot in a domain name :param inverse: boolean (optional: default=False) search for names resolving to names (e.g. MX, NS, CNAME, etc) only valid when used with name :param sort: boolean (optional: default=True) :param return_limit: integer (optional: default=10000) :param remote_limit: integer (optional: default=50000) :param epoch: boolean (optional: default=False) :param time_first_before: :param time_first_after: :param time_last_before: :param time_last_after: :return: Object """ options = dict() options["name"] = name options["ip"] = ip options["hex"] = hexadecimal options["type"] = type options["bailiwick"] = bailiwick options["wildcard_left"] = wildcard_left options["wildcard_right"] = wildcard_right options["inverse"] = inverse options["sort"] = sort options["return_limit"] = return_limit options["remote_limit"] = remote_limit options["epoch"] = epoch options["time_first_before"] = time_first_before options["time_first_after"] = time_first_after options["time_last_before"] = time_last_before options["time_last_after"] = time_last_after options["api_key"] = self.api_key options["server"] = self.server options["cache"] = self.cache options["cache_location"] = self.cache_location options["cache_timeout"] = self.cache_timeout options = utils.pre_process(options) uri = utils.build_uri(options) if options["cache"] is True: cache = Cache(options["cache_location"]) cached_result = cache.get(uri) if cached_result: data = json.loads(gzip.decompress(cached_result).decode("utf-8")) results = Result( records=data["records"], status_code=data["status_code"], error=data["error"], quota=data["quota"], cached=True, ) else: results = _query(options, uri) if results.status_code == 200 or results.status_code == 404: compressed = Result.to_compressed(results) cache.set(uri, compressed, expire=options["cache_timeout"]) else: results = _query(options, uri) if results.status_code == 200: results = utils.post_process(options, results) return results return None def quota(self): """ Query DNSDB API for the current quota of the given API key :return: object """ options = dict() options["api_key"] = self.api_key options["server"] = self.server path = "/lookup/rate_limit" uri_parts = (options["server"], path) uri = "".join(uri_parts) results = _query(options, uri, quota=True) return results class Result: """ A object to store the results of a DNSDB Search and related meta data. """ def __init__( self, records=None, status_code=None, error=None, quota=None, cached=None ): """ :param records: list of dictionaries :param status_code: integer DNSDB status code :param error: dictionary DNSDB error message :param quota: dictionary DNSDB quota information :param cached: boolean """ self.status_code = status_code self.records = records self.error = error self.quota = quota self.cached = cached def to_dict(self): """ Return the object as a dictionary :return: dictionary """ data = dict( status_code=self.status_code, records=self.records, error=self.error, quota=self.quota, cached=self.cached, ) return data def to_json(self): """ Return the object as a JSON string :return: string """ data = Result.to_dict(self) return json.dumps(data) def to_compressed(self): """ Return the object as a gzipped JSON string :return: bytes """ encoded = Result.to_json(self).encode("utf-8") compressed = gzip.compress(bytes(encoded)) return compressed def _query(options, uri, quota=False): """ An internal HTTP function to query DNSDB API :param uri: string :param quota: boolean (default: False) :return: object """ results = Result() error = dict() error.update({"code": None, "message": None}) headers = {"Accept": "application/json", "X-API-Key": options["api_key"]} resp = requests.get(uri, headers=headers, stream=True) results.status_code = resp.status_code results.quota = utils.get_quota(response_headers=resp.headers) results.cached = False if resp.status_code == 200: records = [] if quota is True: response = resp.json() results.quota = utils.get_quota(rate_limit=response["rate"]) return results for line in resp.iter_lines(): if line: decoded_line = line.decode("utf-8") records.append(json.loads(decoded_line)) results.records = records else: error["code"] = resp.status_code if resp.content: error["message"] = resp.content.decode("utf-8").rstrip() else: error["message"] = "Unavailable" results.error = error return results PK!3A%A%dnsdb/utils.py# -*- coding: utf-8 -*- """ Utility functions needed by the DNSDB module """ from dateutil.parser import parse def build_uri(options): """ Build URI for DNSDB API query :param options: Dictionary :return: String """ path = build_path(options) uri = build_parameters(options, path) return uri def build_path(options): """ Build the URI path needed to query the DNSDB API :param options: Dictionary :return: string """ if options["name"]: if options["inverse"]: path = "/lookup/rdata/name/{}/{}".format(options["name"], options["type"]) return path else: path = "/lookup/rrset/name/{}/{}".format(options["name"], options["type"]) if options["bailiwick"]: path += "/{}".format(options["bailiwick"]) return path return path elif options["ip"]: options["ip"] = options["ip"].replace("/", ",") path = "/lookup/rdata/ip/{}".format(options["ip"]) return path elif options["hex"]: path = "/lookup/rdata/raw/{}".format(options["hex"]) return path else: raise LookupError("name, ip, or hex was not specified") def build_parameters(options, path): """ Build the URI parameters needed to query the DNSDB API :param options: Dictionary :param path: String :return: String """ time_filters = { "time_first_before": options["time_first_before"], "time_first_after": options["time_first_after"], "time_last_before": options["time_last_before"], "time_last_after": options["time_last_after"], } server_limit = "?limit={}".format(options["remote_limit"]) uri_parts = [options["server"], path, server_limit] for key, value in time_filters.items(): if value: uri_parts.append("&{}={}".format(key, value)) uri = "".join(uri_parts) return uri def post_process(options, result): """ Post processing of records; supports: 1. converting epoch to 8601 2. sorting of records by last seen 3. Limiting the number of results returned :param options: Dictionary :param result: Result Object :return: list (of dictionaries) """ records = normalize(result.records) return_limit = options["return_limit"] if options["sort"]: records = sort(records) if not options["epoch"]: records = epoch_to_timestamp(records) result.records = records[0:return_limit] return result def normalize(records): """ Normalize result by removing the zone_time_first and zone_time_last keys and adding a source [sensor or zone] key pair. :param records: List (of dictionaries) :return: List (of dictionaries) """ normalized = [] for record in records: normalized_record = dict() normalized_record["source"] = "sensor" keys = record.keys() for key in keys: if key == "zone_time_first": normalized_record["time_first"] = record[key] normalized_record["source"] = "zone" elif key == "zone_time_last": normalized_record["time_last"] = record[key] normalized_record["source"] = "zone" else: normalized_record[key] = record[key] normalized.append(normalized_record) return normalized def sort(records): """ Function to sort records by time_last :param records: List (of dictionaries) :return: List (of dictionaries) """ from operator import itemgetter sorted_results = sorted(records, key=itemgetter("time_last"), reverse=True) return sorted_results def epoch_to_timestamp(records): """ Convert epoch timestamps to ISO 8601 (2015-01-04T09:30:21Z) :param records: List (of dictionaries) :return: List (of dictionaries) """ from datetime import datetime for record in records: timestamp_keys = ["time_first", "time_last"] for key in timestamp_keys: if key in record: record[key] = datetime.fromtimestamp(record[key]).isoformat() + "Z" return records def validate_options(options): """ Validate wildcard options :param options: Dictionary :return: Dictionary """ name = options["name"] wildecard_left = options["wildcard_left"] wildcard_right = options["wildcard_right"] if wildecard_left and wildcard_right: raise Exception( "wildcard_left and wildcard_right cannot be used " "simultaneously" ) if name: if wildecard_left or wildcard_right: name = validate_wildcard(name, wildecard_left, wildcard_right) options["name"] = name return options def validate_wildcard(name, wildcard_left, wildcard_right): """ A function to initiate the validation of wildcard_left or wildcard_right queries :param name: String :param wildcard_left: Boolean :param wildcard_right: Boolean :return: String """ if wildcard_left: return validate_wildcard_left(name) if wildcard_right: return validate_wildcard_right(name) return name def validate_wildcard_left(name): """ Validate a name query using the wildcard_left option and add the correct wildcard syntax if needed :param name: String :return: String """ if name[-1] == "*": raise Exception( "Wildcard left lookup cannot end with an asterisk on " "the right " "side" ) # Correct wildcard syntax, do nothing if name[0] == "*" and name[1] == ".": return name # Missing asterisk, add if name[0] == ".": return "*" + name # Missing asterisk and dot, add return "*." + name def validate_wildcard_right(name): """ Validate a name query using the wildcard_right option and add the correct wildcard syntax if needed :param name: String :return: String """ if name[0] == "*": raise Exception( "Wildcard right lookup cannot start with an asterisk " "on the " "left side" ) # Correct wildcard syntax, do nothing if name[-1] == "*" and name[-2] == ".": return name # Missing asterisk, add if name[-1] == ".": return name + "*" # Missing asterisk and dot, add return name + ".*" def get_quota(response_headers=None, rate_limit=None): """ Function to normalize rate limit information into a consistent data structure One of the two optional named arguments must be passed to get actual results :param response_headers: dictionary (optional) :param rate_limit: dictionary (optional) :return: dictionary """ rate = dict() rate.update( { "reset": None, "results_max": None, "expires": None, "limit": None, "remaining": None, } ) if rate_limit: for key in rate.keys(): rate[key] = rate_limit.get(key, None) return normalize_rate(rate) elif response_headers: rate["limit"] = response_headers.get("X-RateLimit-Limit", None) rate["reset"] = response_headers.get("X-RateLimit-Reset", None) rate["remaining"] = response_headers.get("X-RateLimit-Remaining", None) rate["expires"] = response_headers.get("X-RateLimit-Expires", None) return normalize_rate(rate) else: return rate def normalize_rate(rate): """ Function to change any string 'n/a' values in rate limit information to None values. :param rate: dictionary :return: dictionary """ for key in rate.keys(): if rate[key] == "n/a": rate[key] = None return rate def pre_process(options): """ Function to initial the pre-processing of specified options :param options: dictionary :return: dictionary """ options = validate_options(options) if options["epoch"] is False: options = parse_date(options) return options def parse_date(options): """ Function to convert human readable date / time to epoch time :param options: dictionary :return: dictionary """ options_time = [ "time_first_before", "time_first_after", "time_last_before", "time_last_after", ] for time_field in options_time: if options[time_field]: dt = parse(options[time_field]) options[time_field] = int(dt.timestamp()) return options def debug(result): """ Function to debug output using the console / ipython. USAGE::: from dnsdb.utils import debug r = dnsdb.search(name="www.fsi.io") debug(r) :param result: object :return: text: stdout """ if result.status_code: print("Status Code: {}".format(result.status_code)) else: print("Status Code: None") if result.error: print("Error: {}".format(result.error)) else: print("Error: None") if result.quota: print("Quota: {}".format(result.quota)) else: print("Quota: None") if result.cached: print("Cached: {}".format(result.cached)) else: print("Cached: None") if result.records: print("Records exist: True") print("Number of records: {}".format(len(result.records))) else: print("Records exist: False") PK!Xm//dnsdb-0.2.3.dist-info/LICENSEMIT License Copyright (c) 2019 Gabriel Iovino Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTUdnsdb-0.2.3.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hnu{dnsdb-0.2.3.dist-info/METADATAWmS8_ Bnbo 0s}a e:lr%voWvBӻ,W_Sn(-d Ajwo_&-R #ժ[O@JsCʽXnLj*5얁>/gAk-+u]vӈ34)n~A'a ߲Q 6\~" o{|d9ܔ QݞQS|p_}^IM}"w ^0JO19b»CBUMNx*-U=VO++H)&R٨ <֍T؛=!ڜMs)i8yRNXF ]0եFFe0 T_o֮^1PZ|vvjy+xal-؎vv:J co*KBhZ~[JtpG`[wv'nS^S;4,%.XfND1ejh2\,9g郫C_zS:kȜ-;~FU@ٵ-"l`J̸@Utwqф$OCAI B]ey/왃IJ]>C]O aPF_PK!ldnsdb/__init__.pyPK!A8gf'f' dnsdb/dnsdb.pyPK!3A%A%(dnsdb/utils.pyPK!Xm// Ndnsdb-0.2.3.dist-info/LICENSEPK!HڽTUuRdnsdb-0.2.3.dist-info/WHEELPK!Hnu{Sdnsdb-0.2.3.dist-info/METADATAPK!HhRXdnsdb-0.2.3.dist-info/RECORDPKZ