# --- recovered from a ZIP archive dump; original formatting reconstructed ---
#
# typedtsv/__init__.py (sibling file in the same archive):
#     __version__ = '0.9.1'
#     from typedtsv.typedtsv import dumps, loads, dump_list, load_list
#     __all__ = ['dumps', 'loads', 'dump_list', 'load_list']
#
# typedtsv/typedtsv.py follows.

from collections import OrderedDict
from datetime import datetime, timedelta, timezone
import json
import re

try:
    # distutils was removed in Python 3.12 (PEP 632); the bool column parser
    # (defined later in this file) presumably uses distutils.util.strtobool.
    import distutils.util
except ImportError:  # Python >= 3.12
    distutils = None


def loads(serialized_data):
    '''Load data from a file-like object:

    header_info, rows = loads(open('data.ttsv', 'r', newline='\\n'))

    header_info is an OrderedDict containing column name and type info
    rows is a list of lists that contain values'''
    line = serialized_data.readline().rstrip('\n')
    # throw away comment lines before the header line
    while line.startswith('#'):
        line = serialized_data.readline().rstrip('\n')
    header_info = load_header(line)
    rows = []
    for line in serialized_data:
        # remove trailing newline, artifact of text file reading
        line = line.rstrip('\n')
        # only process non-empty rows; empty rows are not considered data
        if line and not line.startswith('#'):
            row = load_line(header_info, line)
            rows.append(row)
    return header_info, rows


def load_list(serialized_data):
    '''Convenience method for loading a single column file into a list,
    rather than list of lists:

    header_info, values = loads(open('one_col_data.ttsv', 'r', newline='\\n'))

    header_info is a tuple (column_name, column_type)
    values is a list that contains values directly'''
    line = serialized_data.readline().rstrip('\n')
    # throw away comment lines before the header line
    while line.startswith('#'):
        line = serialized_data.readline().rstrip('\n')
    header_info = load_header(line)
    rows = []
    for line in serialized_data:
        # remove trailing newline, artifact of text file reading
        line = line.rstrip('\n')
        # only process non-empty rows; empty rows are not considered data
        if line and not line.startswith('#'):
            row = load_line(header_info, line)
            # single-column file: append the bare value, not a 1-element list
            rows.append(row[0])
    return header_info, rows


def load_header(line):
    '''Parse a header line into an OrderedDict of {column_name: column_type}.

    Each tab-separated column is "name:type"; a bare "name" defaults to
    type 'str'.'''
    header_info = OrderedDict()
    for column in line.split('\t'):
        col_name, _, col_type = column.partition(':')
        col_type = col_type if col_type else 'str'
        header_info[col_name] = col_type
    return header_info


def load_line(header_info, line, as_dict=False):
    '''Parse one tab-separated data line according to header_info.

    Returns a list of parsed values, or an OrderedDict keyed by column name
    when as_dict is True.  The literal token 'null' becomes None.'''
    if as_dict:
        cols = OrderedDict()
    else:
        cols = []
    ordered_keys = tuple(header_info.keys())
    for i, val in enumerate(line.split('\t')):
        if val == 'null':
            parsed_value = None
        else:
            parsed_value = COL_PARSERS[header_info[ordered_keys[i]]](val)
        if as_dict:
            # BUG FIX: original read `cols[header_info] = parsed_value`,
            # which raises TypeError (OrderedDict is unhashable); key the
            # result by the column's name instead.
            cols[ordered_keys[i]] = parsed_value
        else:
            cols.append(parsed_value)
    return cols


def dumps(header_info, data, outfile):
    """Serialize a list of rows to a typed tsv file-like object

    header_info may be either a tuple/list of column names and the types
    will be inferred from the data or it may be a full OrderedDict whose
    keys are the column names and values are the column types

    data is a list of lists that contained the values to be serialized
    """
    if isinstance(header_info, (list, tuple)):
        # names only: infer the column types from the first row of data
        header_info = header_info_types_from_row(header_info, data[0])
    raw_header = dump_header(header_info)
    outfile.write(raw_header)
    outfile.write('\n')
    for row in data:
        raw_row = dump_line(header_info, row)
        outfile.write(raw_row)
        outfile.write('\n')


def dump_list(header_info, data, outfile):
    """Serialize a list representing a single column to a typed tsv
    file-like object

    header_info may be either a column name and the type will be inferred
    from the data or it may be a tuple pair whose first element is the
    column names and second element is the column type or it may be an
    OrderedDict

    data is a list of values
    """
    if isinstance(header_info, str):
        # bare column name: infer the column type from the first value
        header_info = header_info_types_from_row((header_info,), [data[0]])
    elif not isinstance(header_info, OrderedDict):
        # (name, type) pair: wrap it into a one-column OrderedDict
        header_info = OrderedDict((header_info,))
    raw_header = dump_header(header_info)
    outfile.write(raw_header)
    outfile.write('\n')
    for value in data:
        raw_row = dump_line(header_info, (value,))
        outfile.write(raw_row)
        outfile.write('\n')


def header_info_types_from_row(names, row):
    '''Build an OrderedDict header from column names plus a sample row,
    mapping each Python value's type to its typedtsv type name.'''
    header_info = OrderedDict()
    # '_' is the fallback typedtsv type for unrecognized Python types
    default_type = PYTHON2TYPEDTSV['_']
    for name, col in zip(names, row):
        header_info[name] = PYTHON2TYPEDTSV.get(type(col), default_type)
    return header_info


def dump_header(header_info):
    '''Render an OrderedDict of {name: type} as a tab-separated
    "name:type" header line.'''
    cols = []
    for col_name, col_type in header_info.items():
        cols.append('%s:%s' % (col_name, col_type))
    return '\t'.join(cols)


def dump_line(header_info, row):
    '''Render one row of values as a tab-separated line; None becomes
    the token 'null'.'''
    ordered_keys = tuple(header_info.keys())
    raw_cols = []
    for i, col in enumerate(row):
        if col is None:  # FIX: was `col == None`
            raw_col = 'null'
        else:
            raw_col = COL_SERIALIZERS[header_info[ordered_keys[i]]](col)
        raw_cols.append(raw_col)
    return '\t'.join(raw_cols)


def parse_str(raw_str):
    '''Decode a serialized str cell: '\\null' is the literal string 'null';
    otherwise unescape \\t, \\n, \\#, and \\\\.'''
    if raw_str == '\\null':
        return 'null'
    else:
        return SUB_DECODE_RE.sub(_sub_decode, raw_str)


def dump_str(python_str):
    '''Encode a str cell: the literal string 'null' becomes '\\null' so it
    is not confused with a null value; tab, newline, backslash, and '#'
    are backslash-escaped.'''
    if python_str == 'null':
        return '\\null'
    else:
        return SUB_ENCODE_RE.sub(_sub_encode, python_str)


SUB_ENCODE_RE = re.compile(r'\t|\n|\\|#')
SUB_DECODE_RE = re.compile(r'\\t|\\n|\\#|\\\\')

SUB_ENCODE = {
    '\t': '\\t',
    '\n': '\\n',
    '\\': '\\\\',
    '#': '\\#',
}

SUB_DECODE = {
    '\\\\': '\\',
    '\\t': '\t',
    '\\n': '\n',
    '\\#': '#',
}


def _sub_encode(matchobj):
    # replace one matched special character with its escape sequence
    return SUB_ENCODE[matchobj.group(0)]


def _sub_decode(matchobj):
    # replace one matched escape sequence with its literal character
    return SUB_DECODE[matchobj.group(0)]


# date and time separator can be either 'T' or ' '
# time will be assumed to be 00:00:00.000 if not specified
# when time is specified, seconds and milliseconds are optional (and will be
# assumed to be 00.000)
# utc will be assumed if timezone not specified
# NOTE(review): the DATETIME_RE definition was truncated in the recovered
# archive dump (the regex's named-group markers were also stripped by the
# mangling); it continues past this chunk and is omitted here rather than
# guessed at -- restore it from the package's upstream source.