PK!- c""binance_downloader/__init__.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Kline downloader for Binance API""" import sys from logbook import StreamHandler, TimedRotatingFileHandler from .util import ensure_dir __version__ = "0.3" # Log to file (date-based) LOG_FILENAME = "./logs/bd_applog.log" ensure_dir(LOG_FILENAME) TimedRotatingFileHandler(LOG_FILENAME, bubble=True).push_application() # Log to stdout for info and above StreamHandler(sys.stdout, level="NOTICE", bubble=True).push_application() PK!eebinance_downloader/__main__.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Execute the main cli function""" from binance_downloader import cli cli.main() PK!C++binance_downloader/api.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Classes for interacting with the Binance API""" from multiprocessing.pool import ThreadPool from typing import Optional, Tuple import pandas as pd from logbook import Logger from tqdm import tqdm from binance_downloader import binance_utils, db, util from binance_downloader.db import Kline class KlineFetcher(object): """Convenience class to fetch all klines within a given range (in parallel)""" REQ_LIMIT = 1000 def __init__( self, interval: str, symbol: str, start_date, end_date, logger=None, max_per_second=1, ): if logger is None: self.log = Logger(__name__.split(".", 1)[-1]) self.symbol = symbol if not interval or interval not in binance_utils.KLINE_INTERVALS: raise ValueError(f"'{interval}' not a valid Binance kline interval.") self.interval = interval self.interval_ms = binance_utils.interval_to_milliseconds(interval) self.interval_td = binance_utils.interval_to_timedelta(interval) self.start_time, self.end_time = self._fill_dates(start_date, end_date) self.kline_df: Optional[pd.DataFrame] = None self.rate_limiter = util.rate_limited(max_per_second) @property def ohlcv(self): if self.kline_df is None: raise ValueError("Must call fetch_parallel() first") ohlcv = self.kline_df[ :, [ Kline.OPEN_TIME, Kline.OPEN, Kline.HIGH, Kline.LOW, Kline.CLOSE, Kline.VOLUME, ], ] return ohlcv.set_index(Kline.OPEN_TIME) def fetch_parallel(self) -> None: """Fetch klines in specified range from Binance API. Splits the requested range up into chunks and processes them in parallel, while respecting the API rate limits. """ # Create list of all start and end timestamps ranges = self._get_chunk_ranges() if not ranges: self.log.warn( f"There are no klines for {self.symbol} at {self.interval} intervals " f"on Binance between {pd.to_datetime(self.start_time, unit='ms')} " f"and {pd.to_datetime(self.end_time, unit='ms')}" ) return # Check if any needed chunks aren't already cached needed_ranges = self._uncached_ranges(ranges) if not needed_ranges: self.log.notice("All requested chunks already cached") return # At least some chunks actually need to be downloaded self.log.notice(f"Downloading {len(needed_ranges)} chunks...") # Create workers for all needed requests and create iterator pool = ThreadPool() rate_limited_fetch = self.rate_limiter(self._fetch_chunks) results = pool.imap(rate_limited_fetch, needed_ranges) pool.close() result_df = pd.DataFrame() # Show progress meter with tqdm(total=len(needed_ranges), desc="Download ", unit=" chunk") as pbar: for chunk in results: pbar.update(1) result_df = pd.concat([result_df, chunk], ignore_index=True) pool.join() # Block until all workers are done self.kline_df = ( result_df.drop_duplicates(Kline.OPEN_TIME) .sort_values(Kline.OPEN_TIME) .reset_index(drop=True) ) self.log.info(f"Download complete for {len(self.kline_df)} klines from API") def write_to_csv(self): """Write k-lines retrieved from Binance into a csv file""" data_frame = db.range_from_hdf( self.symbol, self.interval, self.start_time, self.end_time ) if data_frame is None or data_frame.empty: self.log.notice( f"Not writing CSV: no data between {self.start_time} and {self.end_time}" ) return db.to_csv(data_frame, self.symbol, self.interval) def write_to_hdf(self): """If data was retrieved from API, will write to local cache (HDF5) Future calls for same data will be retrieved from cache instead of making an unneeded call to the API. """ if self.kline_df is None or self.kline_df.empty: self.log.notice("Not writing to .h5 since no data was received from API") return db.to_hdf(self.kline_df, self.symbol, self.interval) def _uncached_ranges(self, desired_ranges): cached_df = db.from_hdf(self.symbol, self.interval) if cached_df is None or cached_df.empty: return desired_ranges # Need all cached_df.set_index(Kline.OPEN_TIME, inplace=True) uncached_ranges = [] for chunk_range in desired_ranges: start, end = [util.from_ms_utc(timestamp) for timestamp in chunk_range] expected_rows = 1 + (end - start) // self.interval_td tolerance = self.interval_td / 2 same_start = len(cached_df.loc[start - tolerance : start + tolerance]) > 0 same_end = len(cached_df.loc[end - tolerance : end + tolerance]) > 0 same_rows = len(cached_df.loc[start:end]) == expected_rows if not (same_start and same_end and same_rows): uncached_ranges.append(chunk_range) num_cached = len(desired_ranges) - len(uncached_ranges) self.log.notice(f"Found {num_cached} chunks already cached") return uncached_ranges def _get_chunk_ranges(self): # Get [(chunk_start_ms, chunk_end_ms)] for all 1000-kline chunks needed # to fill the requested (or clamped) period ranges = [] period_start = self._get_valid_start() period_end = self._get_valid_end() if period_start > self.start_time: self.log.notice( "First available kline starts on {from_ms_utc(period_start)}" ) if period_start >= period_end: # No valid ranges due to later available start time, so return early return [] chunk_start = chunk_end = period_start chunk_span = (self.REQ_LIMIT - 1) * self.interval_ms while chunk_end < period_end: chunk_end = min(chunk_start + chunk_span, period_end) ranges.append((chunk_start, chunk_end)) # Add overlap to ensure we don't miss any of the range if Binance # screwed up some of their data chunk_start = chunk_end - self.interval_ms * 10 return ranges def _get_valid_end(self): # End date cannot be later than current time end = min(self.end_time, util.date_to_milliseconds("now")) # Subtract one interval from the end since it's really a start time end -= self.interval_ms return end def _get_valid_start(self): # Get earliest possible kline (may be later than desired start date) earliest = binance_utils.earliest_valid_timestamp(self.symbol, self.interval) start = max(self.start_time, earliest) return start def _fill_dates(self, start: Optional[int], end: Optional[int]) -> Tuple[int, int]: # Get interval (in milliseconds) for limit * interval # (i.e. 1000 * 1m = 60,000,000 milliseconds) span = self.REQ_LIMIT * self.interval_ms if start and end: self.log.info("Found start and end dates. Fetching full interval") return start, end if start: # No end date, so go forward by 1000 intervals self.log.notice( f"Found start date but no end: fetching {self.REQ_LIMIT} klines" ) end = start + span elif end: # No start date, so go back 1000 intervals self.log.notice( f"Found end date but no start. Fetching previous {self.REQ_LIMIT} klines" ) start = end - span else: # Neither start nor end date. Get most recent 1000 intervals self.log.notice( f"Neither start nor end dates found. Fetching most recent {self.REQ_LIMIT} klines" ) end = util.date_to_milliseconds("now") start = end - span return start, end def _fetch_chunks(self, chunk_range: Tuple[int, int]): start, end = chunk_range # In milliseconds # Get results from API into a pandas DataFrame result_list = binance_utils.get_klines( self.symbol, self.interval, start=start, end=end ) data_frame = pd.DataFrame(result_list, columns=list(Kline)) # Set data types correctly for col in list(Kline): if col in [Kline.OPEN_TIME, Kline.CLOSE_TIME]: data_frame[col] = util.from_ms_utc(data_frame[col]) else: data_frame[col] = pd.to_numeric(data_frame[col]) return self._clean_data(data_frame, chunk_range) def _clean_data(self, data_frame, chunk_range): start, end = chunk_range # Sometimes Binance shifts off the bottom of the minute; normalize to minutes clean_start = util.from_ms_utc(start).replace(second=0, microsecond=0) expected_rows = 1 + (end - start) // self.interval_ms # Temporarily index off of open_time to allow for reindexing data_frame.set_index(Kline.OPEN_TIME, inplace=True) # Generate new indexes to account for missing data or klines that don't start # on the bottom of the minute new_idx = [clean_start + i * self.interval_td for i in range(expected_rows)] if data_frame.empty: # Return a data frame with full range of open_time, but NaN for all others return data_frame.reindex(index=new_idx).reset_index() new_df = data_frame.reindex( index=new_idx, tolerance=self.interval_td / 2, method="nearest", limit=1 ) return new_df.reset_index() # Other code is expecting an integer index PK!'&&#binance_downloader/binance_utils.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Utility functions that are specific to the Binance API""" from typing import List, Optional import pandas as pd import requests from logbook import Logger from binance_downloader import util log = Logger(__name__.split(".", 1)[-1]) BASE_URL = "https://api.binance.com/api/v1" KLINE_URL = BASE_URL + "/klines" KLINE_INTERVALS = ( "1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "3d", "1w", "1M", ) EXCHANGE_INFO_FILE = "exchange_info.json" EARLIEST_TIMESTAMPS_FILE = "earliest_timestamps.json" def max_request_freq(req_weight: int = 1) -> float: """Get smallest allowable frequency for API calls. The return value is the maximum number of calls allowed per second :param req_weight: (int) weight assigned to this type of request Default: 1-weight :return: float of the maximum calls permitted per second """ # Pull Binance exchange metadata (including limits) either from cached value, or # from the server if cached data is too old request_limits = _req_limits(get_exchange_info()) for rate in request_limits: # Convert JSON response components (e.g.) "5" "minutes" to a Timedelta interval = pd.Timedelta(f"{rate['intervalNum']} {rate['interval']}") # Frequency (requests/second) is, e.g., 5000 / 300 sec or 1200 / 60 sec rate["req_freq"] = int(rate["limit"]) / interval.total_seconds() max_allowed_freq = None for limit in request_limits: # RAW_REQUESTS type should be treated as a request weight of 1 weight = req_weight if limit["rateLimitType"] == "REQUEST_WEIGHT" else 1 this_allowed_freq = limit["req_freq"] / weight if max_allowed_freq is None: max_allowed_freq = this_allowed_freq else: max_allowed_freq = min(max_allowed_freq, this_allowed_freq) log.info( f"Maximum permitted request frequency for weight {req_weight} is " f"{max_allowed_freq} / sec" ) return 0 if max_allowed_freq is None else max_allowed_freq def _req_limits(exchange_info: dict) -> List: return [ rate for rate in (exchange_info["rateLimits"]) if "REQUEST" in rate["rateLimitType"] ] def get_exchange_info(force_update=False) -> dict: """Pull `exchange_info` data from local cache (if fresh) or Binance API otherwise :param force_update: if True, will always pull from the Binance API instead of first attempting to pull from cache :return: dict with the API response """ prev_json = util.json_from_cache(EXCHANGE_INFO_FILE) if not force_update else {} if prev_json: cache_time = util.from_ms_utc(prev_json.get("serverTime", 0)) age = pd.Timestamp("now", tz="utc") - cache_time if age <= pd.Timedelta("1 day"): log.info(f"Using cached exchange info (age={age})") return prev_json log.notice("Fetching new exchange info from API") response = requests.get(BASE_URL + "/exchangeInfo") _validate_api_response(response) data = response.json() if not isinstance(data, dict): raise ConnectionError("No exchange info returned from Binance") # Write out to disk for next time util.json_to_cache(data, EXCHANGE_INFO_FILE) return data def interval_to_milliseconds(interval) -> Optional[int]: """Try to get milliseconds from an interval input :param interval: (str, pandas.Timedelta, int) Interval in one of several types. Attempt to convert this value into milliseconds for a Binance API call :return: (int) milliseconds of the interval if successful, otherwise None """ if isinstance(interval, pd.Timedelta): return int(interval.total_seconds() * 1000) if isinstance(interval, int): log.info(f"Assuming interval '{interval}' is already in milliseconds") return interval # Try to convert from a string seconds_per_unit = {"m": 60, "h": 60 * 60, "d": 24 * 60 * 60, "w": 7 * 24 * 60 * 60} try: return int(interval[:-1]) * seconds_per_unit[interval[-1]] * 1000 except (ValueError, KeyError): return None def interval_to_timedelta(interval) -> Optional[pd.Timedelta]: """Convert a string Binance kline interval to a pandas.Timedelta :param interval: string matching one of the allowed kline intervals :return: pandas.Timedelta representing the interval if valid, otherwise None """ msec = interval_to_milliseconds(interval) return None if msec is None else pd.Timedelta(msec, unit="ms") def get_klines(symbol, interval, start=None, end=None, limit=1000) -> List: """Helper function to get klines from Binance for a single request :param symbol: (str) Symbol pair of interest (e.g. 'XRPBTC') :param interval: (str) Valid kline interval (e.g. '1m'). :param start: (int, str, pandas.Timestamp) First kline open time desired. If int, should be in milliseconds since Epoch. If string or pandas.Timestamp, will assume UTC unless otherwise specified. :param end: (int, str, pandas.Timestamp) Last kline open time desired. If int, should be in milliseconds since Epoch. If string or pandas.Timestamp, will assume UTC unless otherwise specified. :param limit: (int) Maximum number of klines to fetch. Will be clamped to 1000 if higher due to current maximum Binance limit. A value <= 0 will be assumed as no limit, and 1000 will be used. :return: List[List]] Returns a list of klines in list format if successful (may be empty list) """ if not isinstance(symbol, str): raise ValueError(f"Cannot get kline for symbol {symbol}") if not isinstance(start, int) and start is not None: start = util.date_to_milliseconds(start) if not isinstance(end, int) and end is not None: end = util.date_to_milliseconds(end) if not limit or (1 > limit > 1000): log.warn(f"Invalid limit ({limit}), using 1000 instead") limit = 1000 # Set parameters and make the request params = {"symbol": symbol, "interval": interval, "limit": limit} if end is not None: params["endTime"] = end if start is not None: params["startTime"] = start response = requests.get(KLINE_URL, params=params) # Check for valid response _validate_api_response(response) return response.json() def _validate_api_response(response): if response.status_code in [429, 418]: raise ConnectionError(f"Rate limits exceeded or IP banned: {response.json()}") elif response.status_code // 100 == 4: raise ConnectionError(f"Request error: {response.json()}") elif response.status_code // 100 == 5: raise ConnectionError(f"API error, status is unknown: {response.json()}") elif response.status_code != 200: raise ConnectionError(f"Unknown error on kline request: {response.json()}") def earliest_valid_timestamp(symbol: str, interval: str) -> int: """Get the first open time for which Binance has symbol/interval klines :param symbol: (str) Symbol pair of interest (e.g. 'XRPBTC') :param interval: (str) Valid kline interval (e.g. '1m') :return: timestamp (in milliseconds) for the open time of the first available kline """ if interval not in KLINE_INTERVALS: raise ValueError(f"{interval} is not a valid kline interval") # Check for locally cached response identifier = f"{symbol}_{interval}" prev_json = util.json_from_cache(EARLIEST_TIMESTAMPS_FILE) if prev_json: # Loaded JSON from disk, check if we already have this value: timestamp = prev_json.get(identifier, None) if timestamp is not None: log.info( f"Found cached earliest timestamp for {identifier}: " f"{util.from_ms_utc(timestamp)}" ) return timestamp log.info(f"No cached earliest timestamp for {identifier}, so fetching from server") # This will return the first recorded k-line for this interval and symbol kline = get_klines(symbol, interval, start=0, limit=1) # Get just the OpenTime value (timestamp in milliseconds) earliest_timestamp = int(kline[0][0]) # Cache on disk prev_json[identifier] = earliest_timestamp util.json_to_cache(prev_json, EARLIEST_TIMESTAMPS_FILE) log.info(f"Wrote new data to {EARLIEST_TIMESTAMPS_FILE} for {identifier}") return earliest_timestamp PK!  binance_downloader/cli.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """CLI to parse arguments and run appropriate API calls""" import argparse from logbook import Logger from binance_downloader import api, util, binance_utils log = Logger(__name__.split(".", 1)[-1]) def main(): """Parse arguments, download data from API, write to cache and CSV""" log.info("*" * 80) log.info("***" + "Starting CLI Parser for binance-downloader".center(74) + "***") log.info("*" * 80) parser = argparse.ArgumentParser( description="CLI for downloading Binance Candlestick (k-line) data in bulk" ) parser.add_argument("symbol", help="(Required) Binance symbol pair, e.g. ETHBTC") parser.add_argument( "interval", help="(Required) Frequency interval in minutes(m); hours(h); days(d); weeks(w); months(M);" " All possibles values: 1m 3m 5m 15m 30m 1h 2h 4h 6h 8h 12h 1d 3d 1w 1M", ) parser.add_argument( "--start", help="Start date to get data (inclusive). Format: yyyy/mm/dd" ) parser.add_argument( "--end", help="End date to get data (exclusive). Format: yyyy/mm/dd" ) # Allow to choose MM/DD/YYYY for date input parser.add_argument( "--dtfmt", metavar="DATE_FORMAT", help="Format to use for dates (DMY, MDY, YMD, etc). Default: YMD", default="YMD", ) args = parser.parse_args() if args.dtfmt: if args.dtfmt in ["DMY", "MDY", "YMD"]: date_format = args.dtfmt else: log.warn(f"Date format given ({args.dtfmt}) not known. Using YMD") date_format = "YMD" else: date_format = "YMD" if args.start: start_date = util.date_to_milliseconds(args.start, date_format=date_format) else: start_date = None if args.end: end_date = util.date_to_milliseconds(args.end, date_format=date_format) else: end_date = None symbol = str(args.symbol) interval = str(args.interval) max_per_second = binance_utils.max_request_freq(1) binance = api.KlineFetcher( interval, symbol, start_date, end_date, max_per_second=max_per_second ) binance.fetch_parallel() binance.write_to_hdf() binance.write_to_csv() PK!D&#&#binance_downloader/db.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Save or load data with different file types""" import os from collections import namedtuple from typing import Optional import numpy as np import pandas as pd from logbook import Logger from tqdm import tqdm from binance_downloader import util log = Logger(__name__.split(".", 1)[-1]) _BASE_DATA_DIR = "./downloaded/" KlineCols = namedtuple( "KlineCols", [ "OPEN_TIME", "OPEN", "HIGH", "LOW", "CLOSE", "VOLUME", "CLOSE_TIME", "QUOTE_ASSET_VOLUME", "NUMBER_OF_TRADES", "TAKER_BY_BAV", "TAKER_BY_QAV", "IGNORED", ], ) Kline = KlineCols( "open_time", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_by_qav", "taker_by_bav", "ignored", ) def from_hdf(symbol: str, interval: str) -> Optional[pd.DataFrame]: """Try to load a DataFrame from .h5 store for a given symbol and interval :param symbol: Binance symbol pair, e.g. `ETHBTC` for the klines to retrieve :param interval: Binance kline interval to retrieve :return: Pandas.DataFrame with requested data if found, otherwise None """ file_name = _get_file_name(symbol, interval, ext="h5", with_ts=False) if not os.path.isfile(file_name): log.info(f"{file_name} does not exist, returning None") return None # Python will complain if we just use interval name, since it goes against naming # conventions of not starting a variable name with a number interval_key = f"interval_{interval}" with pd.HDFStore(file_name, mode="r") as store: if interval_key not in store: log.info(f"{symbol}/{interval} data not stored in HDF at {file_name}") return None try: data_frame = store.get(interval_key) except (KeyError, AttributeError): log.notice(f"Corrupted/missing data for {symbol}/{interval} at {file_name}") return None else: return data_frame def range_from_hdf(symbol, interval, start, end) -> Optional[pd.DataFrame]: """Lookup a symbol/interval in local cache and return all values between start/end :param symbol: Binance symbol pair, e.g. `ETHBTC` for the klines to retrieve :param interval: Binance kline interval to retrieve :param start: Start date, either as timestamp (millisecond, UTC) or datetime-like :param end: End date, either as timestamp (millisecond, UTC) or datetime-like :return: pandas.DataFrame with requested data if found, otherwise None """ if isinstance(start, int): start = util.from_ms_utc(start) if isinstance(end, int): end = util.from_ms_utc(end) full_df = from_hdf(symbol, interval).set_index(Kline.OPEN_TIME, drop=False) return full_df.loc[start:end] def to_csv(data_frame: pd.DataFrame, symbol: str, interval: str, show_progress=True): """Write a pandas DataFrame out to CSV :param data_frame: DataFrame to be written :param symbol: Binance symbol pair, e.g. `ETHBTC` for the klines to retrieve :param interval: Binance kline interval to retrieve :param show_progress: Show a progress bar while writing? Default True :return: None """ if data_frame is None or data_frame.empty: log.notice(f"Not writing CSV: empty DataFrame") return f_name = _get_file_name(symbol, interval, ext="csv", with_ts=True) log.info(f"Writing CSV output to {f_name}") csv_opts = {"index": False, "float_format": "%.9f"} if not show_progress: data_frame.to_csv(f_name, **csv_opts) else: # Split data into chunks to allow for updating progress bar num_chunks = 100 chunks = np.array_split(data_frame.index, num_chunks) bar_params = {"total": num_chunks, "desc": "Write CSV", "unit": " pct"} # tqdm handles the progress bar display automatically for i, chunk in tqdm(enumerate(chunks), **bar_params): if i == 0: # For the first chunk, create file and write header data_frame.loc[chunk].to_csv(f_name, mode="w", **csv_opts, header=True) else: # For subsequent chunks, append and don't write header data_frame.loc[chunk].to_csv(f_name, mode="a", **csv_opts, header=False) log.notice(f"Done writing {f_name} for {len(data_frame)} lines") def to_hdf(data_frame: pd.DataFrame, symbol: str, interval: str, force_merge=False): """Store kline data to HDF5 store. This function will try to avoid rewriting data if it already exists, since merging, de-duplicating, sorting, and re-indexing can be expensive. :param data_frame: DataFrame with the (possibly) new klines :param symbol: Binance symbol (used for file name generation) :param interval: Binance kline interval (used for key lookup in the HDF5 store) :param force_merge: default is False. If True, will merge, de-duplicate, sort, and re-index even if it's likely all data is already contained in the HDF5 store. :return: None """ # Ensure there is something to save and we know where to save at if data_frame is None or data_frame.empty: log.notice("Cannot save to HDF file since the supplied DataFrame was empty") return if not symbol or not interval: log.error("Cannot save to HDF file without both symbol and interval specified") return file_name = _get_file_name(symbol, interval, ext="h5", with_ts=False) with pd.HDFStore(file_name, "a") as store: key = f"interval_{interval}" if key not in store: store.put(key, data_frame, format="table") log.info(f"Adding interval {key} to {symbol} HDF5 file") return # Check whether given data is already stored old_df = store.get(key) has_start = ( old_df[Kline.OPEN_TIME].iloc[0] <= data_frame[Kline.OPEN_TIME].iloc[0] ) has_end = ( old_df[Kline.CLOSE_TIME].iloc[-1] >= data_frame[Kline.CLOSE_TIME].iloc[-1] ) same_length = len(old_df) == len(data_frame) matching_data = has_start and has_end and same_length if not matching_data or force_merge: new_df = ( pd.concat([old_df, data_frame], ignore_index=True) .drop_duplicates(Kline.OPEN_TIME) .sort_values(Kline.OPEN_TIME) .reset_index(drop=True) ) store.put(key, new_df, format="table") log.notice(f"Merged cache: {len(old_df)} lines -> {len(new_df)} lines") else: log.info(f"No new data not already contained in HDF5 store") def _get_file_name(symbol: str, interval: str, ext: str, with_ts: bool = True): """Get an appropriate storage file path and name based on data being stored and format :param symbol: Binance symbol pair, e.g. `ETHBTC` for the klines being stored :param interval: Binance kline interval for data being stored, e.g. `1m` :param ext: desired file extension, e.g. .csv or .h5 :param with_ts: if True, current timestamp will be prepended to the filename Default: True :return: string representation of the file path """ # Normalise the file extension if ext[0] != ".": ext = f".{ext}" if ext == ".h5": # HDF files will store all intervals in the same file (different keys) file_name = f"{symbol}{ext}" else: # CSV (and other formats) will create separate file for different intervals file_name = f"{symbol}_{interval}{ext}" if with_ts: timestamp = pd.Timestamp("now").strftime("%Y-%m-%d_%H%M%S") file_name = f"{timestamp}_{file_name}" return os.path.join(_BASE_DATA_DIR, file_name) PK!`binance_downloader/util.py# MIT License # # Copyright (C) 2018 Anson VanDoren # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons # to whom the Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next paragraph) shall # be included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. """Utilities that are not specific to Binance API""" import json import os import threading import time from functools import wraps from typing import Dict, Optional import dateparser import pandas as pd import pytz from logbook import Logger CACHE_DIR = "cache/" log = Logger(__name__.split(".", 1)[-1]) def rate_limited(max_per_second): """Prevents the decorated function from being called more than `max_per_second` times per second, locally, for one process """ lock = threading.Lock() min_interval = 1.0 / max_per_second def decorate(func): last_time_called = time.perf_counter() @wraps(func) def rate_limited_function(*args, **kwargs): with lock: nonlocal last_time_called elapsed = time.perf_counter() - last_time_called left_to_wait = min_interval - elapsed if left_to_wait > 0: time.sleep(left_to_wait) last_time_called = time.perf_counter() return func(*args, **kwargs) return rate_limited_function return decorate def ensure_dir(file_path) -> None: """Convenience function to make a folder if the path doesn't already exist :param file_path: fully qualified file path :return: None """ directory = os.path.dirname(file_path) if not os.path.exists(directory): os.makedirs(directory) def json_from_cache(file_name: str) -> Optional[Dict]: """Try to read JSON in from a given filename in a pre-defined folder :param file_name: desired file name. Appropriate folder will be prepended :return: JSON (as a dict) if file is present, otherwise None """ json_path = os.path.join(CACHE_DIR, file_name) try: with open(json_path, "r") as cache_file: return json.load(cache_file) except IOError: log.notice(f"Error reading JSON from {json_path}") return None def json_to_cache(new_json: Dict, file_name: str) -> None: """Write some JSON to disk in a pre-defined folder :param new_json: JSON to cache :param file_name: file name in which to cache (will be overwritten) Appropriate folder is prepended to the file name :return: None """ json_path = os.path.join(CACHE_DIR, file_name) ensure_dir(json_path) with open(json_path, "w") as outfile: json.dump(new_json, outfile, ensure_ascii=False) def from_ms_utc(binance_time: int) -> pd.Timestamp: """Convert Binance timestamps (milliseconds) to a datetime-like representation :param binance_time: integer number of milliseconds since epoch :return: pandas.Timestamp representing the integer timestamp """ return pd.to_datetime(binance_time, unit="ms", utc=True) def date_to_milliseconds(date_str, date_format="YMD") -> int: """Convert a date-like string to milliseconds since epoch :param date_str: string representing a date :param date_format: format order for the date. Defaults to YMD (e.g., 2018-01-30) :return: milliseconds since epoch """ epoch = pd.Timestamp(0, tz="utc") to_date = dateparser.parse(date_str, settings={"DATE_ORDER": date_format}) if to_date is None: raise ValueError(f"Unable to parse valid date from '{date_str}'") if to_date.tzinfo is None or to_date.tzinfo.utcoffset(to_date) is None: to_date = to_date.replace(tzinfo=pytz.utc) return int((to_date - epoch).total_seconds() * 1000.0) PK!H؀9=3binance_downloader-0.2.1.dist-info/entry_points.txtN+I/N.,()KMKKN)y9)Ez9Vy\\PK!NN*binance_downloader-0.2.1.dist-info/LICENSEMIT License Copyright (C) 2018 Anson VanDoren Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice (including the next paragraph) shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU(binance_downloader-0.2.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hlz+binance_downloader-0.2.1.dist-info/METADATAXnϧ8ER@DK[rvJxGHp9C)j}>Id)ZBsswk1܏PBfCy|HS,n$WY"Y ]X꒴n"2NeQ•{MF J<_:oeݜ͡5:Wnw.t\NP])KȒϺvy vD3 חo|Eyε "uMx8!KhdrFe1,cL$C6yMX\+٫w'L)1|fB$C!tM/1[r$*Fʀ9J;BވWqAr1 ZX.[XrYǔ#D,Dę1)'Y<[C'3븛q={PjI00{$*x 2꧵,\431*ʬrr>3dsXzۜM'B{.dluljZ?*r qvG.[ ͽ{L&)fˈ|uhp,䞐Hl[q1aOYxs׽Y@I1[?Z~)[s.i |C\M"ZY9Ҙ5Dn{Nath3Vf%ZeYF SD7CGM*scJ2E\j2H6!ČN5 Q9 +-t`58'i.gk*X~J"aױ`W ېj}T0rQ9sζ9C4Ι,EޯqPgtL?mM ƅmb d 9뺔aA:S_9 3`vؿԻ [ҎGO @^̓)=ݻg|udt;tnl#=K5G77ףg~RtFcΗK{Aptb^WCQժ'Ԩ˙(:ĽG'5ol*H p̡ZiK8 /kՊ@|Z+ܥ[bպ6#PפS:J);^Lc:xGԏ(XQp8UVA-ydL?Ř5fppAx'ʕ0ד xto;9O^l6E[ةPeCzZ::z'/PhcjZ=a;doO|)8+@UrmsPK!Ha_G)binance_downloader-0.2.1.dist-info/RECORDMw0} X |.fQD@A맳sʢ,7ɽ eQLuiֿFEvHskn$p~Pn)m*&䬱< ,?f6:sGC4֎{hd 9_,DZKl'QwN8x9 AW9kyCrD>W gJRC,J\8(~A"Ɣn H(E%Uץ%Ex8ͰLx ZΆu,4y׏Y~Ai9t"MG^TiS&{e[RKg?|~aԥ @ &3%1KF,&ifŭU@:o^zmp=9