# ---- gtfstk/__init__.py ----
from .constants import *
from .helpers import *
from .calendar import *
from .routes import *
from .shapes import *
from .stops import *
from .stop_times import *
from .trips import *
from .miscellany import *
from .cleaners import *
from .validators import *
from .feed import *


__version__ = "9.4.0"


# ---- gtfstk/calendar.py ----
"""
Functions about calendar and calendar_dates.
"""
import dateutil.relativedelta as rd
from typing import List, TYPE_CHECKING

from . import helpers as hp

# Help mypy but avoid circular imports
if TYPE_CHECKING:
    from .feed import Feed


def get_dates(feed: "Feed", *, as_date_obj: bool = False) -> List[str]:
    """
    Return a list of dates for which the given "Feed" is valid, which
    could be the empty list if the "Feed" has no calendar information.

    Parameters
    ----------
    feed : "Feed"
    as_date_obj : boolean
        If ``True``, then return the dates as ``datetime.date`` objects;
        otherwise return them as strings

    Returns
    -------
    list
        Dates

    """
    dates = []
    if feed.calendar is not None and not feed.calendar.empty:
        if "start_date" in feed.calendar.columns:
            dates.append(feed.calendar["start_date"].min())
        if "end_date" in feed.calendar.columns:
            dates.append(feed.calendar["end_date"].max())
    if feed.calendar_dates is not None and not feed.calendar_dates.empty:
        if "date" in feed.calendar_dates.columns:
            start = feed.calendar_dates["date"].min()
            end = feed.calendar_dates["date"].max()
            dates.extend([start, end])

    if not dates:
        return []

    start_date, end_date = min(dates), max(dates)
    start_date, end_date = map(hp.datestr_to_date, [start_date, end_date])
    num_days = (end_date - start_date).days
    result = [
        start_date + rd.relativedelta(days=+d) for d in range(num_days + 1)
    ]

    # Convert dates back to strings if required
    if not as_date_obj:
        result = [hp.datestr_to_date(x, inverse=True) for x in result]

    return result


def get_first_week(feed: "Feed", *, as_date_obj: bool = False) -> List[str]:
    """
    Return a list of dates corresponding to the first Monday--Sunday
    week for which this feed is valid.
    If the given feed does not cover a full Monday--Sunday week, then
    return whatever initial segment of the week it does cover, which
    could be the empty list.

    Parameters
    ----------
    feed : "Feed"
    as_date_obj : boolean
        If ``True``, then return the dates as ``datetime.date`` objects;
        otherwise return them as strings

    Returns
    -------
    list
        Dates

    """
    dates = feed.get_dates(as_date_obj=True)
    if not dates:
        return []

    # Get first Monday
    monday_index = None
    for (i, date) in enumerate(dates):
        if date.weekday() == 0:
            monday_index = i
            break

    if monday_index is None:
        return []

    result = []
    for j in range(7):
        try:
            result.append(dates[monday_index + j])
        except:
            break

    # Convert to date strings if requested
    if not as_date_obj:
        result = [hp.datestr_to_date(x, inverse=True) for x in result]

    return result


def restrict_dates(feed: "Feed", dates: List[str]) -> List[str]:
    """
    Given a "Feed" and a date (YYYYMMDD string) or list of dates,
    coerce the date/dates into a list and drop the dates not in
    ``feed.get_dates()``, preserving the original order of ``dates``.
    Intended as a helper function.
    """
    # Coerce string to list
    if isinstance(dates, str):
        dates = [dates]

    # Restrict
    return [d for d in dates if d in feed.get_dates()]


# ---- gtfstk/cleaners.py ----
"""
Functions about cleaning feeds.
"""
import math
from typing import TYPE_CHECKING

import pandas as pd
from pandas import DataFrame

from . import constants as cs
# Help mypy but avoid circular imports
if TYPE_CHECKING:
    from .feed import Feed


def clean_column_names(df: DataFrame) -> DataFrame:
    """
    Strip the whitespace from all column names in the given DataFrame
    and return the result.
    """
    f = df.copy()
    f.columns = [col.strip() for col in f.columns]
    return f


def drop_zombies(feed: "Feed") -> "Feed":
    """
    In the given "Feed", drop stops with no stop times, trips with no
    stop times, shapes with no trips, routes with no trips, and
    services with no trips, in that order.
    Return the resulting "Feed".
    """
    feed = feed.copy()

    # Drop stops of location type 0 that lack stop times
    ids = feed.stop_times["stop_id"].unique()
    f = feed.stops
    cond = f["stop_id"].isin(ids)
    if "location_type" in f.columns:
        cond |= f["location_type"] != 0
    feed.stops = f[cond].copy()

    # Drop trips with no stop times
    ids = feed.stop_times["trip_id"].unique()
    f = feed.trips
    feed.trips = f[f["trip_id"].isin(ids)]

    # Drop shapes with no trips
    ids = feed.trips["shape_id"].unique()
    f = feed.shapes
    if f is not None:
        feed.shapes = f[f["shape_id"].isin(ids)]

    # Drop routes with no trips
    ids = feed.trips["route_id"].unique()
    f = feed.routes
    feed.routes = f[f["route_id"].isin(ids)]

    # Drop services with no trips
    ids = feed.trips["service_id"].unique()
    if feed.calendar is not None:
        f = feed.calendar
        feed.calendar = f[f["service_id"].isin(ids)]
    if feed.calendar_dates is not None:
        f = feed.calendar_dates
        feed.calendar_dates = f[f["service_id"].isin(ids)]

    return feed


def clean_ids(feed: "Feed") -> "Feed":
    """
    In the given "Feed", strip whitespace from all string IDs and then
    replace every remaining whitespace chunk with an underscore.
    Return the resulting "Feed".
    """
    # Alter feed inputs only, and build a new feed from them.
    # The derived feed attributes, such as feed.trips_i,
    # will be automatically handled when creating the new feed.
    feed = feed.copy()

    for table in cs.GTFS_REF["table"].unique():
        f = getattr(feed, table)
        if f is None:
            continue
        for column in cs.GTFS_REF.loc[cs.GTFS_REF["table"] == table, "column"]:
            if column in f.columns and column.endswith("_id"):
                try:
                    f[column] = f[column].str.strip().str.replace(r"\s+", "_")
                    setattr(feed, table, f)
                except AttributeError:
                    # Column is not of string type
                    continue

    return feed


def clean_times(feed: "Feed") -> "Feed":
    """
    In the given "Feed", convert H:MM:SS time strings to HH:MM:SS time
    strings to make sorting by time work as expected.
    Return the resulting "Feed".
    """

    def reformat(t):
        if pd.isnull(t):
            return t
        t = t.strip()
        if len(t) == 7:
            t = "0" + t
        return t

    feed = feed.copy()
    tables_and_columns = [
        ("stop_times", ["arrival_time", "departure_time"]),
        ("frequencies", ["start_time", "end_time"]),
    ]
    for table, columns in tables_and_columns:
        f = getattr(feed, table)
        if f is not None:
            f[columns] = f[columns].applymap(reformat)
            setattr(feed, table, f)

    return feed


def clean_route_short_names(feed: "Feed") -> "Feed":
    """
    In ``feed.routes``, assign 'n/a' to missing route short names and
    strip whitespace from route short names.
    Then disambiguate each route short name that is duplicated by
    appending '-' and its route ID.
    Return the resulting "Feed".
""" feed = feed.copy() r = feed.routes if r is None: return feed # Fill NaNs and strip whitespace r["route_short_name"] = r["route_short_name"].fillna("n/a").str.strip() # Disambiguate def disambiguate(row): rsn, rid = row return rsn + "-" + rid r["dup"] = r["route_short_name"].duplicated(keep=False) r.loc[r["dup"], "route_short_name"] = r.loc[ r["dup"], ["route_short_name", "route_id"] ].apply(disambiguate, axis=1) del r["dup"] feed.routes = r return feed def aggregate_routes( feed: "Feed", by: str = "route_short_name", route_id_prefix: str = "route_" ) -> "Feed": """ Aggregate routes by route short name, say, and assign new route IDs. Parameters ---------- feed : "Feed" by : string A column of ``feed.routes`` route_id_prefix : string Prefix to use when creating new route IDs Returns ------- "Feed" The result is built from the given "Feed" as follows. Group ``feed.routes`` by the ``by`` column, and for each group 1. Choose the first route in the group 2. Assign a new route ID based on the given ``route_id_prefix`` string and a running count, e.g. ``'route_013'`` 3. Assign all the trips associated with routes in the group to that first route 4. Update the route IDs in the other "Feed" tables """ if by not in feed.routes.columns: raise ValueError(f"Column {by} not in feed.routes") feed = feed.copy() # Create new route IDs routes = feed.routes n = routes.groupby(by).ngroups k = int(math.log10(n)) + 1 # Number of digits for padding IDs nrid_by_orid = dict() i = 1 for col, group in routes.groupby(by): nrid = f"route_{i:0{k}d}" d = {orid: nrid for orid in group["route_id"].values} nrid_by_orid.update(d) i += 1 routes["route_id"] = routes["route_id"].map(lambda x: nrid_by_orid[x]) routes = routes.groupby(by).first().reset_index() feed.routes = routes # Update route IDs of trips trips = feed.trips trips["route_id"] = trips["route_id"].map(lambda x: nrid_by_orid[x]) feed.trips = trips # Update route IDs of transfers if feed.transfers is not None: transfers = feed.transfers transfers["route_id"] = transfers["route_id"].map( lambda x: nrid_by_orid[x] ) feed.transfers = transfers return feed def clean(feed: "Feed") -> "Feed": """ Apply #. :func:`drop_zombies` #. :func:`clean_ids` #. :func:`clean_times` #. :func:`clean_route_short_names` to the given "Feed" in that order. Return the resulting "Feed". """ feed = feed.copy() ops = [ "clean_ids", "clean_times", "clean_route_short_names", "drop_zombies", ] for op in ops: feed = globals()[op](feed) return feed def drop_invalid_columns(feed: "Feed") -> "Feed": """ Drop all DataFrame columns of the given "Feed" that are not listed in the GTFS. Return the resulting new "Feed". """ feed = feed.copy() for table, group in cs.GTFS_REF.groupby("table"): f = getattr(feed, table) if f is None: continue valid_columns = group["column"].values for col in f.columns: if col not in valid_columns: print(f"{table}: dropping invalid column {col}") del f[col] setattr(feed, table, f) return feed PK!G˸gtfstk/constants.py""" Constants useful across modules. 
""" import pandas as pd # Record some data from the GTFS reference at # https://developers.google.com/transit/gtfs/reference/ columns = ["table", "table_required", "column", "column_required", "dtype"] rows = [ ["agency", True, "agency_id", False, "str"], ["agency", True, "agency_name", True, "str"], ["agency", True, "agency_url", True, "str"], ["agency", True, "agency_timezone", True, "str"], ["agency", True, "agency_lang", False, "str"], ["agency", True, "agency_phone", False, "str"], ["agency", True, "agency_fare_url", False, "str"], ["agency", True, "agency_email", False, "str"], ["calendar", False, "service_id", True, "str"], ["calendar", False, "monday", True, "int"], ["calendar", False, "tuesday", True, "int"], ["calendar", False, "wednesday", True, "int"], ["calendar", False, "thursday", True, "int"], ["calendar", False, "friday", True, "int"], ["calendar", False, "saturday", True, "int"], ["calendar", False, "sunday", True, "int"], ["calendar", False, "start_date", True, "str"], ["calendar", False, "end_date", True, "str"], ["calendar_dates", False, "service_id", True, "str"], ["calendar_dates", False, "date", True, "str"], ["calendar_dates", False, "exception_type", True, "int"], ["fare_attributes", False, "fare_id", True, "str"], ["fare_attributes", False, "price", True, "float"], ["fare_attributes", False, "currency_type", True, "str"], ["fare_attributes", False, "payment_method", True, "int"], ["fare_attributes", False, "transfers", True, "int"], ["fare_attributes", False, "transfer_duration", False, "int"], ["fare_rules", False, "fare_id", True, "str"], ["fare_rules", False, "route_id", False, "str"], ["fare_rules", False, "origin_id", False, "str"], ["fare_rules", False, "destination_id", False, "str"], ["fare_rules", False, "contains_id", False, "str"], ["feed_info", False, "feed_publisher_name", True, "str"], ["feed_info", False, "feed_publisher_url", True, "str"], ["feed_info", False, "feed_lang", True, "str"], ["feed_info", False, "feed_start_date", False, "str"], ["feed_info", False, "feed_end_date", False, "str"], ["feed_info", False, "feed_version", False, "str"], ["frequencies", False, "trip_id", True, "str"], ["frequencies", False, "start_time", True, "str"], ["frequencies", False, "end_time", True, "str"], ["frequencies", False, "headway_secs", True, "int"], ["frequencies", False, "exact_times", False, "int"], ["routes", True, "route_id", True, "str"], ["routes", True, "agency_id", False, "str"], ["routes", True, "route_short_name", True, "str"], ["routes", True, "route_long_name", True, "str"], ["routes", True, "route_desc", False, "str"], ["routes", True, "route_type", True, "int"], ["routes", True, "route_url", False, "str"], ["routes", True, "route_color", False, "str"], ["routes", True, "route_text_color", False, "str"], ["shapes", False, "shape_id", True, "str"], ["shapes", False, "shape_pt_lat", True, "float"], ["shapes", False, "shape_pt_lon", True, "float"], ["shapes", False, "shape_pt_sequence", True, "int"], ["shapes", False, "shape_dist_traveled", False, "float"], ["stops", True, "stop_id", True, "str"], ["stops", True, "stop_code", False, "str"], ["stops", True, "stop_name", True, "str"], ["stops", True, "stop_desc", False, "str"], ["stops", True, "stop_lat", True, "float"], ["stops", True, "stop_lon", True, "float"], ["stops", True, "zone_id", False, "str"], ["stops", True, "stop_url", False, "str"], ["stops", True, "location_type", False, "int"], ["stops", True, "parent_station", False, "str"], ["stops", True, "stop_timezone", False, "str"], 
["stops", True, "wheelchair_boarding", False, "int"], ["stop_times", True, "trip_id", True, "str"], ["stop_times", True, "arrival_time", True, "str"], ["stop_times", True, "departure_time", True, "str"], ["stop_times", True, "stop_id", True, "str"], ["stop_times", True, "stop_sequence", True, "int"], ["stop_times", True, "stop_headsign", False, "str"], ["stop_times", True, "pickup_type", False, "int"], ["stop_times", True, "drop_off_type", False, "int"], ["stop_times", True, "shape_dist_traveled", False, "float"], ["stop_times", True, "timepoint", False, "int"], ["transfers", False, "from_stop_id", True, "str"], ["transfers", False, "to_stop_id", True, "str"], ["transfers", False, "transfer_type", True, "int"], ["transfers", False, "min_transfer_time", False, "int"], ["trips", True, "route_id", True, "str"], ["trips", True, "service_id", True, "str"], ["trips", True, "trip_id", True, "str"], ["trips", True, "trip_headsign", False, "str"], ["trips", True, "trip_short_name", False, "str"], ["trips", True, "direction_id", False, "int"], ["trips", True, "block_id", False, "str"], ["trips", True, "shape_id", False, "str"], ["trips", True, "wheelchair_accessible", False, "int"], ["trips", True, "bikes_allowed", False, "int"], ] GTFS_REF = pd.DataFrame(rows, columns=columns) #: Columns that must be formatted as integers when outputting GTFS INT_COLS = GTFS_REF.loc[GTFS_REF["dtype"] == "int", "column"].values.tolist() #: Columns that must be read as strings by Pandas STR_COLS = GTFS_REF.loc[GTFS_REF["dtype"] == "str", "column"].values.tolist() DTYPE = {col: str for col in STR_COLS} #: Valid distance units DIST_UNITS = ["ft", "mi", "m", "km"] #: Primary feed attributes FEED_ATTRS_1 = [ "agency", "calendar", "calendar_dates", "fare_attributes", "fare_rules", "feed_info", "frequencies", "routes", "shapes", "stops", "stop_times", "trips", "transfers", "dist_units", ] #: Secondary feed attributes; derived from primary ones FEED_ATTRS_2 = ["_trips_i", "_calendar_i", "_calendar_dates_g"] #: FEED_ATTRS = FEED_ATTRS_1 + FEED_ATTRS_2 #: WGS84 coordinate reference system for Geopandas WGS84 = {"init": "epsg:4326"} #: Colorbrewer 8-class Set2 colors COLORS_SET2 = [ "#66c2a5", "#fc8d62", "#8da0cb", "#e78ac3", "#a6d854", "#ffd92f", "#e5c494", "#b3b3b3", ] PK!S88gtfstk/feed.py""" This module defines a Feed class to represent GTFS feeds. There is an instance attribute for every GTFS table (routes, stops, etc.), which stores the table as a Pandas DataFrame, or as ``None`` in case that table is missing. The Feed class also has heaps of methods: a method to compute route stats, a method to compute screen line counts, validations methods, etc. To ease reading, almost all of these methods are defined in other modules and grouped by theme (``routes.py``, ``stops.py``, etc.). These methods, or rather functions that operate on feeds, are then imported within the Feed class. This separation of methods unfortunately messes up slightly the ``Feed`` class documentation generated by Sphinx, introducing an extra leading ``feed`` parameter in the method signatures. Ignore that extra parameter; it refers to the Feed instance, usually called ``self`` and usually hidden automatically by Sphinx. """ from pathlib import Path import tempfile import shutil from copy import deepcopy from collections import OrderedDict import zipfile from typing import Optional import pandas as pd from pandas.core.frame import DataFrame from . import constants as cs from . import helpers as hp from . 
import cleaners as cn class Feed(object): """ An instance of this class represents a not-necessarily-valid GTFS feed, where GTFS tables are stored as DataFrames. Beware that the stop times DataFrame can be big (several gigabytes), so make sure you have enough memory to handle it. Primary instance attributes: - ``dist_units``: a string in :const:`.constants.DIST_UNITS`; specifies the distance units to use when calculating various stats, such as route service distance; should match the implicit distance units of the ``shape_dist_traveled`` column values, if present - ``agency`` - ``stops`` - ``routes`` - ``trips`` - ``stop_times`` - ``calendar`` - ``calendar_dates`` - ``fare_attributes`` - ``fare_rules`` - ``shapes`` - ``frequencies`` - ``transfers`` - ``feed_info`` There are also a few secondary instance attributes that are derived from the primary attributes and are automatically updated when the primary attributes change. However, for this update to work, you must update the primary attributes like this (good):: feed.trips['route_short_name'] = 'bingo' feed.trips = feed.trips and **not** like this (bad):: feed.trips['route_short_name'] = 'bingo' The first way ensures that the altered trips DataFrame is saved as the new ``trips`` attribute, but the second way does not. """ # Import heaps of methods from modules split by functionality; # i learned this trick from # https://groups.google.com/d/msg/comp.lang.python/goLBrqcozNY/DPgyaZ6gAwAJ from .calendar import get_dates, get_first_week, restrict_dates from .routes import ( get_routes, compute_route_stats, build_null_route_time_series, compute_route_time_series, build_route_timetable, route_to_geojson, map_routes, ) from .shapes import ( build_geometry_by_shape, shapes_to_geojson, get_shapes_intersecting_geometry, append_dist_to_shapes, ) from .stops import ( get_stops, build_geometry_by_stop, compute_stop_activity, compute_stop_stats, build_null_stop_time_series, compute_stop_time_series, build_stop_timetable, get_stops_in_polygon, map_stops, ) from .stop_times import ( get_stop_times, append_dist_to_stop_times, get_start_and_end_times, ) from .trips import ( is_active_trip, get_trips, compute_trip_activity, compute_busiest_date, compute_trip_stats, locate_trips, trip_to_geojson, map_trips, ) from .miscellany import ( summarize, describe, assess_quality, convert_dist, compute_feed_stats, compute_feed_time_series, create_shapes, compute_bounds, compute_center, restrict_to_dates, restrict_to_routes, restrict_to_polygon, compute_screen_line_counts, ) from .validators import ( validate, check_agency, check_calendar, check_calendar_dates, check_fare_attributes, check_fare_rules, check_feed_info, check_frequencies, check_routes, check_shapes, check_stops, check_stop_times, check_transfers, check_trips, ) from .cleaners import ( clean_ids, clean_times, clean_route_short_names, drop_zombies, aggregate_routes, clean, drop_invalid_columns, ) def __init__( self, dist_units: str, agency: Optional[DataFrame] = None, stops: Optional[DataFrame] = None, routes: Optional[DataFrame] = None, trips: Optional[DataFrame] = None, stop_times: Optional[DataFrame] = None, calendar: Optional[DataFrame] = None, calendar_dates: Optional[DataFrame] = None, fare_attributes: Optional[DataFrame] = None, fare_rules: Optional[DataFrame] = None, shapes: Optional[DataFrame] = None, frequencies: Optional[DataFrame] = None, transfers: Optional[DataFrame] = None, feed_info: Optional[DataFrame] = None, ): """ Assume that every non-None input is a Pandas DataFrame, except for 
``dist_units`` which should be a string in :const:`.constants.DIST_UNITS`. No other format checking is performed. In particular, a Feed instance need not represent a valid GTFS feed. """ # Set primary attributes from inputs. # The @property magic below will then # validate some and set some derived attributes for prop, val in locals().items(): if prop in cs.FEED_ATTRS_1: setattr(self, prop, val) @property def dist_units(self): """ The distance units of the Feed. """ return self._dist_units @dist_units.setter def dist_units(self, val): if val not in cs.DIST_UNITS: raise ValueError( f"Distance units are required and " f"must lie in {cs.DIST_UNITS}" ) else: self._dist_units = val @property def trips(self): """ The trips table of this Feed. """ return self._trips @trips.setter def trips(self, val): """ Update ``self._trips_i`` if ``self.trips`` changes. """ self._trips = val if val is not None and not val.empty: self._trips_i = self._trips.set_index("trip_id") else: self._trips_i = None @property def calendar(self): """ The calendar table of this Feed. """ return self._calendar @calendar.setter def calendar(self, val): """ Update ``self._calendar_i``if ``self.calendar`` changes. """ self._calendar = val if val is not None and not val.empty: self._calendar_i = self._calendar.set_index("service_id") else: self._calendar_i = None @property def calendar_dates(self): """ The calendar_dates table of this Feed. """ return self._calendar_dates @calendar_dates.setter def calendar_dates(self, val): """ Update ``self._calendar_dates_g`` if ``self.calendar_dates`` changes. """ self._calendar_dates = val if val is not None and not val.empty: self._calendar_dates_g = self._calendar_dates.groupby( ["service_id", "date"] ) else: self._calendar_dates_g = None def __str__(self): """ Print the first five rows of each GTFS table. """ d = OrderedDict() for table in cs.GTFS_REF["table"].unique(): try: d[table] = getattr(self, table).head(5) except: d[table] = None d["dist_units"] = self.dist_units return "\n".join( ["* {k} --------------------\n\t{v}" for k, v in d.items()] ) def __eq__(self, other): """ Define two feeds be equal if and only if their :const:`.constants.FEED_ATTRS` attributes are equal, or almost equal in the case of DataFrames (but not groupby DataFrames). Almost equality is checked via :func:`.helpers.almost_equal`, which canonically sorts DataFrame rows and columns. """ # Return False if failures for key in cs.FEED_ATTRS_1: x = getattr(self, key) y = getattr(other, key) # DataFrame case if isinstance(x, pd.DataFrame): if not isinstance(y, pd.DataFrame) or not hp.almost_equal( x, y ): return False # Other case else: if x != y: return False # No failures return True def copy(self) -> "Feed": """ Return a copy of this feed, that is, a feed with all the same attributes. 
""" other = Feed(dist_units=self.dist_units) for key in set(cs.FEED_ATTRS) - set(["dist_units"]): value = getattr(self, key) if isinstance(value, pd.DataFrame): # Pandas copy DataFrame value = value.copy() elif isinstance(value, pd.core.groupby.DataFrameGroupBy): # Pandas does not have a copy method for groupby objects # as far as i know value = deepcopy(value) setattr(other, key, value) return other # ------------------------------------- # Functions about input and output # ------------------------------------- def list_gtfs(path: Path) -> DataFrame: """ Given a path (string or Path object) to a GTFS zip file or directory, record the file names and file sizes of the contents, and return the result in a DataFrame with the columns: - ``'file_name'`` - ``'file_size'`` """ path = Path(path) if not path.exists(): raise ValueError(f"Path {path} does not exist") # Collect rows of DataFrame rows = [] if path.is_file(): # Zip file with zipfile.ZipFile(str(path)) as src: for x in src.infolist(): if x.filename == "./": continue d = {} d["file_name"] = x.filename d["file_size"] = x.file_size rows.append(d) else: # Directory for x in path.iterdir(): d = {} d["file_name"] = x.name d["file_size"] = x.stat().st_size rows.append(d) return pd.DataFrame(rows) def read_gtfs(path: Path, dist_units: str) -> "Feed": """ Create a Feed instance from the given path and given distance units. The path should be a directory containing GTFS text files or a zip file that unzips as a collection of GTFS text files (and not as a directory containing GTFS text files). The distance units given must lie in :const:`constants.dist_units` Notes ----- - Ignore non-GTFS files - Automatically strip whitespace from the column names in GTFS files """ path = Path(path) if not path.exists(): raise ValueError(f"Path {path} does not exist") # Unzip path to temporary directory if necessary if path.is_file(): zipped = True tmp_dir = tempfile.TemporaryDirectory() src_path = Path(tmp_dir.name) shutil.unpack_archive(str(path), tmp_dir.name, "zip") else: zipped = False src_path = path # Read files into feed dictionary of DataFrames feed_dict = {table: None for table in cs.GTFS_REF["table"]} for p in src_path.iterdir(): table = p.stem # Skip empty files, irrelevant files, and files with no data if p.is_file() and p.stat().st_size and table in feed_dict: # utf-8-sig gets rid of the byte order mark (BOM); # see http://stackoverflow.com/questions/17912307/u-ufeff-in-python-string df = pd.read_csv(p, dtype=cs.DTYPE, encoding="utf-8-sig") if not df.empty: feed_dict[table] = cn.clean_column_names(df) feed_dict["dist_units"] = dist_units # Delete temporary directory if zipped: tmp_dir.cleanup() # Create feed return Feed(**feed_dict) def write_gtfs(feed: "Feed", path: Path, ndigits: int = 6) -> None: """ Export the given feed to the given path. If the path end in '.zip', then write the feed as a zip archive. Otherwise assume the path is a directory, and write the feed as a collection of CSV files to that directory, creating the directory if it does not exist. Round all decimals to ``ndigits`` decimal places. All distances will be the distance units ``feed.dist_units``. 
""" path = Path(path) if path.suffix == ".zip": # Write to temporary directory before zipping zipped = True tmp_dir = tempfile.TemporaryDirectory() new_path = Path(tmp_dir.name) else: zipped = False if not path.exists(): path.mkdir() new_path = path for table in cs.GTFS_REF["table"].unique(): f = getattr(feed, table) if f is None: continue f = f.copy() # Some columns need to be output as integers. # If there are NaNs in any such column, # then Pandas will format the column as float, which we don't want. f_int_cols = set(cs.INT_COLS) & set(f.columns) for s in f_int_cols: f[s] = f[s].fillna(-1).astype(int).astype(str).replace("-1", "") p = new_path / (table + ".txt") f.to_csv(str(p), index=False, float_format=f"%.{ndigits}f") # Zip directory if zipped: basename = str(path.parent / path.stem) shutil.make_archive(basename, format="zip", root_dir=tmp_dir.name) tmp_dir.cleanup() PK!%ca4a4gtfstk/helpers.py""" Functions useful across modules. """ import datetime as dt from typing import Optional, Dict, List, Union, Callable import pandas as pd from pandas import DataFrame import numpy as np from shapely.geometry import LineString, Point from shapely.ops import transform import utm import json2table as j2t from . import constants as cs def datestr_to_date( x: Union[dt.date, str], format_str: str = "%Y%m%d", *, inverse: bool = False, ) -> Union[str, dt.date]: """ Given a string ``x`` representing a date in the given format, convert it to a Datetime Date object and return the result. If ``inverse``, then assume that ``x`` is a date object and return its corresponding string in the given format. """ if x is None: return None if not inverse: result = dt.datetime.strptime(x, format_str).date() else: result = x.strftime(format_str) return result def timestr_to_seconds( x: Union[dt.date, str], *, inverse: bool = False, mod24: bool = False ) -> int: """ Given an HH:MM:SS time string ``x``, return the number of seconds past midnight that it represents. In keeping with GTFS standards, the hours entry may be greater than 23. If ``mod24``, then return the number of seconds modulo ``24*3600``. If ``inverse``, then do the inverse operation. In this case, if ``mod24`` also, then first take the number of seconds modulo ``24*3600``. """ if not inverse: try: hours, mins, seconds = x.split(":") result = int(hours) * 3600 + int(mins) * 60 + int(seconds) if mod24: result %= 24 * 3600 except: result = np.nan else: try: seconds = int(x) if mod24: seconds %= 24 * 3600 hours, remainder = divmod(seconds, 3600) mins, secs = divmod(remainder, 60) result = f"{hours:02d}:{mins:02d}:{secs:02d}" except: result = np.nan return result def timestr_mod24(timestr: str) -> int: """ Given a GTFS HH:MM:SS time string, return a timestring in the same format but with the hours taken modulo 24. """ try: hours, mins, secs = [int(x) for x in timestr.split(":")] hours %= 24 result = f"{hours:02d}:{mins:02d}:{secs:02d}" except: result = None return result def weekday_to_str( weekday: Union[int, str], *, inverse: bool = False ) -> Union[int, str]: """ Given a weekday number (integer in the range 0, 1, ..., 6), return its corresponding weekday name as a lowercase string. Here 0 -> 'monday', 1 -> 'tuesday', and so on. If ``inverse``, then perform the inverse operation. 
""" s = [ "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", ] if not inverse: try: return s[weekday] except: return else: try: return s.index(weekday) except: return def get_segment_length( linestring: LineString, p: Point, q: Optional[Point] = None ) -> float: """ Given a Shapely linestring and two Shapely points, project the points onto the linestring, and return the distance along the linestring between the two points. If ``q is None``, then return the distance from the start of the linestring to the projection of ``p``. The distance is measured in the native coordinates of the linestring. """ # Get projected distances d_p = linestring.project(p) if q is not None: d_q = linestring.project(q) d = abs(d_p - d_q) else: d = d_p return d def get_max_runs(x) -> np.array: """ Given a list of numbers, return a NumPy array of pairs (start index, end index + 1) of the runs of max value. Example:: >>> get_max_runs([7, 1, 2, 7, 7, 1, 2]) array([[0, 1], [3, 5]]) Assume x is not empty. Recipe comes from `Stack Overflow `_. """ # Get 0-1 array where 1 marks the max values of x x = np.array(x) m = np.max(x) y = (x == m) * 1 # Bound y by zeros to detect runs properly bounded = np.hstack(([0], y, [0])) # Get 1 at run starts and -1 at run ends diffs = np.diff(bounded) run_starts = np.where(diffs > 0)[0] run_ends = np.where(diffs < 0)[0] return np.array([run_starts, run_ends]).T # # Get lengths of runs and find index of longest # idx = np.argmax(run_ends - run_starts) # return run_starts[idx], run_ends[idx] def get_peak_indices(times: List, counts: List) -> np.array: """ Given an increasing list of times as seconds past midnight and a list of trip counts at those respective times, return a pair of indices i, j such that times[i] to times[j] is the first longest time period such that for all i <= x < j, counts[x] is the max of counts. Assume times and counts have the same nonzero length. """ max_runs = get_max_runs(counts) def get_duration(a): return times[a[1]] - times[a[0]] index = np.argmax(np.apply_along_axis(get_duration, 1, max_runs)) return max_runs[index] def get_convert_dist( dist_units_in: str, dist_units_out: str ) -> Callable[[float], float]: """ Return a function of the form distance in the units ``dist_units_in`` -> distance in the units ``dist_units_out`` Only supports distance units in :const:`constants.DIST_UNITS`. """ di, do = dist_units_in, dist_units_out DU = cs.DIST_UNITS if not (di in DU and do in DU): raise ValueError(f"Distance units must lie in {DU}") d = { "ft": {"ft": 1, "m": 0.3048, "mi": 1 / 5280, "km": 0.0003048}, "m": {"ft": 1 / 0.3048, "m": 1, "mi": 1 / 1609.344, "km": 1 / 1000}, "mi": {"ft": 5280, "m": 1609.344, "mi": 1, "km": 1.609344}, "km": {"ft": 1 / 0.0003048, "m": 1000, "mi": 1 / 1.609344, "km": 1}, } return lambda x: d[di][do] * x def almost_equal(f: DataFrame, g: DataFrame) -> bool: """ Return ``True`` if and only if the given DataFrames are equal after sorting their columns names, sorting their values, and reseting their indices. """ if f.empty or g.empty: return f.equals(g) else: # Put in canonical order F = ( f.sort_index(axis=1) .sort_values(list(f.columns)) .reset_index(drop=True) ) G = ( g.sort_index(axis=1) .sort_values(list(g.columns)) .reset_index(drop=True) ) return F.equals(G) def is_not_null(df: DataFrame, col_name: str) -> bool: """ Return ``True`` if the given DataFrame has a column of the given name (string), and there exists at least one non-NaN value in that column; return ``False`` otherwise. 
""" if ( isinstance(df, pd.DataFrame) and col_name in df.columns and df[col_name].notnull().any() ): return True else: return False def get_utm_crs(lat: float, lon: float) -> Dict: """ Return a GeoPandas coordinate reference system (CRS) dictionary corresponding to the UTM projection appropriate to the given WGS84 latitude and longitude. """ zone = utm.from_latlon(lat, lon)[2] south = lat < 0 return { "proj": "utm", "zone": zone, "south": south, "ellps": "WGS84", "datum": "WGS84", "units": "m", "no_defs": True, } def linestring_to_utm(linestring: LineString) -> LineString: """ Given a Shapely LineString in WGS84 coordinates, convert it to the appropriate UTM coordinates. If ``inverse``, then do the inverse. """ proj = lambda x, y: utm.from_latlon(y, x)[:2] return transform(proj, linestring) def get_active_trips_df(trip_times: DataFrame) -> DataFrame: """ Count the number of trips in ``trip_times`` that are active at any given time. Parameters ---------- trip_times : DataFrame Contains columns - start_time: start time of the trip in seconds past midnight - end_time: end time of the trip in seconds past midnight Returns ------- Series index is times from midnight when trips start and end, values are number of active trips for that time """ active_trips = ( pd.concat( [ pd.Series(1, trip_times.start_time), # departed add 1 pd.Series(-1, trip_times.end_time), # arrived subtract 1 ] ) .groupby(level=0, sort=True) .sum() .cumsum() .ffill() ) return active_trips def combine_time_series( time_series_dict: Dict, kind: str, *, split_directions: bool = False ) -> DataFrame: """ Combine the many time series DataFrames in the given dictionary into one time series DataFrame with hierarchical columns. Parameters ---------- time_series_dict : dictionary Has the form string -> time series kind : string ``'route'`` or ``'stop'`` split_directions : boolean If ``True``, then assume the original time series contains data separated by trip direction; otherwise, assume not. The separation is indicated by a suffix ``'-0'`` (direction 0) or ``'-1'`` (direction 1) in the route ID or stop ID column values. Returns ------- DataFrame Columns are hierarchical (multi-index). The top level columns are the keys of the dictionary and the second level columns are ``'route_id'`` and ``'direction_id'``, if ``kind == 'route'``, or 'stop_id' and ``'direction_id'``, if ``kind == 'stop'``. If ``split_directions``, then third column is ``'direction_id'``; otherwise, there is no ``'direction_id'`` column. """ if kind not in ["stop", "route"]: raise ValueError("kind must be 'stop' or 'route'") names = ["indicator"] if kind == "stop": names.append("stop_id") else: names.append("route_id") if split_directions: names.append("direction_id") def process_index(k): a, b = k.rsplit("-", 1) return a, int(b) frames = list(time_series_dict.values()) new_frames = [] if split_directions: for f in frames: ft = f.T ft.index = pd.MultiIndex.from_tuples( [process_index(k) for (k, __) in ft.iterrows()] ) new_frames.append(ft.T) else: new_frames = frames result = pd.concat( new_frames, axis=1, keys=list(time_series_dict.keys()), names=names ) return result def downsample(time_series: DataFrame, freq: str) -> DataFrame: """ Downsample the given route, stop, or feed time series, (outputs of :func:`.routes.compute_route_time_series`, :func:`.stops.compute_stop_time_series`, or :func:`.miscellany.compute_feed_time_series`, respectively) to the given Pandas frequency string (e.g. '15Min'). 
Return the given time series unchanged if the given frequency is shorter than the original frequency. """ f = time_series.copy() # Can't downsample to a shorter frequency if f.empty or pd.tseries.frequencies.to_offset(freq) < f.index.freq: return f result = None if "stop_id" in time_series.columns.names: # It's a stops time series result = f.resample(freq).sum() else: # It's a route or feed time series. inds = [ "num_trips", "num_trip_starts", "num_trip_ends", "service_distance", "service_duration", ] frames = [] # Resample num_trips in a custom way that depends on # num_trips and num_trip_ends def agg_num_trips(group): return ( group["num_trips"].iloc[-1] + group["num_trip_ends"].iloc[:-1].sum() ) num_trips = f.groupby(pd.Grouper(freq=freq)).apply(agg_num_trips) frames.append(num_trips) # Resample the rest of the indicators via summing frames.extend([f[ind].resample(freq).agg("sum") for ind in inds[1:]]) g = pd.concat(frames, axis=1, keys=inds) # Calculate speed and add it to f. Can't resample it. speed = g["service_distance"] / g["service_duration"] speed = pd.concat({"service_speed": speed}, axis=1) result = pd.concat([g, speed], axis=1) # Reset column names and sort the hierarchical columns to allow slicing; # see http://pandas.pydata.org/pandas-docs/stable/advanced.html#sorting-a-multiindex result.columns.names = f.columns.names result = result.sort_index(axis=1, sort_remaining=True) return result def make_html(d: Dict) -> str: """ Convert the given dictionary into an HTML table (string) with two columns: keys of dictionary, values of dictionary. """ return j2t.convert( d, table_attributes={"class": "table table-condensed table-hover"} ) PK!D[ llgtfstk/miscellany.py""" Functions about miscellany. """ from collections import OrderedDict import math import copy from typing import List, Optional, Tuple, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np import shapely.geometry as sg from shapely.geometry import Polygon, LineString from . import helpers as hp from . import constants as cs # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed def summarize(feed: "Feed", table: str = None) -> DataFrame: """ Return a DataFrame summarizing all GTFS tables in the given feed or in the given table if specified. Parameters ---------- feed : Feed table : string A GTFS table name, e.g. ``'stop_times'`` Returns ------- DataFrame Columns are - ``'table'``: name of the GTFS table, e.g. ``'stops'`` - ``'column'``: name of a column in the table, e.g. 
``'stop_id'`` - ``'num_values'``: number of values in the column - ``'num_nonnull_values'``: number of nonnull values in the column - ``'num_unique_values'``: number of unique values in the column, excluding null values - ``'min_value'``: minimum value in the column - ``'max_value'``: maximum value in the column Notes ----- - If the table is not in the feed, then return an empty DataFrame - If the table is not valid, raise a ValueError """ gtfs_tables = cs.GTFS_REF.table.unique() if table is not None: if table not in gtfs_tables: raise ValueError(f"{table} is not a GTFS table") else: tables = [table] else: tables = gtfs_tables frames = [] for table in tables: f = getattr(feed, table) if f is None: continue def my_agg(col): d = {} d["column"] = col.name d["num_values"] = col.size d["num_nonnull_values"] = col.count() d["num_unique_values"] = col.nunique() d["min_value"] = col.dropna().min() d["max_value"] = col.dropna().max() return pd.Series(d) g = f.apply(my_agg).T.reset_index(drop=True) g["table"] = table frames.append(g) cols = [ "table", "column", "num_values", "num_nonnull_values", "num_unique_values", "min_value", "max_value", ] if not frames: f = pd.DataFrame() else: f = pd.concat(frames) # Rearrange columns f = f[cols].copy() return f def describe(feed: "Feed", sample_date: Optional[str] = None) -> DataFrame: """ Return a DataFrame of various feed indicators and values, e.g. number of routes. Specialize some those indicators to the given sample date, e.g. number of routes active on the date. Parameters ---------- feed : Feed sample_date : string YYYYMMDD date string specifying the date to compute sample stats; defaults to the first Thursday of the Feed's period Returns ------- DataFrame The columns are - ``'indicator'``: string; name of an indicator, e.g. 'num_routes' - ``'value'``: value of the indicator, e.g. 27 """ from . import calendar as cl d = OrderedDict() dates = cl.get_dates(feed) d["agencies"] = feed.agency["agency_name"].tolist() d["timezone"] = feed.agency["agency_timezone"].iat[0] d["start_date"] = dates[0] d["end_date"] = dates[-1] d["num_routes"] = feed.routes.shape[0] d["num_trips"] = feed.trips.shape[0] d["num_stops"] = feed.stops.shape[0] if feed.shapes is not None: d["num_shapes"] = feed.shapes["shape_id"].nunique() else: d["num_shapes"] = 0 if sample_date is None or sample_date not in feed.get_dates(): sample_date = cl.get_first_week(feed)[3] d["sample_date"] = sample_date d["num_routes_active_on_sample_date"] = feed.get_routes(sample_date).shape[ 0 ] trips = feed.get_trips(sample_date) d["num_trips_active_on_sample_date"] = trips.shape[0] d["num_stops_active_on_sample_date"] = feed.get_stops(sample_date).shape[0] f = pd.DataFrame(list(d.items()), columns=["indicator", "value"]) return f def assess_quality(feed: "Feed") -> DataFrame: """ Return a DataFrame of various feed indicators and values, e.g. number of trips missing shapes. Parameters ---------- feed : Feed Returns ------- DataFrame The columns are - ``'indicator'``: string; name of an indicator, e.g. 'num_routes' - ``'value'``: value of the indicator, e.g. 
27 Notes ----- - An odd function, but useful to see roughly how broken a feed is - Not a GTFS validator """ d = OrderedDict() # Count duplicate route short names r = feed.routes dup = r.duplicated(subset=["route_short_name"]) n = dup[dup].count() d["num_route_short_names_duplicated"] = n d["frac_route_short_names_duplicated"] = n / r.shape[0] # Count stop times missing shape_dist_traveled values st = feed.stop_times.sort_values(["trip_id", "stop_sequence"]) if "shape_dist_traveled" in st.columns: # Count missing distances n = st[st["shape_dist_traveled"].isnull()].shape[0] d["num_stop_time_dists_missing"] = n d["frac_stop_time_dists_missing"] = n / st.shape[0] else: d["num_stop_time_dists_missing"] = st.shape[0] d["frac_stop_time_dists_missing"] = 1 # Count direction_ids missing t = feed.trips if "direction_id" in t.columns: n = t[t["direction_id"].isnull()].shape[0] d["num_direction_ids_missing"] = n d["frac_direction_ids_missing"] = n / t.shape[0] else: d["num_direction_ids_missing"] = t.shape[0] d["frac_direction_ids_missing"] = 1 # Count trips missing shapes if feed.shapes is not None: n = t[t["shape_id"].isnull()].shape[0] else: n = t.shape[0] d["num_trips_missing_shapes"] = n d["frac_trips_missing_shapes"] = n / t.shape[0] # Count missing departure times n = st[st["departure_time"].isnull()].shape[0] d["num_departure_times_missing"] = n d["frac_departure_times_missing"] = n / st.shape[0] # Count missing first departure times missing g = st.groupby("trip_id").first().reset_index() n = g[g["departure_time"].isnull()].shape[0] d["num_first_departure_times_missing"] = n d["frac_first_departure_times_missing"] = n / st.shape[0] # Count missing last departure times g = st.groupby("trip_id").last().reset_index() n = g[g["departure_time"].isnull()].shape[0] d["num_last_departure_times_missing"] = n d["frac_last_departure_times_missing"] = n / st.shape[0] # Opine if ( (d["frac_first_departure_times_missing"] >= 0.1) or (d["frac_last_departure_times_missing"] >= 0.1) or d["frac_trips_missing_shapes"] >= 0.8 ): d["assessment"] = "bad feed" elif ( d["frac_direction_ids_missing"] or d["frac_stop_time_dists_missing"] or d["num_route_short_names_duplicated"] ): d["assessment"] = "probably a fixable feed" else: d["assessment"] = "good feed" f = pd.DataFrame(list(d.items()), columns=["indicator", "value"]) return f def convert_dist(feed: "Feed", new_dist_units: str) -> "Feed": """ Convert the distances recorded in the ``shape_dist_traveled`` columns of the given Feed to the given distance units. New distance units must lie in :const:`.constants.DIST_UNITS`. Return the resulting feed. """ feed = feed.copy() if feed.dist_units == new_dist_units: # Nothing to do return feed old_dist_units = feed.dist_units feed.dist_units = new_dist_units converter = hp.get_convert_dist(old_dist_units, new_dist_units) if hp.is_not_null(feed.stop_times, "shape_dist_traveled"): feed.stop_times["shape_dist_traveled"] = feed.stop_times[ "shape_dist_traveled" ].map(converter) if hp.is_not_null(feed.shapes, "shape_dist_traveled"): feed.shapes["shape_dist_traveled"] = feed.shapes[ "shape_dist_traveled" ].map(converter) return feed def compute_feed_stats( feed: "Feed", trip_stats: DataFrame, dates: List[str] ) -> DataFrame: """ Compute some feed stats for the given dates and trip stats. 
Parameters ---------- feed : Feed trip_stats : DataFrame Trip stats to consider in the format output by :func:`.trips.compute_trip_stats` dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats Returns ------- DataFrame The columns are - ``'date'`` - ``'num_stops'``: number of stops active on the date - ``'num_routes'``: number of routes active on the date - ``'num_trips'``: number of trips that start on the date - ``'num_trip_starts'``: number of trips with nonnull start times on the date - ``'num_trip_ends'``: number of trips with nonnull start times and nonnull end times on the date, ignoring trips that end after 23:59:59 on the date - ``'peak_num_trips'``: maximum number of simultaneous trips in service on the date - ``'peak_start_time'``: start time of first longest period during which the peak number of trips occurs on the date - ``'peak_end_time'``: end time of first longest period during which the peak number of trips occurs on the date - ``'service_distance'``: sum of the service distances for the active routes on the date - ``'service_duration'``: sum of the service durations for the active routes on the date - ``'service_speed'``: service_distance/service_duration on the date Dates with no trip activity will have null stats. Exclude dates that lie outside of the Feed's date range. If all the dates given lie outside of the Feed's date range, then return an empty DataFrame. Notes ----- - The route and trip stats for date d contain stats for trips that start on date d only and ignore trips that start on date d-1 and end on date d - Assume the following feed attributes are not ``None``: * Those used in :func:`.trips.get_trips` * Those used in :func:`.routes.get_routes` * Those used in :func:`.stops.get_stops` """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() ts = trip_stats.copy() activity = feed.compute_trip_activity(dates) stop_times = feed.stop_times.copy() # Convert timestrings to seconds for quicker calculations ts[["start_time", "end_time"]] = ts[["start_time", "end_time"]].applymap( hp.timestr_to_seconds ) # Collect stats for each date, memoizing stats by trip ID sequence # to avoid unnecessary recomputations. 
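# (A hypothetical sketch of one entry of the memo described below, with
# made-up trip IDs, stat values, and dates:
#   ("trip_1", "trip_2"): [{"num_stops": 10, "num_routes": 1, "num_trips": 2, ...},
#                          ["20180101", "20180102"]]
# i.e. two dates that share the same set of active trips get their stats
# computed once and reused.)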
# Store in dictionary of the form # trip ID sequence -> # [stats dictionary, date list that stats apply] stats_and_dates_by_ids = {} cols = [ "num_stops", "num_routes", "num_trips", "num_trip_starts", "num_trip_ends", "peak_num_trips", "peak_start_time", "peak_end_time", "service_distance", "service_duration", "service_speed", ] null_stats = {c: np.nan for c in cols} for date in dates: stats = {} ids = tuple(activity.loc[activity[date] > 0, "trip_id"]) if ids in stats_and_dates_by_ids: # Append date to date list stats_and_dates_by_ids[ids][1].append(date) elif not ids: # Null stats stats_and_dates_by_ids[ids] = [null_stats, [date]] else: # Compute stats f = ts[ts["trip_id"].isin(ids)].copy() stats["num_stops"] = stop_times.loc[ stop_times["trip_id"].isin(ids), "stop_id" ].nunique() stats["num_routes"] = f["route_id"].nunique() stats["num_trips"] = f.shape[0] stats["num_trip_starts"] = f["start_time"].count() stats["num_trip_ends"] = f.loc[ f["end_time"] < 24 * 3600, "end_time" ].count() stats["service_distance"] = f["distance"].sum() stats["service_duration"] = f["duration"].sum() stats["service_speed"] = ( stats["service_distance"] / stats["service_duration"] ) # Compute peak stats, which is the slowest part active_trips = hp.get_active_trips_df( f[["start_time", "end_time"]] ) times, counts = active_trips.index.values, active_trips.values start, end = hp.get_peak_indices(times, counts) stats["peak_num_trips"] = counts[start] stats["peak_start_time"] = times[start] stats["peak_end_time"] = times[end] # Record stats stats_and_dates_by_ids[ids] = [stats, [date]] # Assemble stats into DataFrame rows = [] for stats, dates_ in stats_and_dates_by_ids.values(): for date in dates_: s = copy.copy(stats) s["date"] = date rows.append(s) f = pd.DataFrame(rows).sort_values("date") # Convert seconds back to timestrings times = ["peak_start_time", "peak_end_time"] f[times] = f[times].applymap( lambda t: hp.timestr_to_seconds(t, inverse=True) ) return f def compute_feed_time_series( feed: "Feed", trip_stats: DataFrame, dates: List[str], freq: str = "5Min" ) -> DataFrame: """ Compute some feed stats in time series form for the given dates and trip stats. Parameters ---------- feed : Feed trip_stats : DataFrame Trip stats to consider in the format output by :func:`.trips.compute_trip_stats` dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats freq : string Pandas frequency string specifying the frequency of the resulting time series, e.g. '5Min'; highest frequency allowable is one minute ('Min'). Returns ------- DataFrame A time series with a timestamp index across the given dates sampled at the given frequency. The maximum allowable frequency is 1 minute. The columns are - ``'num_trips'``: number of trips in service during during the time period - ``'num_trip_starts'``: number of trips with starting during the time period - ``'num_trip_ends'``: number of trips ending during the time period, ignoring the trips the end past midnight - ``'service_distance'``: distance traveled during the time period by all trips active during the time period - ``'service_duration'``: duration traveled during the time period by all trips active during the time period - ``'service_speed'``: ``service_distance/service_duration`` Exclude dates that lie outside of the Feed's date range. If all the dates given lie outside of the Feed's date range, then return an empty DataFrame with the specified columns. 
Notes ----- - See the notes for :func:`.routes.compute_route_time_series_base` - If all dates lie outside the Feed's date range, then return an empty DataFrame - Assume the following feed attributes are not ``None``: * Those used in :func:`.routes.compute_route_time_series` """ rts = feed.compute_route_time_series(trip_stats, dates, freq=freq) if rts.empty: return pd.DataFrame() cols = [ "num_trip_starts", "num_trip_ends", "num_trips", "service_distance", "service_duration", "service_speed", ] f = pd.concat( [rts[col].sum(axis=1, min_count=1) for col in cols], axis=1, keys=cols ) f["service_speed"] = f["service_distance"] / f["service_duration"] return f.sort_index(axis=1) def create_shapes(feed: "Feed", *, all_trips: bool = False) -> "Feed": """ Given a feed, create a shape for every trip that is missing a shape ID. Do this by connecting the stops on the trip with straight lines. Return the resulting feed which has updated shapes and trips tables. If ``all_trips``, then create new shapes for all trips by connecting stops, and remove the old shapes. Assume the following feed attributes are not ``None``: - ``feed.stop_times`` - ``feed.trips`` - ``feed.stops`` """ feed = feed.copy() if all_trips: trip_ids = feed.trips["trip_id"] else: trip_ids = feed.trips[feed.trips["shape_id"].isnull()]["trip_id"] # Get stop times for given trips f = feed.stop_times[feed.stop_times["trip_id"].isin(trip_ids)][ ["trip_id", "stop_sequence", "stop_id"] ] f = f.sort_values(["trip_id", "stop_sequence"]) if f.empty: # Nothing to do return feed # Create new shape IDs for given trips. # To do this, collect unique stop sequences, # sort them to impose a canonical order, and # assign shape IDs to them stop_seqs = sorted( set( tuple(group["stop_id"].values) for trip, group in f.groupby("trip_id") ) ) k = int(math.log10(len(stop_seqs))) + 1 # Digits for padding shape IDs shape_by_stop_seq = { seq: f"shape_{i:0{k}d}" for i, seq in enumerate(stop_seqs) } # Assign these new shape IDs to given trips shape_by_trip = { trip: shape_by_stop_seq[tuple(group["stop_id"].values)] for trip, group in f.groupby("trip_id") } trip_cond = feed.trips["trip_id"].isin(trip_ids) feed.trips.loc[trip_cond, "shape_id"] = feed.trips.loc[ trip_cond, "trip_id" ].map(lambda x: shape_by_trip[x]) # Build new shapes for given trips G = [ [shape, i, stop] for stop_seq, shape in shape_by_stop_seq.items() for i, stop in enumerate(stop_seq) ] g = pd.DataFrame(G, columns=["shape_id", "shape_pt_sequence", "stop_id"]) g = g.merge(feed.stops[["stop_id", "stop_lon", "stop_lat"]]).sort_values( ["shape_id", "shape_pt_sequence"] ) g = g.drop(["stop_id"], axis=1) g = g.rename( columns={"stop_lon": "shape_pt_lon", "stop_lat": "shape_pt_lat"} ) if feed.shapes is not None and not all_trips: # Update feed shapes with new shapes feed.shapes = pd.concat([feed.shapes, g]) else: # Create all new shapes feed.shapes = g return feed def compute_bounds(feed: "Feed") -> Tuple: """ Return the tuple (min longitude, min latitude, max longitude, max latitude) where the longitudes and latitude vary across all the Feed's stop coordinates. """ lons, lats = feed.stops["stop_lon"], feed.stops["stop_lat"] return lons.min(), lats.min(), lons.max(), lats.max() def compute_convex_hull(feed: "Feed") -> Polygon: """ Return a Shapely Polygon representing the convex hull formed by the stops of the given Feed. 
""" m = sg.MultiPoint(feed.stops[["stop_lon", "stop_lat"]].values) return m.convex_hull def compute_center( feed: "Feed", num_busiest_stops: Optional[int] = None ) -> Tuple: """ Return the centroid (WGS84 longitude-latitude pair) of the convex hull of the stops of the given Feed. If ``num_busiest_stops`` (integer) is given, then compute the ``num_busiest_stops`` busiest stops in the feed on the first Monday of the feed and return the mean of the longitudes and the mean of the latitudes of these stops, respectively. """ s = feed.stops.copy() if num_busiest_stops is None: hull = compute_convex_hull(feed) lon, lat = list(hull.centroid.coords)[0] else: date = feed.get_first_week()[0] ss = feed.compute_stop_stats([date]).sort_values( "num_trips", ascending=False ) if ss.stop_id.isnull().all(): # No stats, which could happen with a crappy feed. # Fall back to all stops. hull = compute_convex_hull(feed) lon, lat = list(hull.centroid.coords)[0] else: f = ss.head(num_busiest_stops) f = s.merge(f) lon = f["stop_lon"].mean() lat = f["stop_lat"].mean() return lon, lat def restrict_to_dates(feed: "Feed", dates: List[str]) -> "Feed": """ Build a new feed by restricting this one to only the stops, trips, shapes, etc. active on at least one of the given dates (YYYYMMDD strings). Return the resulting feed, which will have empty non-agency tables if no trip is active on any of the given dates. """ # Initialize the new feed as the old feed. # Restrict its DataFrames below. feed = feed.copy() # Get every trip that is active on at least one of the dates try: trip_ids = feed.compute_trip_activity(dates).loc[ lambda x: x[[c for c in x.columns if c != "trip_id"]].sum(axis=1) > 0, "trip_id", ] except KeyError: # No trips trip_ids = [] # Slice trips feed.trips = feed.trips.loc[lambda x: x.trip_id.isin(trip_ids)] # Slice routes feed.routes = feed.routes.loc[ lambda x: x.route_id.isin(feed.trips.route_id) ] # Slice stop times feed.stop_times = feed.stop_times.loc[lambda x: x.trip_id.isin(trip_ids)] # Slice stops stop_ids = feed.stop_times.stop_id.unique() feed.stops = feed.stops.loc[lambda x: x.stop_id.isin(stop_ids)] # Slice calendar service_ids = feed.trips.service_id if feed.calendar is not None: feed.calendar = feed.calendar.loc[ lambda x: x.service_id.isin(service_ids) ] # Get agency for trips if "agency_id" in feed.routes.columns: agency_ids = feed.routes.agency_id if len(agency_ids): feed.agency = feed.agency.loc[ lambda x: x.agency_id.isin(agency_ids) ] # Now for the optional files. # Get calendar dates for trips. if feed.calendar_dates is not None: feed.calendar_dates = feed.calendar_dates.loc[ lambda x: x.service_id.isin(service_ids) ] # Get frequencies for trips if feed.frequencies is not None: feed.frequencies = feed.frequencies.loc[ lambda x: x.trip_id.isin(trip_ids) ] # Get shapes for trips if feed.shapes is not None: shape_ids = feed.trips.shape_id feed.shapes = feed.shapes.loc[lambda x: x.shape_id.isin(shape_ids)] # Get transfers for stops if feed.transfers is not None: feed.transfers = feed.transfers.loc[ lambda x: x.from_stop_id.isin(stop_ids) | x.to_stop_id.isin(stop_ids) ] return feed def restrict_to_routes(feed: "Feed", route_ids: List[str]) -> "Feed": """ Build a new feed by restricting this one to only the stops, trips, shapes, etc. used by the routes with the given list of route IDs. Return the resulting feed. """ # Initialize the new feed as the old feed. # Restrict its DataFrames below. 
feed = feed.copy() # Slice routes feed.routes = feed.routes[feed.routes["route_id"].isin(route_ids)].copy() # Slice trips feed.trips = feed.trips[feed.trips["route_id"].isin(route_ids)].copy() # Slice stop times trip_ids = feed.trips["trip_id"] feed.stop_times = feed.stop_times[ feed.stop_times["trip_id"].isin(trip_ids) ].copy() # Slice stops stop_ids = feed.stop_times["stop_id"].unique() feed.stops = feed.stops[feed.stops["stop_id"].isin(stop_ids)].copy() # Slice calendar service_ids = feed.trips["service_id"] if feed.calendar is not None: feed.calendar = feed.calendar[ feed.calendar["service_id"].isin(service_ids) ].copy() # Get agency for trips if "agency_id" in feed.routes.columns: agency_ids = feed.routes["agency_id"] if len(agency_ids): feed.agency = feed.agency[ feed.agency["agency_id"].isin(agency_ids) ].copy() # Now for the optional files. # Get calendar dates for trips. if feed.calendar_dates is not None: feed.calendar_dates = feed.calendar_dates[ feed.calendar_dates["service_id"].isin(service_ids) ].copy() # Get frequencies for trips if feed.frequencies is not None: feed.frequencies = feed.frequencies[ feed.frequencies["trip_id"].isin(trip_ids) ].copy() # Get shapes for trips if feed.shapes is not None: shape_ids = feed.trips["shape_id"] feed.shapes = feed.shapes[ feed.shapes["shape_id"].isin(shape_ids) ].copy() # Get transfers for stops if feed.transfers is not None: feed.transfers = feed.transfers[ feed.transfers["from_stop_id"].isin(stop_ids) | feed.transfers["to_stop_id"].isin(stop_ids) ].copy() return feed def restrict_to_polygon(feed: "Feed", polygon: Polygon) -> "Feed": """ Build a new feed by restricting this one to only the trips that have at least one stop intersecting the given Shapely polygon, then restricting stops, routes, stop times, etc. to those associated with that subset of trips. Return the resulting feed. Requires GeoPandas. Assume the following feed attributes are not ``None``: - ``feed.stop_times`` - ``feed.trips`` - ``feed.stops`` - ``feed.routes`` - Those used in :func:`.stops.get_stops_in_polygon` """ # Initialize the new feed as the old feed. # Restrict its DataFrames below. feed = feed.copy() # Get IDs of stops within the polygon stop_ids = feed.get_stops_in_polygon(polygon)["stop_id"] # Get all trips that stop at at least one of those stops st = feed.stop_times.copy() trip_ids = st[st["stop_id"].isin(stop_ids)]["trip_id"] feed.trips = feed.trips[feed.trips["trip_id"].isin(trip_ids)].copy() # Get stop times for trips feed.stop_times = st[st["trip_id"].isin(trip_ids)].copy() # Get stops for trips stop_ids = feed.stop_times["stop_id"] feed.stops = feed.stops[feed.stops["stop_id"].isin(stop_ids)].copy() # Get routes for trips route_ids = feed.trips["route_id"] feed.routes = feed.routes[feed.routes["route_id"].isin(route_ids)].copy() # Get calendar for trips service_ids = feed.trips["service_id"] if feed.calendar is not None: feed.calendar = feed.calendar[ feed.calendar["service_id"].isin(service_ids) ].copy() # Get agency for trips if "agency_id" in feed.routes.columns: agency_ids = feed.routes["agency_id"] if len(agency_ids): feed.agency = feed.agency[ feed.agency["agency_id"].isin(agency_ids) ].copy() # Now for the optional files. # Get calendar dates for trips. 
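    # Editor's illustrative sketch (not part of the library): restricting a
    # feed to a bounding box with this function. The box coordinates below are
    # made up; any Shapely Polygon in WGS84 lon-lat coordinates works.
    #
    #     bbox = sg.box(174.70, -36.90, 174.80, -36.80)  # (minx, miny, maxx, maxy)
    #     small_feed = restrict_to_polygon(feed, bbox)
    #
    # Only trips with at least one stop inside the polygon survive, along with
    # the stops, routes, shapes, and calendars those trips reference.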
cd = feed.calendar_dates if cd is not None: feed.calendar_dates = cd[cd["service_id"].isin(service_ids)].copy() # Get frequencies for trips if feed.frequencies is not None: feed.frequencies = feed.frequencies[ feed.frequencies["trip_id"].isin(trip_ids) ].copy() # Get shapes for trips if feed.shapes is not None: shape_ids = feed.trips["shape_id"] feed.shapes = feed.shapes[ feed.shapes["shape_id"].isin(shape_ids) ].copy() # Get transfers for stops if feed.transfers is not None: t = feed.transfers feed.transfers = t[ t["from_stop_id"].isin(stop_ids) | t["to_stop_id"].isin(stop_ids) ].copy() return feed def compute_screen_line_counts( feed: "Feed", linestring: LineString, dates: List[str], geo_shapes=None ) -> DataFrame: """ Find all the Feed trips active on the given dates that intersect the given Shapely LineString (with WGS84 longitude-latitude coordinates). Parameters ---------- feed : Feed linestring : Shapely LineString dates : list YYYYMMDD date strings Returns ------- DataFrame The columns are - ``'date'`` - ``'trip_id'`` - ``'route_id'`` - ``'route_short_name'`` - ``'crossing_time'``: time that the trip's vehicle crosses the linestring; one trip could cross multiple times - ``'orientation'``: 1 or -1; 1 indicates trip travel from the left side to the right side of the screen line; -1 indicates trip travel in the opposite direction Notes ----- - Requires GeoPandas - The first step is to geometrize ``feed.shapes`` via :func:`.shapes.geometrize_shapes`. Alternatively, use the ``geo_shapes`` GeoDataFrame, if given. - Assume ``feed.stop_times`` has an accurate ``shape_dist_traveled`` column. - Assume that trips travel in the same direction as their shapes. That restriction is part of GTFS, by the way. To calculate direction quickly and accurately, assume that the screen line is straight and doesn't double back on itself. - Probably does not give correct results for trips with self-intersecting shapes. - The algorithm works as follows 1. Compute all the shapes that intersect the linestring 2. For each such shape, compute the intersection points 3. For each point p, scan through all the trips in the feed that have that shape 4. For each date in ``dates``, restrict to trips active on the date and interpolate a stop time for p by assuming that the feed has the shape_dist_traveled field in stop times 5. Use that interpolated time as the crossing time of the trip vehicle, and compute the trip orientation to the screen line via a cross product of a vector in the direction of the screen line and a tiny vector in the direction of trip travel - Assume the following feed attributes are not ``None``: * ``feed.shapes``, if ``geo_shapes`` is not given """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() # Get all shapes that intersect the screen line shapes = feed.get_shapes_intersecting_geometry( linestring, geo_shapes, geometrized=True ) # Convert shapes to UTM lat, lon = feed.shapes.loc[0, ["shape_pt_lat", "shape_pt_lon"]].values crs = hp.get_utm_crs(lat, lon) shapes = shapes.to_crs(crs) # Convert linestring to UTM linestring = hp.linestring_to_utm(linestring) # Get all intersection points of shapes and linestring shapes["intersection"] = shapes.intersection(linestring) # Make a vector in the direction of the screen line # to later calculate trip orientation. # Does not work in case of a bent screen line. 
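    # Editor's note (illustrative, not original code): the orientation test
    # used below is the sign of the 2-D cross product det([v, w]), where w is
    # the screen-line direction vector and v is a tiny vector along the trip's
    # shape at the crossing point. A standalone miniature with made-up vectors:
    #
    #     w = np.array([1.0, 0.0])   # screen line pointing east
    #     v = np.array([0.0, 1.0])   # trip heading north at the crossing
    #     det = np.linalg.det(np.array([v, w]))   # -1.0 here
    #     orientation = 1 if det >= 0 else -1     # -> -1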
p1 = sg.Point(linestring.coords[0]) p2 = sg.Point(linestring.coords[-1]) w = np.array([p2.x - p1.x, p2.y - p1.y]) # Build a dictionary from the shapes DataFrame of the form # shape ID -> list of pairs (d, v), one for each intersection point, # where d is the distance of the intersection point along shape, # and v is a tiny vectors from the point in direction of shape. # Assume here that trips travel in the same direction as their shapes. dv_by_shape = {} eps = 1 convert_dist = hp.get_convert_dist("m", feed.dist_units) for __, sid, geom, intersection in shapes.itertuples(): # Get distances along shape of intersection points (in meters) distances = [geom.project(p) for p in intersection] # Build tiny vectors vectors = [] for i, p in enumerate(intersection): q = geom.interpolate(distances[i] + eps) vector = np.array([q.x - p.x, q.y - p.y]) vectors.append(vector) # Convert distances to units used in feed distances = [convert_dist(d) for d in distances] dv_by_shape[sid] = list(zip(distances, vectors)) # Get trips with those shapes t = feed.trips t = t[t["shape_id"].isin(dv_by_shape.keys())].copy() # Merge in route short names and stop times t = t.merge(feed.routes[["route_id", "route_short_name"]]).merge( feed.stop_times ) # Drop NaN departure times and convert to seconds past midnight t = t[t["departure_time"].notnull()].copy() t["departure_time"] = t["departure_time"].map(hp.timestr_to_seconds) # Compile crossings by date a = feed.compute_trip_activity(dates) rows = [] for date in dates: # Slice to trips active on date ids = a.loc[a[date] == 1, "trip_id"] f = t[t["trip_id"].isin(ids)].copy() # For each shape find the trips that cross the screen line # and get crossing times and orientation f = f.sort_values(["trip_id", "stop_sequence"]) for tid, group in f.groupby("trip_id"): sid = group["shape_id"].iat[0] rid = group["route_id"].iat[0] rsn = group["route_short_name"].iat[0] stop_times = group["departure_time"].values stop_distances = group["shape_dist_traveled"].values for d, v in dv_by_shape[sid]: # Interpolate crossing time time = np.interp(d, stop_distances, stop_times) # Compute direction of trip travel relative to # screen line by looking at the sign of the cross # product of tiny shape vector and screen line vector det = np.linalg.det(np.array([v, w])) if det >= 0: orientation = 1 else: orientation = -1 # Update rows rows.append([date, tid, rid, rsn, time, orientation]) # Create DataFrame cols = [ "date", "trip_id", "route_id", "route_short_name", "crossing_time", "orientation", ] g = pd.DataFrame(rows, columns=cols).sort_values(["date", "crossing_time"]) # Convert departure times back to time strings g["crossing_time"] = g["crossing_time"].map( lambda x: hp.timestr_to_seconds(x, inverse=True) ) return g PK!!igtfstk/routes.py""" Functions about routes. """ from collections import OrderedDict from typing import Optional, List, Dict, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np import shapely.geometry as sg import shapely.ops as so from . import constants as cs from . import helpers as hp # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed def compute_route_stats_base( trip_stats_subset: DataFrame, headway_start_time: str = "07:00:00", headway_end_time: str = "19:00:00", *, split_directions: bool = False, ) -> DataFrame: """ Compute stats for the given subset of trips stats. 
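    Example (an illustrative sketch added by the editor; ``trip_stats`` below
    is assumed to be the output of :func:`.trips.compute_trip_stats`)::

        stats = compute_route_stats_base(
            trip_stats,
            headway_start_time="06:00:00",
            headway_end_time="22:00:00",
        )
        stats[["route_id", "num_trips", "mean_headway", "service_speed"]]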
Parameters ---------- trip_stats_subset : DataFrame Subset of the output of :func:`.trips.compute_trip_stats` split_directions : boolean If ``True``, then separate the stats by trip direction (0 or 1); otherwise aggregate trips visiting from both directions headway_start_time : string HH:MM:SS time string indicating the start time for computing headway stats headway_end_time : string HH:MM:SS time string indicating the end time for computing headway stats Returns ------- DataFrame Columns are - ``'route_id'`` - ``'route_short_name'`` - ``'route_type'`` - ``'direction_id'`` - ``'num_trips'``: number of trips on the route in the subset - ``'num_trip_starts'``: number of trips on the route with nonnull start times - ``'num_trip_ends'``: number of trips on the route with nonnull end times that end before 23:59:59 - ``'is_loop'``: 1 if at least one of the trips on the route has its ``is_loop`` field equal to 1; 0 otherwise - ``'is_bidirectional'``: 1 if the route has trips in both directions; 0 otherwise - ``'start_time'``: start time of the earliest trip on the route - ``'end_time'``: end time of latest trip on the route - ``'max_headway'``: maximum of the durations (in minutes) between trip starts on the route between ``headway_start_time`` and ``headway_end_time`` on the given dates - ``'min_headway'``: minimum of the durations (in minutes) mentioned above - ``'mean_headway'``: mean of the durations (in minutes) mentioned above - ``'peak_num_trips'``: maximum number of simultaneous trips in service (for the given direction, or for both directions when ``split_directions==False``) - ``'peak_start_time'``: start time of first longest period during which the peak number of trips occurs - ``'peak_end_time'``: end time of first longest period during which the peak number of trips occurs - ``'service_duration'``: total of the duration of each trip on the route in the given subset of trips; measured in hours - ``'service_distance'``: total of the distance traveled by each trip on the route in the given subset of trips; measured in whatever distance units are present in ``trip_stats_subset``; contains all ``np.nan`` entries if ``feed.shapes is None`` - ``'service_speed'``: service_distance/service_duration; measured in distance units per hour - ``'mean_trip_distance'``: service_distance/num_trips - ``'mean_trip_duration'``: service_duration/num_trips If not ``split_directions``, then remove the direction_id column and compute each route's stats, except for headways, using its trips running in both directions. In this case, (1) compute max headway by taking the max of the max headways in both directions; (2) compute mean headway by taking the weighted mean of the mean headways in both directions. If ``trip_stats_subset`` is empty, return an empty DataFrame. Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ if trip_stats_subset.empty: return pd.DataFrame() # Convert trip start and end times to seconds to ease calculations below f = trip_stats_subset.copy() f[["start_time", "end_time"]] = f[["start_time", "end_time"]].applymap( hp.timestr_to_seconds ) headway_start = hp.timestr_to_seconds(headway_start_time) headway_end = hp.timestr_to_seconds(headway_end_time) def compute_route_stats_split_directions(group): # Take this group of all trips stats for a single route # and compute route-level stats. 
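        # Editor's note (illustrative, not original code): the headway columns
        # described above are successive differences of the sorted trip start
        # times that fall inside the headway window, converted to minutes.
        # A standalone miniature with made-up start times:
        #
        #     starts = np.sort(np.array([7.0, 7.25, 7.75, 8.5]) * 3600)  # seconds
        #     headways = np.diff(starts) / 60    # -> [15., 30., 45.] minutes
        #     max_headway, mean_headway = headways.max(), headways.mean()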
d = OrderedDict() d["route_short_name"] = group["route_short_name"].iat[0] d["route_type"] = group["route_type"].iat[0] d["num_trips"] = group.shape[0] d["num_trip_starts"] = group["start_time"].count() d["num_trip_ends"] = group.loc[ group["end_time"] < 24 * 3600, "end_time" ].count() d["is_loop"] = int(group["is_loop"].any()) d["start_time"] = group["start_time"].min() d["end_time"] = group["end_time"].max() # Compute max and mean headway stimes = group["start_time"].values stimes = sorted( [ stime for stime in stimes if headway_start <= stime <= headway_end ] ) headways = np.diff(stimes) if headways.size: d["max_headway"] = np.max(headways) / 60 # minutes d["min_headway"] = np.min(headways) / 60 # minutes d["mean_headway"] = np.mean(headways) / 60 # minutes else: d["max_headway"] = np.nan d["min_headway"] = np.nan d["mean_headway"] = np.nan # Compute peak num trips active_trips = hp.get_active_trips_df( group[["start_time", "end_time"]] ) times, counts = active_trips.index.values, active_trips.values start, end = hp.get_peak_indices(times, counts) d["peak_num_trips"] = counts[start] d["peak_start_time"] = times[start] d["peak_end_time"] = times[end] d["service_distance"] = group["distance"].sum() d["service_duration"] = group["duration"].sum() return pd.Series(d) def compute_route_stats(group): d = OrderedDict() d["route_short_name"] = group["route_short_name"].iat[0] d["route_type"] = group["route_type"].iat[0] d["num_trips"] = group.shape[0] d["num_trip_starts"] = group["start_time"].count() d["num_trip_ends"] = group.loc[ group["end_time"] < 24 * 3600, "end_time" ].count() d["is_loop"] = int(group["is_loop"].any()) d["is_bidirectional"] = int(group["direction_id"].unique().size > 1) d["start_time"] = group["start_time"].min() d["end_time"] = group["end_time"].max() # Compute headway stats headways = np.array([]) for direction in [0, 1]: stimes = group[group["direction_id"] == direction][ "start_time" ].values stimes = sorted( [ stime for stime in stimes if headway_start <= stime <= headway_end ] ) headways = np.concatenate([headways, np.diff(stimes)]) if headways.size: d["max_headway"] = np.max(headways) / 60 # minutes d["min_headway"] = np.min(headways) / 60 # minutes d["mean_headway"] = np.mean(headways) / 60 # minutes else: d["max_headway"] = np.nan d["min_headway"] = np.nan d["mean_headway"] = np.nan # Compute peak num trips active_trips = hp.get_active_trips_df( group[["start_time", "end_time"]] ) times, counts = active_trips.index.values, active_trips.values start, end = hp.get_peak_indices(times, counts) d["peak_num_trips"] = counts[start] d["peak_start_time"] = times[start] d["peak_end_time"] = times[end] d["service_distance"] = group["distance"].sum() d["service_duration"] = group["duration"].sum() return pd.Series(d) if split_directions: f = f.loc[lambda x: x.direction_id.notnull()].assign( direction_id=lambda x: x.direction_id.astype(int) ) if f.empty: raise ValueError( "At least one trip stats direction ID value " "must be non-NaN." 
) g = ( f.groupby(["route_id", "direction_id"]) .apply(compute_route_stats_split_directions) .reset_index() ) # Add the is_bidirectional column def is_bidirectional(group): d = {} d["is_bidirectional"] = int( group["direction_id"].unique().size > 1 ) return pd.Series(d) gg = g.groupby("route_id").apply(is_bidirectional).reset_index() g = g.merge(gg) else: g = f.groupby("route_id").apply(compute_route_stats).reset_index() # Compute a few more stats g["service_speed"] = g["service_distance"] / g["service_duration"] g["mean_trip_distance"] = g["service_distance"] / g["num_trips"] g["mean_trip_duration"] = g["service_duration"] / g["num_trips"] # Convert route times to time strings g[["start_time", "end_time", "peak_start_time", "peak_end_time"]] = g[ ["start_time", "end_time", "peak_start_time", "peak_end_time"] ].applymap(lambda x: hp.timestr_to_seconds(x, inverse=True)) return g def compute_route_time_series_base( trip_stats_subset: DataFrame, date_label: str = "20010101", freq: str = "5Min", *, split_directions: bool = False, ) -> DataFrame: """ Compute stats in a 24-hour time series form for the given subset of trips. Parameters ---------- trip_stats_subset : DataFrame A subset of the output of :func:`.trips.compute_trip_stats` split_directions : boolean If ``True``, then separate each routes's stats by trip direction; otherwise aggregate trips in both directions freq : Pandas frequency string Specifices the frequency with which to resample the time series; max frequency is one minute ('Min') date_label : string YYYYMMDD date string used as the date in the time series index Returns ------- DataFrame A time series version of the following route stats for each route. - ``num_trips``: number of trips in service on the route at any time within the time bin - ``num_trip_starts``: number of trips that start within the time bin - ``num_trip_ends``: number of trips that end within the time bin, ignoring trips that end past midnight - ``service_distance``: sum of the service duration accrued during the time bin across all trips on the route; measured in hours - ``service_distance``: sum of the service distance accrued during the time bin across all trips on the route; measured in kilometers - ``service_speed``: ``service_distance/service_duration`` for the route The columns are hierarchical (multi-indexed) with - top level: name is ``'indicator'``; values are ``'num_trip_starts'``, ``'num_trip_ends'``, ``'num_trips'``, ``'service_distance'``, ``'service_duration'``, and ``'service_speed'`` - middle level: name is ``'route_id'``; values are the active routes - bottom level: name is ``'direction_id'``; values are 0s and 1s If not ``split_directions``, then don't include the bottom level. The time series has a timestamp index for a 24-hour period sampled at the given frequency. The maximum allowable frequency is 1 minute. If ``trip_stats_subset`` is empty, then return an empty DataFrame with the columns ``'num_trip_starts'``, ``'num_trip_ends'``, ``'num_trips'``, ``'service_distance'``, ``'service_duration'``, and ``'service_speed'``. Notes ----- - The time series is computed at a one-minute frequency, then resampled at the end to the given frequency - Trips that lack start or end times are ignored, so the the aggregate ``num_trips`` across the day could be less than the ``num_trips`` column of :func:`compute_route_stats_base` - All trip departure times are taken modulo 24 hours. 
So routes with trips that end past 23:59:59 will have all their stats wrap around to the early morning of the time series, except for their ``num_trip_ends`` indicator. Trip endings past 23:59:59 not binned so that resampling the ``num_trips`` indicator works efficiently. - Note that the total number of trips for two consecutive time bins t1 < t2 is the sum of the number of trips in bin t2 plus the number of trip endings in bin t1. Thus we can downsample the ``num_trips`` indicator by keeping track of only one extra count, ``num_trip_ends``, and can avoid recording individual trip IDs. - All other indicators are downsampled by summing. - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ if trip_stats_subset.empty: return pd.DataFrame() tss = trip_stats_subset.copy() if split_directions: tss = tss.loc[lambda x: x.direction_id.notnull()].assign( direction_id=lambda x: x.direction_id.astype(int) ) if tss.empty: raise ValueError( "At least one trip stats direction ID value " "must be non-NaN." ) # Alter route IDs to encode direction: # -0 and -1 or -NA tss["route_id"] = ( tss["route_id"] + "-" + tss["direction_id"].map(lambda x: str(int(x))) ) routes = tss["route_id"].unique() # Build a dictionary of time series and then merge them all # at the end. # Assign a uniform generic date for the index date_str = date_label day_start = pd.to_datetime(date_str + " 00:00:00") day_end = pd.to_datetime(date_str + " 23:59:00") rng = pd.period_range(day_start, day_end, freq="Min") indicators = [ "num_trip_starts", "num_trip_ends", "num_trips", "service_duration", "service_distance", ] bins = [i for i in range(24 * 60)] # One bin for each minute num_bins = len(bins) # Bin start and end times def F(x): return (hp.timestr_to_seconds(x) // 60) % (24 * 60) tss[["start_index", "end_index"]] = tss[ ["start_time", "end_time"] ].applymap(F) routes = sorted(set(tss["route_id"].values)) # Bin each trip according to its start and end time and weight series_by_route_by_indicator = { indicator: {route: [0 for i in range(num_bins)] for route in routes} for indicator in indicators } for index, row in tss.iterrows(): route = row["route_id"] start = row["start_index"] end = row["end_index"] distance = row["distance"] if start is None or np.isnan(start) or start == end: continue # Get bins to fill if start <= end: bins_to_fill = bins[start:end] else: bins_to_fill = bins[start:] + bins[:end] # Bin trip # Do num trip starts series_by_route_by_indicator["num_trip_starts"][route][start] += 1 # Don't mark trip ends for trips that run past midnight; # allows for easy resampling of num_trips later if start <= end: series_by_route_by_indicator["num_trip_ends"][route][end] += 1 # Do rest of indicators for indicator in indicators[2:]: if indicator == "num_trips": weight = 1 elif indicator == "service_duration": weight = 1 / 60 else: weight = distance / len(bins_to_fill) for bin in bins_to_fill: series_by_route_by_indicator[indicator][route][bin] += weight # Create one time series per indicator rng = pd.date_range(date_str, periods=24 * 60, freq="Min") series_by_indicator = { indicator: pd.DataFrame( series_by_route_by_indicator[indicator], index=rng ).fillna(0) for indicator in indicators } # Combine all time series into one time series g = hp.combine_time_series( series_by_indicator, kind="route", split_directions=split_directions ) return hp.downsample(g, freq=freq) def get_routes( feed: "Feed", date: Optional[str] = None, time: Optional[str] = None ) -> DataFrame: """ Return a subset of 
``feed.routes`` Parameters ----------- feed : Feed date : string YYYYMMDD date string restricting routes to only those active on the date time : string HH:MM:SS time string, possibly with HH > 23, restricting routes to only those active during the time Returns ------- DataFrame A subset of ``feed.routes`` Notes ----- Assume the following feed attributes are not ``None``: - ``feed.routes`` - Those used in :func:`.trips.get_trips`. """ if date is None: return feed.routes.copy() trips = feed.get_trips(date, time) R = trips["route_id"].unique() return feed.routes[feed.routes["route_id"].isin(R)] def compute_route_stats( feed: "Feed", trip_stats_subset: DataFrame, dates: List[str], headway_start_time: str = "07:00:00", headway_end_time: str = "19:00:00", *, split_directions: bool = False, ) -> DataFrame: """ Compute route stats for all the trips that lie in the given subset of trip stats and that start on the given dates. Parameters ---------- feed : Feed trip_stats_subset : DataFrame Slice of the output of :func:`.trips.compute_trip_stats` dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats split_directions : boolean If ``True``, then separate the stats by trip direction (0 or 1); otherwise aggregate trips visiting from both directions headway_start_time : string HH:MM:SS time string indicating the start time for computing headway stats headway_end_time : string HH:MM:SS time string indicating the end time for computing headway stats Returns ------- DataFrame Columns are - ``'date'`` - the columns listed in :func:``compute_route_stats_base`` Dates with no trip activity will have null stats. Exclude dates that lie outside of the Feed's date range. If all the dates given lie outside of the Feed's date range, then return an empty DataFrame. Notes ----- - The route stats for date d contain stats for trips that start on date d only and ignore trips that start on date d-1 and end on date d - Assume the following feed attributes are not ``None``: * Those used in :func:`.helpers.compute_route_stats_base` - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() ts = trip_stats_subset.copy() activity = feed.compute_trip_activity(dates) # Collect stats for each date, memoizing stats by trip ID sequence # to avoid unnecessary recomputations. 
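    # Editor's illustrative sketch (not part of the library): a typical call
    # chain for this function, assuming the trip-stats helper referenced above
    # is also exposed as a Feed method, as the other helpers here are.
    #
    #     trip_stats = feed.compute_trip_stats()
    #     dates = feed.get_first_week()[:5]
    #     rs = feed.compute_route_stats(trip_stats, dates)
    #     rs.groupby("route_id")["num_trips"].mean()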
# Store in dictionary of the form # trip ID sequence -> # [stats DataFarme, date list that stats apply] stats_and_dates_by_ids = {} cols = [ "route_id", "route_short_name", "route_type", "num_trips", "num_trip_ends", "num_trip_starts", "is_bidirectional", "is_loop", "start_time", "end_time", "max_headway", "min_headway", "mean_headway", "peak_num_trips", "peak_start_time", "peak_end_time", "service_duration", "service_distance", "service_speed", "mean_trip_distance", "mean_trip_duration", ] if split_directions: cols.append("direction_id") null_stats = pd.DataFrame( OrderedDict([(c, np.nan) for c in cols]), index=[0] ) for date in dates: ids = tuple(activity.loc[activity[date] > 0, "trip_id"]) if ids in stats_and_dates_by_ids: # Append date to date list stats_and_dates_by_ids[ids][1].append(date) elif not ids: # Null stats stats_and_dates_by_ids[ids] = [null_stats, [date]] else: # Compute stats t = ts[ts["trip_id"].isin(ids)].copy() stats = compute_route_stats_base( t, split_directions=split_directions, headway_start_time=headway_start_time, headway_end_time=headway_end_time, ) # Remember stats stats_and_dates_by_ids[ids] = [stats, [date]] # Assemble stats into DataFrame frames = [] for stats, dates_ in stats_and_dates_by_ids.values(): for date in dates_: f = stats.copy() f["date"] = date frames.append(f) f = ( pd.concat(frames) .sort_values(["date", "route_id"]) .reset_index(drop=True) ) return f def build_null_route_time_series( feed: "Feed", date_label: str = "20010101", freq: str = "5Min", *, split_directions: bool = False, ) -> DataFrame: """ Return a route time series with the same index and hierarchical columns as output by :func:`compute_route_time_series_base`, but fill it full of null values. """ start = date_label end = pd.to_datetime(date_label + " 23:59:00") rng = pd.date_range(start, end, freq=freq) inds = [ "num_trip_starts", "num_trip_ends", "num_trips", "service_duration", "service_distance", "service_speed", ] rids = feed.routes.route_id if split_directions: product = [inds, rids, [0, 1]] names = ["indicator", "route_id", "direction_id"] else: product = [inds, rids] names = ["indicator", "route_id"] cols = pd.MultiIndex.from_product(product, names=names) return pd.DataFrame([], index=rng, columns=cols).sort_index( axis=1, sort_remaining=True ) def compute_route_time_series( feed: "Feed", trip_stats_subset: DataFrame, dates: List[str], freq: str = "5Min", *, split_directions: bool = False, ) -> DataFrame: """ Compute route stats in time series form for the trips that lie in the trip stats subset and that start on the given dates. Parameters ---------- feed : Feed trip_stats_subset : DataFrame Slice of the output of :func:`.trips.compute_trip_stats` dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats split_directions : boolean If ``True``, then separate each routes's stats by trip direction; otherwise aggregate trips in both directions freq : Pandas frequency string Specifices the frequency with which to resample the time series; max frequency is one minute ('Min') Returns ------- DataFrame Same format as output by :func:`compute_route_time_series_base` but with multiple dates Exclude dates that lie outside of the Feed's date range. If all dates lie outside the Feed's date range, then return an empty DataFrame. 
Notes ----- - See the notes for :func:`compute_route_time_series_base` - Assume the following feed attributes are not ``None``: * Those used in :func:`.trips.get_trips` - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() activity = feed.compute_trip_activity(dates) ts = trip_stats_subset.copy() # Collect stats for each date, memoizing stats by trip ID sequence # to avoid unnecessary re-computations. # Store in dictionary of the form # trip ID sequence -> # [stats DataFarme, date list that stats apply] stats_and_dates_by_ids = {} null_stats = build_null_route_time_series( feed, split_directions=split_directions, freq=freq ) for date in dates: ids = tuple(activity.loc[activity[date] > 0, "trip_id"]) if ids in stats_and_dates_by_ids: # Append date to date list stats_and_dates_by_ids[ids][1].append(date) elif not ids: # Null stats stats_and_dates_by_ids[ids] = [null_stats, [date]] else: # Compute stats t = ts[ts["trip_id"].isin(ids)].copy() stats = compute_route_time_series_base( t, split_directions=split_directions, freq=freq, date_label=date, ) # Remember stats stats_and_dates_by_ids[ids] = [stats, [date]] # Assemble stats into DataFrame frames = [] for stats, dates_ in stats_and_dates_by_ids.values(): for date in dates_: f = stats.copy() # Replace date d = hp.datestr_to_date(date) f.index = f.index.map( lambda t: t.replace(year=d.year, month=d.month, day=d.day) ) frames.append(f) f = pd.concat(frames).sort_index().sort_index(axis=1, sort_remaining=True) if len(dates) > 1: # Insert missing dates and NaNs to complete series index end_datetime = pd.to_datetime(dates[-1] + " 23:59:59") new_index = pd.date_range(dates[0], end_datetime, freq=freq) f = f.reindex(new_index) else: # Set frequency f.index.freq = pd.tseries.frequencies.to_offset(freq) return f def build_route_timetable( feed: "Feed", route_id: str, dates: List[str] ) -> DataFrame: """ Return a timetable for the given route and dates. Parameters ---------- feed : Feed route_id : string ID of a route in ``feed.routes`` dates : string or list A YYYYMMDD date string or list thereof Returns ------- DataFrame The columns are all those in ``feed.trips`` plus those in ``feed.stop_times`` plus ``'date'``, and the trip IDs are restricted to the given route ID. The result is sorted first by date and then by grouping by trip ID and sorting the groups by their first departure time. Skip dates outside of the Feed's dates. If there is no route activity on the given dates, then return an empty DataFrame. Notes ----- Assume the following feed attributes are not ``None``: - ``feed.stop_times`` - Those used in :func:`.trips.get_trips` """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() t = pd.merge(feed.trips, feed.stop_times) t = t[t["route_id"] == route_id].copy() a = feed.compute_trip_activity(dates) frames = [] for date in dates: # Slice to trips active on date ids = a.loc[a[date] == 1, "trip_id"] f = t[t["trip_id"].isin(ids)].copy() f["date"] = date # Groupby trip ID and sort groups by their minimum departure time. # For some reason NaN departure times mess up the transform below. # So temporarily fill NaN departure times as a workaround. 
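    # Editor's illustrative sketch (not part of the library): the route time
    # series returned by compute_route_time_series, defined above, has
    # hierarchical columns (indicator, route_id[, direction_id]), so a single
    # indicator is pulled out by its top-level key. Reuses ``trip_stats`` and
    # ``dates`` from the sketch above.
    #
    #     rts = feed.compute_route_time_series(trip_stats, dates, freq="1H")
    #     num_trips = rts["num_trips"]   # one column per route
    #     num_trips.sum(axis=1)          # total trips in service per hour bin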
f["dt"] = f["departure_time"].fillna(method="ffill") f["min_dt"] = f.groupby("trip_id")["dt"].transform(min) frames.append(f) f = pd.concat(frames) return f.sort_values(["date", "min_dt", "stop_sequence"]).drop( ["min_dt", "dt"], axis=1 ) def route_to_geojson( feed: "Feed", route_id: str, date: Optional[str] = None, *, include_stops: bool = False, ) -> Dict: """ Return a GeoJSON rendering of the route and, optionally, its stops. Parameters ---------- feed : Feed route_id : string ID of a route in ``feed.routes`` date : string YYYYMMDD date string restricting the output to trips active on the date include_stops : boolean If ``True``, then include stop features in the result Returns ------- dictionary A decoded GeoJSON feature collection comprising a LineString features of the distinct shapes of the trips on the route. If ``include_stops``, then include one Point feature for each stop on the route. """ # Get set of unique trip shapes for route shapes = ( feed.get_trips(date=date) .loc[lambda x: x["route_id"] == route_id, "shape_id"] .unique() ) if not shapes.size: return {"type": "FeatureCollection", "features": []} geom_by_shape = feed.build_geometry_by_shape(shape_ids=shapes) # Get route properties route = ( feed.get_routes(date=date) .loc[lambda x: x["route_id"] == route_id] .fillna("n/a") .to_dict(orient="records", into=OrderedDict) )[0] # Build route shape features features = [ { "type": "Feature", "properties": route, "geometry": sg.mapping(sg.LineString(geom)), } for geom in geom_by_shape.values() ] # Build stop features if desired if include_stops: stops = ( feed.get_stops(route_id=route_id) .fillna("n/a") .to_dict(orient="records", into=OrderedDict) ) features.extend( [ { "type": "Feature", "geometry": { "type": "Point", "coordinates": [stop["stop_lon"], stop["stop_lat"]], }, "properties": stop, } for stop in stops ] ) return {"type": "FeatureCollection", "features": features} def map_routes( feed: "Feed", route_ids: List[str], date: Optional[str] = None, color_palette: List[str] = cs.COLORS_SET2, *, include_stops: bool = True, ): """ Return a Folium map showing the given routes and (optionally) their stops. Parameters ---------- feed : Feed route_ids : list IDs of routes in ``feed.routes`` date : string YYYYMMDD date string restricting the output to trips active on the date color_palette : list Palette to use to color the routes. If more routes than colors, then colors will be recycled. include_stops : boolean If ``True``, then include stops in the map Returns ------- dictionary A Folium Map depicting the distinct shapes of the trips on each route. If ``include_stops``, then include the stops for each route. 
Notes ------ - Requires Folium """ import folium as fl # Get routes slice and convert to dictionary routes = ( feed.routes.loc[lambda x: x["route_id"].isin(route_ids)] .fillna("n/a") .to_dict(orient="records") ) # Create route colors n = len(routes) colors = [color_palette[i % len(color_palette)] for i in range(n)] # Initialize map my_map = fl.Map(tiles="cartodbpositron") # Collect route bounding boxes to set map zoom later bboxes = [] # Create a feature group for each route and add it to the map for i, route in enumerate(routes): collection = feed.route_to_geojson( route_id=route["route_id"], date=date, include_stops=include_stops ) group = fl.FeatureGroup(name="Route " + route["route_short_name"]) color = colors[i] for f in collection["features"]: prop = f["properties"] # Add stop if f["geometry"]["type"] == "Point": lon, lat = f["geometry"]["coordinates"] fl.CircleMarker( location=[lat, lon], radius=8, fill=True, color=color, weight=1, popup=fl.Popup(hp.make_html(prop)), ).add_to(group) # Add path else: prop["color"] = color path = fl.GeoJson( f, name=route, style_function=lambda x: { "color": x["properties"]["color"] }, ) path.add_child(fl.Popup(hp.make_html(prop))) path.add_to(group) bboxes.append(sg.box(*sg.shape(f["geometry"]).bounds)) group.add_to(my_map) fl.LayerControl().add_to(my_map) # Fit map to bounds bounds = so.unary_union(bboxes).bounds bounds2 = [bounds[1::-1], bounds[3:1:-1]] # Folium expects this ordering my_map.fit_bounds(bounds2) return my_map PK![gtfstk/shapes.py""" Functions about shapes. """ from typing import Optional, List, Dict, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np import utm import shapely.geometry as sg from . import constants as cs from . import helpers as hp # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed def build_geometry_by_shape( feed: "Feed", shape_ids: Optional[List[str]] = None, *, use_utm: bool = False, ) -> Dict: """ Return a dictionary with structure shape_id -> Shapely LineString of shape. Parameters ---------- feed : Feed shape_ids : list IDs of shapes in ``feed.shapes`` to restrict output to; return all shapes if ``None``. use_utm : boolean If ``True``, then use local UTM coordinates; otherwise, use WGS84 coordinates Returns ------- dictionary Has the structure shape_id -> Shapely LineString of shape. If ``feed.shapes is None``, then return ``None``. Return the empty dictionary if ``feed.shapes is None``. """ if feed.shapes is None: return {} # Note the output for conversion to UTM with the utm package: # >>> u = utm.from_latlon(47.9941214, 7.8509671) # >>> print u # (414278, 5316285, 32, 'T') d = {} shapes = feed.shapes.copy() if shape_ids is not None: shapes = shapes[shapes["shape_id"].isin(shape_ids)] if use_utm: for shape, group in shapes.groupby("shape_id"): lons = group["shape_pt_lon"].values lats = group["shape_pt_lat"].values xys = [ utm.from_latlon(lat, lon)[:2] for lat, lon in zip(lats, lons) ] d[shape] = sg.LineString(xys) else: for shape, group in shapes.groupby("shape_id"): lons = group["shape_pt_lon"].values lats = group["shape_pt_lat"].values lonlats = zip(lons, lats) d[shape] = sg.LineString(lonlats) return d def shapes_to_geojson( feed: "Feed", shape_ids: Optional[List[str]] = None ) -> Dict: """ Return a (decoded) GeoJSON FeatureCollection of LineString features representing ``feed.shapes``. Each feature will have a ``shape_id`` property. The coordinates reference system is the default one for GeoJSON, namely WGS84. 
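    Example (an illustrative sketch added by the editor; assumes ``feed.shapes``
    is not ``None``)::

        import json

        fc = shapes_to_geojson(feed)
        json.dumps(fc)[:40]   # -> '{"type": "FeatureCollection", ...'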
If a list of shape IDs is given, then return only the LineString features corresponding to those shape IDS. Return the empty dictionary if ``feed.shapes is None`` """ geometry_by_shape = feed.build_geometry_by_shape(shape_ids=shape_ids) if geometry_by_shape: fc = { "type": "FeatureCollection", "features": [ { "properties": {"shape_id": shape}, "type": "Feature", "geometry": sg.mapping(linestring), } for shape, linestring in geometry_by_shape.items() ], } else: fc = {} return fc def get_shapes_intersecting_geometry( feed: "Feed", geometry, geo_shapes=None, *, geometrized: bool = False ) -> DataFrame: """ Return the slice of ``feed.shapes`` that contains all shapes that intersect the given Shapely geometry, e.g. a Polygon or LineString. Parameters ---------- feed : Feed geometry : Shapley geometry, e.g. a Polygon Specified in WGS84 coordinates geo_shapes : GeoPandas GeoDataFrame The output of :func:`geometrize_shapes` geometrize : boolean If ``True``, then return the shapes DataFrame as a GeoDataFrame of the form output by :func:`geometrize_shapes` Returns ------- DataFrame or GeoDataFrame Notes ----- - Requires GeoPandas - Specifying ``geo_shapes`` will skip the first step of the algorithm, namely, geometrizing ``feed.shapes`` - Assume the following feed attributes are not ``None``: * ``feed.shapes``, if ``geo_shapes`` is not given """ if geo_shapes is not None: f = geo_shapes.copy() else: f = geometrize_shapes(feed.shapes) cols = f.columns f["hit"] = f["geometry"].intersects(geometry) f = f[f["hit"]][cols] if geometrized: return f else: return ungeometrize_shapes(f) def append_dist_to_shapes(feed: "Feed") -> "Feed": """ Calculate and append the optional ``shape_dist_traveled`` field in ``feed.shapes`` in terms of the distance units ``feed.dist_units``. Return the resulting Feed. Notes ----- - As a benchmark, using this function on `this Portland feed `_ produces a ``shape_dist_traveled`` column that differs by at most 0.016 km in absolute value from of the original values - Assume the following feed attributes are not ``None``: * ``feed.shapes`` """ if feed.shapes is None: raise ValueError( "This function requires the feed to have a shapes.txt file" ) feed = feed.copy() f = feed.shapes m_to_dist = hp.get_convert_dist("m", feed.dist_units) def compute_dist(group): # Compute the distances of the stops along this trip group = group.sort_values("shape_pt_sequence") shape = group["shape_id"].iat[0] if not isinstance(shape, str): group["shape_dist_traveled"] = np.nan return group points = [ sg.Point(utm.from_latlon(lat, lon)[:2]) for lon, lat in group[["shape_pt_lon", "shape_pt_lat"]].values ] p_prev = points[0] d = 0 distances = [0] for p in points[1:]: d += p.distance(p_prev) distances.append(d) p_prev = p group["shape_dist_traveled"] = distances return group g = f.groupby("shape_id", group_keys=False).apply(compute_dist) # Convert from meters g["shape_dist_traveled"] = g["shape_dist_traveled"].map(m_to_dist) feed.shapes = g return feed def geometrize_shapes( shapes: DataFrame, *, use_utm: bool = False ) -> DataFrame: """ Given a GTFS shapes DataFrame, convert it to a GeoPandas GeoDataFrame and return the result. The result has a ``'geometry'`` column of WGS84 LineStrings instead of the columns ``'shape_pt_sequence'``, ``'shape_pt_lon'``, ``'shape_pt_lat'``, and ``'shape_dist_traveled'``. If ``use_utm``, then use local UTM coordinates for the geometries. Notes ------ Requires GeoPandas. 
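    Example (an illustrative sketch added by the editor; requires GeoPandas)::

        geo_shapes = geometrize_shapes(feed.shapes, use_utm=True)
        geo_shapes["length_m"] = geo_shapes.length   # meters, since UTM is projected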
""" import geopandas as gpd f = shapes.copy().sort_values(["shape_id", "shape_pt_sequence"]) def my_agg(group): d = {} d["geometry"] = sg.LineString( group[["shape_pt_lon", "shape_pt_lat"]].values ) return pd.Series(d) g = f.groupby("shape_id").apply(my_agg).reset_index() g = gpd.GeoDataFrame(g, crs=cs.WGS84) if use_utm: lat, lon = f.loc[0, ["shape_pt_lat", "shape_pt_lon"]].values crs = hp.get_utm_crs(lat, lon) g = g.to_crs(crs) return g def ungeometrize_shapes(geo_shapes) -> DataFrame: """ The inverse of :func:`geometrize_shapes`. Produces the columns: - ``'shape_id'`` - ``'shape_pt_sequence'`` - ``'shape_pt_lon'`` - ``'shape_pt_lat'`` If ``geo_shapes`` is in UTM coordinates (has a UTM CRS property), then convert thoes UTM coordinates back to WGS84 coordinates, which is the standard for a GTFS shapes table. """ geo_shapes = geo_shapes.to_crs(cs.WGS84) F = [] for index, row in geo_shapes.iterrows(): F.extend( [ [row["shape_id"], i, x, y] for i, (x, y) in enumerate(row["geometry"].coords) ] ) return pd.DataFrame( F, columns=[ "shape_id", "shape_pt_sequence", "shape_pt_lon", "shape_pt_lat", ], ) PK!R.gtfstk/stop_times.py""" Functions about stop times. """ from typing import Optional, List, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np from . import helpers as hp # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed def get_stop_times(feed: "Feed", date: Optional[str] = None) -> DataFrame: """ Return a subset of ``feed.stop_times``. Parameters ---------- feed : Feed date : string YYYYMMDD date string restricting the output to trips active on the date Returns ------- DataFrame Subset of ``feed.stop_times`` Notes ----- Assume the following feed attributes are not ``None``: - ``feed.stop_times`` - Those used in :func:`.trips.get_trips` """ f = feed.stop_times.copy() if date is None: return f g = feed.get_trips(date) return f[f["trip_id"].isin(g["trip_id"])] def append_dist_to_stop_times(feed: "Feed", trip_stats: DataFrame) -> "Feed": """ Calculate and append the optional ``shape_dist_traveled`` field in ``feed.stop_times`` in terms of the distance units ``feed.dist_units``. Need trip stats in the form output by :func:`.trips.compute_trip_stats` for this. Return the resulting Feed. Notes ----- - Does not always give accurate results, as described below. - The algorithm works as follows. Compute the ``shape_dist_traveled`` field by using Shapely to measure the distance of a stop along its trip linestring. If for a given trip this process produces a non-monotonically increasing, hence incorrect, list of (cumulative) distances, then fall back to estimating the distances as follows. Get the average speed of the trip via ``trip_stats`` and use is to linearly interpolate distances for stop times, assuming that the first stop is at shape_dist_traveled = 0 (the start of the shape) and the last stop is at shape_dist_traveled = the length of the trip (taken from trip_stats and equal to the length of the shape, unless ``trip_stats`` was called with ``get_dist_from_shapes == False``). This fallback method usually kicks in on trips with self-intersecting linestrings. Unfortunately, this fallback method will produce incorrect results when the first stop does not start at the start of its shape (so shape_dist_traveled != 0). This is the case for several trips in `this Portland feed `_, for example. 
- Assume the following feed attributes are not ``None``: * ``feed.stop_times`` * Those used in :func:`.shapes.build_geometry_by_shape` * Those used in :func:`.stops.build_geometry_by_stop` """ feed = feed.copy() geometry_by_shape = feed.build_geometry_by_shape(use_utm=True) geometry_by_stop = feed.build_geometry_by_stop(use_utm=True) # Initialize DataFrame f = pd.merge( feed.stop_times, trip_stats[["trip_id", "shape_id", "distance", "duration"]], ).sort_values(["trip_id", "stop_sequence"]) # Convert departure times to seconds past midnight to ease calculations f["departure_time"] = f["departure_time"].map(hp.timestr_to_seconds) dist_by_stop_by_shape = {shape: {} for shape in geometry_by_shape} m_to_dist = hp.get_convert_dist("m", feed.dist_units) def compute_dist(group): # Compute the distances of the stops along this trip shape = group["shape_id"].iat[0] if not isinstance(shape, str): group["shape_dist_traveled"] = np.nan return group elif np.isnan(group["distance"].iat[0]): group["shape_dist_traveled"] = np.nan return group linestring = geometry_by_shape[shape] distances = [] for stop in group["stop_id"].values: if stop in dist_by_stop_by_shape[shape]: d = dist_by_stop_by_shape[shape][stop] else: d = m_to_dist( hp.get_segment_length(linestring, geometry_by_stop[stop]) ) dist_by_stop_by_shape[shape][stop] = d distances.append(d) s = sorted(distances) D = linestring.length distances_are_reasonable = all([d < D + 100 for d in distances]) if distances_are_reasonable and s == distances: # Good pass elif distances_are_reasonable and s == distances[::-1]: # Reverse. This happens when the direction of a linestring # opposes the direction of the bus trip. distances = distances[::-1] else: # Totally redo using trip length, first and last stop times, # and linear interpolation dt = group["departure_time"] times = dt.values # seconds t0, t1 = times[0], times[-1] d0, d1 = 0, group["distance"].iat[0] # Get indices of nan departure times and # temporarily forward fill them # for the purposes of using np.interp smoothly nan_indices = np.where(dt.isnull())[0] dt.fillna(method="ffill") # Interpolate distances = np.interp(times, [t0, t1], [d0, d1]) # Nullify distances with nan departure times for i in nan_indices: distances[i] = np.nan group["shape_dist_traveled"] = distances return group g = f.groupby("trip_id", group_keys=False).apply(compute_dist) # Convert departure times back to time strings g["departure_time"] = g["departure_time"].map( lambda x: hp.timestr_to_seconds(x, inverse=True) ) g = g.drop(["shape_id", "distance", "duration"], axis=1) feed.stop_times = g return feed def get_start_and_end_times( feed: "Feed", date: Optional[str] = None ) -> List[str]: """ Return the first departure time and last arrival time (HH:MM:SS time strings) listed in ``feed.stop_times``, respectively. Restrict to the given date (YYYYMMDD string) if specified. """ st = feed.get_stop_times(date) return ( st["departure_time"].dropna().min(), st["arrival_time"].dropna().max(), ) PK!*Έssgtfstk/stops.py""" Functions about stops. """ from collections import Counter, OrderedDict from typing import Optional, List, Dict, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np import utm import shapely.geometry as sg from shapely.geometry import Polygon from . import constants as cs from . 
import helpers as hp # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed #: Folium CircleMarker parameters for mapping stops STOP_STYLE = { "radius": 8, "fill": True, "color": cs.COLORS_SET2[1], "weight": 1, "fill_opacity": 0.75, } def compute_stop_stats_base( stop_times_subset: DataFrame, trip_subset: DataFrame, headway_start_time: str = "07:00:00", headway_end_time: str = "19:00:00", *, split_directions: bool = False, ) -> DataFrame: """ Given a subset of a stop times DataFrame and a subset of a trips DataFrame, return a DataFrame that provides summary stats about the stops in the inner join of the two DataFrames. Parameters ---------- stop_times_subset : DataFrame A valid GTFS stop times table trip_subset : DataFrame A valid GTFS trips table split_directions : boolean If ``True``, then separate the stop stats by direction (0 or 1) of the trips visiting the stops; otherwise aggregate trips visiting from both directions headway_start_time : string HH:MM:SS time string indicating the start time for computing headway stats headway_end_time : string HH:MM:SS time string indicating the end time for computing headway stats Returns ------- DataFrame The columns are - stop_id - direction_id: present if and only if ``split_directions`` - num_routes: number of routes visiting stop (in the given direction) - num_trips: number of trips visiting stop (in the givin direction) - max_headway: maximum of the durations (in minutes) between trip departures at the stop between ``headway_start_time`` and ``headway_end_time`` - min_headway: minimum of the durations (in minutes) mentioned above - mean_headway: mean of the durations (in minutes) mentioned above - start_time: earliest departure time of a trip from this stop - end_time: latest departure time of a trip from this stop Notes ----- - If ``trip_subset`` is empty, then return an empty DataFrame. - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present. """ if trip_subset.empty: return pd.DataFrame() f = pd.merge(stop_times_subset, trip_subset) # Convert departure times to seconds to ease headway calculations f["departure_time"] = f["departure_time"].map(hp.timestr_to_seconds) headway_start = hp.timestr_to_seconds(headway_start_time) headway_end = hp.timestr_to_seconds(headway_end_time) # Compute stats for each stop def compute_stop_stats(group): # Operate on the group of all stop times for an individual stop d = OrderedDict() d["num_routes"] = group["route_id"].unique().size d["num_trips"] = group.shape[0] d["start_time"] = group["departure_time"].min() d["end_time"] = group["departure_time"].max() headways = [] dtimes = sorted( [ dtime for dtime in group["departure_time"].values if headway_start <= dtime <= headway_end ] ) headways.extend( [dtimes[i + 1] - dtimes[i] for i in range(len(dtimes) - 1)] ) if headways: d["max_headway"] = np.max(headways) / 60 # minutes d["min_headway"] = np.min(headways) / 60 # minutes d["mean_headway"] = np.mean(headways) / 60 # minutes else: d["max_headway"] = np.nan d["min_headway"] = np.nan d["mean_headway"] = np.nan return pd.Series(d) if split_directions: if "direction_id" not in f.columns: f["direction_id"] = np.nan f = f.loc[lambda x: x.direction_id.notnull()].assign( direction_id=lambda x: x.direction_id.astype(int) ) if f.empty: raise ValueError( "At least one trip direction ID value " "must be non-NaN." 
) g = f.groupby(["stop_id", "direction_id"]) else: g = f.groupby("stop_id") result = g.apply(compute_stop_stats).reset_index() # Convert start and end times to time strings result[["start_time", "end_time"]] = result[ ["start_time", "end_time"] ].applymap(lambda x: hp.timestr_to_seconds(x, inverse=True)) return result def compute_stop_time_series_base( stop_times_subset: DataFrame, trip_subset: DataFrame, freq: str = "5Min", date_label: str = "20010101", *, split_directions: bool = False, ) -> DataFrame: """ Given a subset of a stop times DataFrame and a subset of a trips DataFrame, return a DataFrame that provides a summary time series about the stops in the inner join of the two DataFrames. Parameters ---------- stop_times_subset : DataFrame A valid GTFS stop times table trip_subset : DataFrame A valid GTFS trips table split_directions : boolean If ``True``, then separate each stop's stats by trip direction; otherwise aggregate trips visiting from both directions freq : Pandas frequency string Specifices the frequency with which to resample the time series; max frequency is one minute ('Min') date_label : string YYYYMMDD date string used as the date in the time series index Returns ------- DataFrame A time series with a timestamp index for a 24-hour period sampled at the given frequency. The only indicator variable for each stop is - ``num_trips``: the number of trips that visit the stop and have a nonnull departure time from the stop The maximum allowable frequency is 1 minute. The columns are hierarchical (multi-indexed) with - top level: name = 'indicator', values = ['num_trips'] - middle level: name = 'stop_id', values = the active stop IDs - bottom level: name = 'direction_id', values = 0s and 1s If not ``split_directions``, then don't include the bottom level. Notes ----- - The time series is computed at a one-minute frequency, then resampled at the end to the given frequency - Stop times with null departure times are ignored, so the aggregate of ``num_trips`` across the day could be less than the ``num_trips`` column in :func:`compute_stop_stats_base` - All trip departure times are taken modulo 24 hours, so routes with trips that end past 23:59:59 will have all their stats wrap around to the early morning of the time series. - 'num_trips' should be resampled with ``how=np.sum`` - If ``trip_subset`` is empty, then return an empty DataFrame - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ if trip_subset.empty: return pd.DataFrame() f = pd.merge(stop_times_subset, trip_subset) if split_directions: if "direction_id" not in f.columns: f["direction_id"] = np.nan f = f.loc[lambda x: x.direction_id.notnull()].assign( direction_id=lambda x: x.direction_id.astype(int) ) if f.empty: raise ValueError( "At least one trip direction ID value " "must be non-NaN." 
) # Alter stop IDs to encode trip direction: # -0 and -1 f["stop_id"] = f["stop_id"] + "-" + f["direction_id"].map(str) stops = f["stop_id"].unique() # Bin each stop departure time bins = [i for i in range(24 * 60)] # One bin for each minute num_bins = len(bins) def F(x): return (hp.timestr_to_seconds(x) // 60) % (24 * 60) f["departure_index"] = f["departure_time"].map(F) # Create one time series for each stop series_by_stop = {stop: [0 for i in range(num_bins)] for stop in stops} for stop, group in f.groupby("stop_id"): counts = Counter((bin, 0) for bin in bins) + Counter( group["departure_index"].values ) series_by_stop[stop] = [counts[bin] for bin in bins] # Combine lists into dictionary of form indicator -> time series. # Only one indicator in this case, but could add more # in the future as was done with route time series. rng = pd.date_range(date_label, periods=24 * 60, freq="Min") series_by_indicator = { "num_trips": pd.DataFrame(series_by_stop, index=rng).fillna(0) } # Combine all time series into one time series g = hp.combine_time_series( series_by_indicator, kind="stop", split_directions=split_directions ) return hp.downsample(g, freq=freq) def get_stops( feed: "Feed", date: Optional[str] = None, trip_id: Optional[str] = None, route_id: Optional[str] = None, *, in_stations: bool = False, ) -> DataFrame: """ Return a section of ``feed.stops``. Parameters ----------- feed : Feed date : string YYYYMMDD string; restricts the output to stops active (visited by trips) on the date trip_id : string ID of a trip in ``feed.trips``; restricts output to stops visited by the trip route_id : string ID of route in ``feed.routes``; restricts output to stops visited by the route in_stations : boolean If ``True``, then restricts output to stops in stations if station data is available in ``feed.stops`` Returns ------- DataFrame A subset of ``feed.stops`` defined by the parameters above Notes ----- Assume the following feed attributes are not ``None``: - ``feed.stops`` - Those used in :func:`.stop_times.get_stop_times` """ s = feed.stops.copy() if date is not None: A = feed.get_stop_times(date)["stop_id"] s = s[s["stop_id"].isin(A)].copy() if trip_id is not None: st = feed.stop_times.copy() B = st[st["trip_id"] == trip_id]["stop_id"] s = s[s["stop_id"].isin(B)].copy() elif route_id is not None: A = feed.trips[feed.trips["route_id"] == route_id]["trip_id"] st = feed.stop_times.copy() B = st[st["trip_id"].isin(A)]["stop_id"] s = s[s["stop_id"].isin(B)].copy() if in_stations and set(["location_type", "parent_station"]) <= set( s.columns ): s = s[(s["location_type"] != 1) & (s["parent_station"].notnull())] return s def build_geometry_by_stop( feed: "Feed", stop_ids: Optional[List[str]] = None, *, use_utm: bool = False, ) -> Dict: """ Return a dictionary with the structure stop_id -> Shapely Point with coordinates of the stop. 
Parameters ---------- feed : Feed use_utm : boolean If ``True``, then return each point in UTM coordinates appropriate to the region; otherwise use the default WGS84 coordinates stop_ids : list Stop IDs (strings) from ``feed.stops`` to restrict output to Returns ------- dictionary Each key is a stop ID and each value is a Shapely Point with coordinates of the stop Notes ----- Assume the following feed attributes are not ``None``: - ``feed.stops`` """ d = {} stops = feed.stops.copy() if stop_ids is not None: stops = stops[stops["stop_id"].isin(stop_ids)] if use_utm: for stop, group in stops.groupby("stop_id"): lat, lon = group[["stop_lat", "stop_lon"]].values[0] d[stop] = sg.Point(utm.from_latlon(lat, lon)[:2]) else: for stop, group in stops.groupby("stop_id"): lat, lon = group[["stop_lat", "stop_lon"]].values[0] d[stop] = sg.Point([lon, lat]) return d def compute_stop_activity(feed: "Feed", dates: List[str]) -> DataFrame: """ Mark stops as active or inactive on the given dates. A stop is *active* on a given date if some trips that starts on the date visits the stop (possibly after midnight). Parameters ---------- feed : Feed dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute activity Returns ------- DataFrame Columns are - stop_id - ``dates[0]``: 1 if the stop has at least one trip visiting it on ``dates[0]``; 0 otherwise - ``dates[1]``: 1 if the stop has at least one trip visiting it on ``dates[1]``; 0 otherwise - etc. - ``dates[-1]``: 1 if the stop has at least one trip visiting it on ``dates[-1]``; 0 otherwise Notes ----- - If all dates lie outside the Feed period, then return an empty DataFrame - Assume the following feed attributes are not ``None``: * ``feed.stop_times`` * Those used in :func:`.trips.compute_trip_activity` """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() trip_activity = feed.compute_trip_activity(dates) g = pd.merge(trip_activity, feed.stop_times).groupby("stop_id") # Pandas won't allow me to simply return g[dates].max().reset_index(). # I get ``TypeError: unorderable types: datetime.date() < str()``. # So here's a workaround. for (i, date) in enumerate(dates): if i == 0: f = g[date].max().reset_index() else: f = f.merge(g[date].max().reset_index()) return f def compute_stop_stats( feed: "Feed", dates: List[str], stop_ids: Optional[List[str]] = None, headway_start_time: str = "07:00:00", headway_end_time: str = "19:00:00", *, split_directions: bool = False, ) -> DataFrame: """ Compute stats for all stops for the given dates. Optionally, restrict to the stop IDs given. 
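# Toy illustration (assumed data, not from a real feed) of the aggregation
# used in ``compute_stop_activity``: a stop is active on a date if any trip
# visiting it is active, i.e. the per-stop maximum of the trip indicators.
def _example_stop_activity():
    trip_activity = pd.DataFrame(
        {"trip_id": ["t1", "t2"], "20200601": [1, 0]}
    )
    stop_times = pd.DataFrame(
        {"trip_id": ["t1", "t2", "t2"], "stop_id": ["s1", "s1", "s2"]}
    )
    g = pd.merge(trip_activity, stop_times).groupby("stop_id")
    return g["20200601"].max().reset_index()  # s1 -> 1, s2 -> 0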
Parameters ---------- feed : Feed dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats stop_ids : list Optional list of stop IDs to restrict stats to headway_start_time : string HH:MM:SS time string indicating the start time for computing headway stats headway_end_time : string HH:MM:SS time string indicating the end time for computing headway stats split_directions : boolean If ``True``, then separate the stop stats by direction (0 or 1) of the trips visiting the stops; otherwise aggregate trips visiting from both directions Returns ------- DataFrame Columns are - ``'date'`` - ``'stop_id'`` - ``'direction_id'``: present if and only if ``split_directions`` - ``'num_routes'``: number of routes visiting the stop (in the given direction) on the date - ``'num_trips'``: number of trips visiting stop (in the givin direction) on the date - ``'max_headway'``: maximum of the durations (in minutes) between trip departures at the stop between ``headway_start_time`` and ``headway_end_time`` on the date - ``'min_headway'``: minimum of the durations (in minutes) mentioned above - ``'mean_headway'``: mean of the durations (in minutes) mentioned above - ``'start_time'``: earliest departure time of a trip from this stop on the date - ``'end_time'``: latest departure time of a trip from this stop on the date Dates with no trip activity will have null stats. Exclude dates that lie outside of the Feed's date range. If all the dates given lie outside of the Feed's date range, then return an empty DataFrame. Notes ----- - Assume the following feed attributes are not ``None``: * ``feed.stop_times`` * Those used in :func:`.trips.get_trips` - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() # Restrict stop times to stop IDs if specified if stop_ids is not None: stop_times_subset = feed.stop_times.loc[ lambda x: x["stop_id"].isin(stop_ids) ].copy() else: stop_times_subset = feed.stop_times.copy() activity = feed.compute_trip_activity(dates) # Collect stats for each date, memoizing stats by trip ID sequence # to avoid unnecessary recomputations. 
# Store in dictionary of the form # trip ID sequence -> # [stats DataFarme, date list that stats apply] stats_and_dates_by_ids = {} cols = [ "stop_id", "num_routes", "num_trips", "max_headway", "min_headway", "mean_headway", "start_time", "end_time", ] if split_directions: cols.append("direction_id") null_stats = pd.DataFrame({c: np.nan for c in cols}, index=[0]) for date in dates: ids = tuple(activity.loc[activity[date] > 0, "trip_id"]) if ids in stats_and_dates_by_ids: # Append date to date list stats_and_dates_by_ids[ids][1].append(date) elif not ids: # Null stats stats_and_dates_by_ids[ids] = [null_stats, [date]] else: # Compute stats t = feed.trips trips = t[t["trip_id"].isin(ids)].copy() stats = compute_stop_stats_base( stop_times_subset, trips, split_directions=split_directions, headway_start_time=headway_start_time, headway_end_time=headway_end_time, ) # Remember stats stats_and_dates_by_ids[ids] = [stats, [date]] # Assemble stats into DataFrame frames = [] for stats, dates_ in stats_and_dates_by_ids.values(): for date in dates_: f = stats.copy() f["date"] = date frames.append(f) f = ( pd.concat(frames) .sort_values(["date", "stop_id"]) .reset_index(drop=True) ) return f def build_null_stop_time_series( feed: "Feed", date_label: str = "20010101", freq: str = "5Min", *, split_directions: bool = False, ) -> DataFrame: """ Return a stop time series with the same index and hierarchical columns as output by :func:`compute_stop_time_series_base`, but fill it full of null values. """ start = date_label end = pd.to_datetime(date_label + " 23:59:00") rng = pd.date_range(start, end, freq=freq) inds = ["num_trips"] sids = feed.stops.stop_id if split_directions: product = [inds, sids, [0, 1]] names = ["indicator", "stop_id", "direction_id"] else: product = [inds, sids] names = ["indicator", "stop_id"] cols = pd.MultiIndex.from_product(product, names=names) return pd.DataFrame([], index=rng, columns=cols).sort_index( axis=1, sort_remaining=True ) def compute_stop_time_series( feed: "Feed", dates: List[str], stop_ids: Optional[List[str]] = None, freq: str = "5Min", *, split_directions: bool = False, ) -> DataFrame: """ Compute time series for the stops on the given dates at the given frequency and return the result as a DataFrame of the same form as output by :func:`.stop_times.compute_stop_time_series_base`. Optionally restrict to stops in the given list of stop IDs. Parameters ---------- feed : Feed dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute stats stop_ids : list Optional list of stop IDs to restrict to split_directions : boolean If ``True``, then separate the stop stats by direction (0 or 1) of the trips visiting the stops; otherwise aggregate trips visiting from both directions freq : Pandas frequency string Specifices the frequency with which to resample the time series; max frequency is one minute ('Min') Returns ------- DataFrame A time series with a timestamp index across the given dates sampled at the given frequency. The maximum allowable frequency is 1 minute. The columns are the same as in :func:`compute_stop_time_series_base`. Exclude dates that lie outside of the Feed's date range. 
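# Sketch of the memoization used here and in ``compute_stop_time_series``:
# dates sharing the same tuple of active trip IDs reuse one computed stats
# table.  ``expensive_stats`` is a hypothetical stand-in for the real
# computation.
def _example_memoize_by_trip_ids(dates, active_ids_by_date, expensive_stats):
    stats_and_dates_by_ids = {}
    for date in dates:
        ids = tuple(active_ids_by_date.get(date, ()))
        if ids in stats_and_dates_by_ids:
            # Reuse the stats already computed for this trip ID sequence
            stats_and_dates_by_ids[ids][1].append(date)
        else:
            stats_and_dates_by_ids[ids] = [expensive_stats(ids), [date]]
    return stats_and_dates_by_ids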
If all dates lie outside the Feed's date range, then return an empty DataFrame Notes ----- - See the notes for :func:`compute_stop_time_series_base` - Assume the following feed attributes are not ``None``: * ``feed.stop_times`` * Those used in :func:`.trips.get_trips` - Raise a ValueError if ``split_directions`` and no non-NaN direction ID values present """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() activity = feed.compute_trip_activity(dates) # Restrict stop times to stop IDs if specified if stop_ids is not None: stop_times_subset = feed.stop_times.loc[ lambda x: x["stop_id"].isin(stop_ids) ].copy() else: stop_times_subset = feed.stop_times.copy() # Collect stats for each date, memoizing stats by trip ID sequence # to avoid unnecessary recomputations. # Store in dictionary of the form # trip ID sequence -> # [stats DataFarme, date list that stats apply] stats_and_dates_by_ids = {} null_stats = build_null_stop_time_series( feed, split_directions=split_directions, freq=freq ) for date in dates: ids = tuple(activity.loc[activity[date] > 0, "trip_id"]) if ids in stats_and_dates_by_ids: # Append date to date list stats_and_dates_by_ids[ids][1].append(date) elif not ids: # Null stats stats_and_dates_by_ids[ids] = [null_stats, [date]] else: # Compute stats t = feed.trips trips = t[t["trip_id"].isin(ids)].copy() stats = compute_stop_time_series_base( stop_times_subset, trips, split_directions=split_directions, freq=freq, date_label=date, ) # Remember stats stats_and_dates_by_ids[ids] = [stats, [date]] # Assemble stats into DataFrame frames = [] for stats, dates_ in stats_and_dates_by_ids.values(): for date in dates_: f = stats.copy() # Replace date d = hp.datestr_to_date(date) f.index = f.index.map( lambda t: t.replace(year=d.year, month=d.month, day=d.day) ) frames.append(f) f = pd.concat(frames).sort_index().sort_index(axis=1, sort_remaining=True) if len(dates) > 1: # Insert missing dates and NaNs to complete series index end_datetime = pd.to_datetime(dates[-1] + " 23:59:59") new_index = pd.date_range(f.index[0], end_datetime, freq=freq) f = f.reindex(new_index) else: # Set frequency f.index.freq = pd.tseries.frequencies.to_offset(freq) return f def build_stop_timetable( feed: "Feed", stop_id: str, dates: List[str] ) -> DataFrame: """ Return a DataFrame containing the timetable for the given stop ID and dates. Parameters ---------- feed : Feed stop_id : string ID of the stop for which to build the timetable dates : string or list A YYYYMMDD date string or list thereof Returns ------- DataFrame The columns are all those in ``feed.trips`` plus those in ``feed.stop_times`` plus ``'date'``, and the stop IDs are restricted to the given stop ID. The result is sorted by date then departure time. 
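# Usage sketch (hypothetical helper): daily trip totals for one stop from
# the multi-indexed series returned by ``compute_stop_time_series``.
# Column levels are (indicator, stop_id) when ``split_directions`` is False;
# ``feed``, ``dates``, and ``stop_id`` are assumed to come from elsewhere.
def _example_daily_trip_counts(feed, dates, stop_id, freq="30Min"):
    ts = compute_stop_time_series(feed, dates, freq=freq)
    if ts.empty:
        return ts
    return ts[("num_trips", stop_id)].resample("D").sum()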
Notes ----- Assume the following feed attributes are not ``None``: - ``feed.trips`` - Those used in :func:`.stop_times.get_stop_times` """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() t = pd.merge(feed.trips, feed.stop_times) t = t[t["stop_id"] == stop_id].copy() a = feed.compute_trip_activity(dates) frames = [] for date in dates: # Slice to stops active on date ids = a.loc[a[date] == 1, "trip_id"] f = t[t["trip_id"].isin(ids)].copy() f["date"] = date frames.append(f) f = pd.concat(frames) return f.sort_values(["date", "departure_time"]) def get_stops_in_polygon( feed: "Feed", polygon: Polygon, geo_stops=None ) -> DataFrame: """ Return the slice of ``feed.stops`` that contains all stops that lie within the given Shapely Polygon object that is specified in WGS84 coordinates. Parameters ---------- feed : Feed polygon : Shapely Polygon Specified in WGS84 coordinates geo_stops : Geopandas GeoDataFrame A geographic version of ``feed.stops`` which will be computed if not given. Specify this parameter in batch jobs to avoid unnecessary computation. Returns ------- DataFrame Subset of ``feed.stops`` Notes ----- - Requires GeoPandas - Assume the following feed attributes are not ``None``: * ``feed.stops``, if ``geo_stops`` is not given """ if geo_stops is not None: f = geo_stops.copy() else: f = geometrize_stops(feed.stops) cols = f.columns f["hit"] = f["geometry"].within(polygon) f = f[f["hit"]][cols] return ungeometrize_stops(f) def geometrize_stops(stops: List[str], *, use_utm: bool = False) -> DataFrame: """ Given a stops DataFrame, convert it to a GeoPandas GeoDataFrame and return the result. Parameters ---------- stops : DataFrame A GTFS stops table use_utm : boolean If ``True``, then convert the output to local UTM coordinates; otherwise use WGS84 coordinates Returns ------- GeoPandas GeoDataFrame Looks like the given stops DataFrame, but has a ``'geometry'`` column of Shapely Point objects that replaces the ``'stop_lon'`` and ``'stop_lat'`` columns. Notes ----- Requires GeoPandas. """ import geopandas as gpd g = ( stops.assign( geometry=lambda x: [ sg.Point(p) for p in x[["stop_lon", "stop_lat"]].values ] ) .drop(["stop_lon", "stop_lat"], axis=1) .pipe(lambda x: gpd.GeoDataFrame(x, crs=cs.WGS84)) ) if use_utm: lat, lon = stops.loc[0, ["stop_lat", "stop_lon"]].values crs = hp.get_utm_crs(lat, lon) g = g.to_crs(crs) return g def ungeometrize_stops(geo_stops: DataFrame) -> DataFrame: """ The inverse of :func:`geometrize_stops`. Parameters ---------- geo_stops : GeoPandas GeoDataFrame Looks like a GTFS stops table, but has a ``'geometry'`` column of Shapely Point objects that replaces the ``'stop_lon'`` and ``'stop_lat'`` columns. Returns ------- DataFrame A GTFS stops table where the ``'stop_lon'`` and ``'stop_lat'`` columns are derived from the points in the given GeoDataFrame and are in WGS84 coordinates regardless of the coordinate reference system of the given GeoDataFrame. """ f = geo_stops.copy().to_crs(cs.WGS84) f["stop_lon"], f["stop_lat"] = zip( *f["geometry"].map(lambda p: [p.x, p.y]) ) del f["geometry"] return f def map_stops( feed: "Feed", stop_ids: List[str], stop_style: Dict = STOP_STYLE ): """ Return a Folium map showing the given stops. Parameters ---------- feed : Feed stop_ids : list IDs of trips in ``feed.stops`` stop_style: dictionary Folium CircleMarker parameters to use for styling stops. Returns ------- dictionary A Folium Map depicting the stops as CircleMarkers. 
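# Self-contained sketch (toy coordinates) of the point-in-polygon test that
# ``get_stops_in_polygon`` performs via GeoPandas; plain Shapely is used
# here to keep the example dependency-light.
def _example_stops_in_polygon():
    stops = pd.DataFrame(
        {"stop_id": ["s1", "s2"], "stop_lon": [0.5, 2.0], "stop_lat": [0.5, 2.0]}
    )
    polygon = sg.box(0, 0, 1, 1)  # WGS84 lon/lat square
    hits = [
        polygon.contains(sg.Point(lon, lat))
        for lon, lat in zip(stops["stop_lon"], stops["stop_lat"])
    ]
    return stops[hits]  # only s1 lies inside the polygon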
Notes ------ - Requires Folium """ import folium as fl # Initialize map my_map = fl.Map(tiles="cartodbpositron") # Create a feature group for the stops and add it to the map group = fl.FeatureGroup(name="Stops") # Add stops to feature group stops = feed.stops.loc[lambda x: x.stop_id.isin(stop_ids)].fillna("n/a") for prop in stops.to_dict(orient="records"): # Add stop lon = prop["stop_lon"] lat = prop["stop_lat"] fl.CircleMarker( location=[lat, lon], popup=fl.Popup(hp.make_html(prop)), **stop_style, ).add_to(group) group.add_to(my_map) # Add layer control fl.LayerControl().add_to(my_map) # Fit map to stop bounds bounds = [ (stops.stop_lat.min(), stops.stop_lon.min()), (stops.stop_lat.max(), stops.stop_lon.max()), ] my_map.fit_bounds(bounds, padding=[1, 1]) return my_map PK!tt |T|Tgtfstk/trips.py""" Functions about trips. """ from collections import OrderedDict import json from typing import Optional, List, Dict, TYPE_CHECKING import pandas as pd from pandas import DataFrame import numpy as np import shapely.geometry as sg import shapely.ops as so from . import constants as cs from . import helpers as hp # Help mypy but avoid circular imports if TYPE_CHECKING: from .feed import Feed def is_active_trip(feed: "Feed", trip_id: str, date: str) -> bool: """ Return ``True`` if the ``feed.calendar`` or ``feed.calendar_dates`` says that the trip runs on the given date; return ``False`` otherwise. Note that a trip that starts on date d, ends after 23:59:59, and does not start again on date d+1 is considered active on date d and not active on date d+1. This subtle point, which is a side effect of the GTFS, can lead to confusion. Parameters ---------- feed : Feed trip_id : string ID of a trip in ``feed.trips`` date : string YYYYMMDD date string Returns ------- boolean ``True`` if and only if the given trip starts on the given date. Notes ----- - This function is key for getting all trips, routes, etc. that are active on a given date, so the function needs to be fast - Assume the following feed attributes are not ``None``: * ``feed.trips`` """ service = feed._trips_i.at[trip_id, "service_id"] # Check feed._calendar_dates_g. caldg = feed._calendar_dates_g if caldg is not None: if (service, date) in caldg.groups: et = caldg.get_group((service, date))["exception_type"].iat[0] if et == 1: return True else: # Exception type is 2 return False # Check feed._calendar_i cali = feed._calendar_i if cali is not None: if service in cali.index: weekday_str = hp.weekday_to_str(hp.datestr_to_date(date).weekday()) if ( cali.at[service, "start_date"] <= date <= cali.at[service, "end_date"] and cali.at[service, weekday_str] == 1 ): return True else: return False # If you made it here, then something went wrong return False def get_trips( feed: "Feed", date: Optional[str] = None, time: Optional[str] = None ) -> DataFrame: """ Return a subset of ``feed.trips``. Parameters ---------- feed : Feed date : string YYYYMMDD date string time : string HH:MM:SS time string, possibly with HH > 23 Returns ------- DataFrame The subset of ``feed.trips`` containing trips active (starting) on the given date at the given time. If no date or time are specified, then return the entire ``feed.trips``. 
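# Simplified sketch (assumed one-row calendar dict) of the precedence rule
# implemented in ``is_active_trip``: a calendar_dates exception
# (1 = service added, 2 = service removed) overrides the weekly pattern
# in calendar.
def _example_service_runs_on(date, weekday_str, calendar_row, exception_type=None):
    if exception_type == 1:
        return True
    if exception_type == 2:
        return False
    # Fall back to the calendar: date within range and weekday flag set
    return (
        calendar_row["start_date"] <= date <= calendar_row["end_date"]
        and calendar_row[weekday_str] == 1
    )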
""" if feed.trips is None or date is None: return feed.trips f = feed.trips.copy() f["is_active"] = f["trip_id"].map( lambda trip_id: feed.is_active_trip(trip_id, date) ) f = f[f["is_active"]].copy() del f["is_active"] if time is not None: # Get trips active during given time g = pd.merge(f, feed.stop_times[["trip_id", "departure_time"]]) def F(group): d = {} start = group["departure_time"].dropna().min() end = group["departure_time"].dropna().max() try: result = start <= time <= end except TypeError: result = False d["is_active"] = result return pd.Series(d) h = g.groupby("trip_id").apply(F).reset_index() f = pd.merge(f, h[h["is_active"]]) del f["is_active"] return f def compute_trip_activity(feed: "Feed", dates: List[str]) -> DataFrame: """ Mark trip as active or inactive on the given dates as computed by :func:`is_active_trip`. Parameters ---------- feed : Feed dates : string or list A YYYYMMDD date string or list thereof indicating the date(s) for which to compute activity Returns ------- DataFrame Columns are - ``'trip_id'`` - ``dates[0]``: 1 if the trip is active on ``dates[0]``; 0 otherwise - ``dates[1]``: 1 if the trip is active on ``dates[1]``; 0 otherwise - etc. - ``dates[-1]``: 1 if the trip is active on ``dates[-1]``; 0 otherwise If ``dates`` is ``None`` or the empty list, then return an empty DataFrame. Notes ----- Assume the following feed attributes are not ``None``: - ``feed.trips`` - Those used in :func:`is_active_trip` """ dates = feed.restrict_dates(dates) if not dates: return pd.DataFrame() f = feed.trips.copy() for date in dates: f[date] = f["trip_id"].map( lambda trip_id: int(feed.is_active_trip(trip_id, date)) ) return f[["trip_id"] + list(dates)] def compute_busiest_date(feed: "Feed", dates: List[str]) -> str: """ Given a list of dates, return the first date that has the maximum number of active trips. Notes ----- Assume the following feed attributes are not ``None``: - Those used in :func:`compute_trip_activity` """ f = feed.compute_trip_activity(dates) s = [(f[c].sum(), c) for c in f.columns if c != "trip_id"] return max(s)[1] def compute_trip_stats( feed: "Feed", route_ids: Optional[List[str]] = None, *, compute_dist_from_shapes: bool = False, ) -> DataFrame: """ Return a DataFrame with the following columns: - ``'trip_id'`` - ``'route_id'`` - ``'route_short_name'`` - ``'route_type'`` - ``'direction_id'``: NaN if missing from feed - ``'shape_id'``: NaN if missing from feed - ``'num_stops'``: number of stops on trip - ``'start_time'``: first departure time of the trip - ``'end_time'``: last departure time of the trip - ``'start_stop_id'``: stop ID of the first stop of the trip - ``'end_stop_id'``: stop ID of the last stop of the trip - ``'is_loop'``: 1 if the start and end stop are less than 400m apart and 0 otherwise - ``'distance'``: distance of the trip in ``feed.dist_units``; contains all ``np.nan`` entries if ``feed.shapes is None`` - ``'duration'``: duration of the trip in hours - ``'speed'``: distance/duration If ``feed.stop_times`` has a ``shape_dist_traveled`` column with at least one non-NaN value and ``compute_dist_from_shapes == False``, then use that column to compute the distance column. Else if ``feed.shapes is not None``, then compute the distance column using the shapes and Shapely. Otherwise, set the distances to NaN. If route IDs are given, then restrict to trips on those routes. 
Notes ----- - Assume the following feed attributes are not ``None``: * ``feed.trips`` * ``feed.routes`` * ``feed.stop_times`` * ``feed.shapes`` (optionally) * Those used in :func:`.stops.build_geometry_by_stop` - Calculating trip distances with ``compute_dist_from_shapes=True`` seems pretty accurate. For example, calculating trip distances on `this Portland feed `_ using ``compute_dist_from_shapes=False`` and ``compute_dist_from_shapes=True``, yields a difference of at most 0.83km from the original values. """ f = feed.trips.copy() # Restrict to given route IDs if route_ids is not None: f = f[f["route_id"].isin(route_ids)].copy() # Merge with stop times and extra trip info. # Convert departure times to seconds past midnight to # compute trip durations later. if "direction_id" not in f.columns: f["direction_id"] = np.nan if "shape_id" not in f.columns: f["shape_id"] = np.nan f = ( f[["route_id", "trip_id", "direction_id", "shape_id"]] .merge(feed.routes[["route_id", "route_short_name", "route_type"]]) .merge(feed.stop_times) .sort_values(["trip_id", "stop_sequence"]) .assign( departure_time=lambda x: x["departure_time"].map( hp.timestr_to_seconds ) ) ) # Compute all trips stats except distance, # which is possibly more involved geometry_by_stop = feed.build_geometry_by_stop(use_utm=True) g = f.groupby("trip_id") def my_agg(group): d = OrderedDict() d["route_id"] = group["route_id"].iat[0] d["route_short_name"] = group["route_short_name"].iat[0] d["route_type"] = group["route_type"].iat[0] d["direction_id"] = group["direction_id"].iat[0] d["shape_id"] = group["shape_id"].iat[0] d["num_stops"] = group.shape[0] d["start_time"] = group["departure_time"].iat[0] d["end_time"] = group["departure_time"].iat[-1] d["start_stop_id"] = group["stop_id"].iat[0] d["end_stop_id"] = group["stop_id"].iat[-1] dist = geometry_by_stop[d["start_stop_id"]].distance( geometry_by_stop[d["end_stop_id"]] ) d["is_loop"] = int(dist < 400) d["duration"] = (d["end_time"] - d["start_time"]) / 3600 return pd.Series(d) # Apply my_agg, but don't reset index yet. # Need trip ID as index to line up the results of the # forthcoming distance calculation h = g.apply(my_agg) # Compute distance if ( hp.is_not_null(f, "shape_dist_traveled") and not compute_dist_from_shapes ): # Compute distances using shape_dist_traveled column h["distance"] = g.apply( lambda group: group["shape_dist_traveled"].max() ) elif feed.shapes is not None: # Compute distances using the shapes and Shapely geometry_by_shape = feed.build_geometry_by_shape(use_utm=True) geometry_by_stop = feed.build_geometry_by_stop(use_utm=True) m_to_dist = hp.get_convert_dist("m", feed.dist_units) def compute_dist(group): """ Return the distance traveled along the trip between the first and last stops. If that distance is negative or if the trip's linestring intersects itfeed, then return the length of the trip's linestring instead. """ shape = group["shape_id"].iat[0] try: # Get the linestring for this trip linestring = geometry_by_shape[shape] except KeyError: # Shape ID is NaN or doesn't exist in shapes. # No can do. 
return np.nan # If the linestring intersects itfeed, then that can cause # errors in the computation below, so just # return the length of the linestring as a good approximation D = linestring.length if not linestring.is_simple: return D # Otherwise, return the difference of the distances along # the linestring of the first and last stop start_stop = group["stop_id"].iat[0] end_stop = group["stop_id"].iat[-1] try: start_point = geometry_by_stop[start_stop] end_point = geometry_by_stop[end_stop] except KeyError: # One of the two stop IDs is NaN, so just # return the length of the linestring return D d1 = linestring.project(start_point) d2 = linestring.project(end_point) d = d2 - d1 if 0 < d < D + 100: return d else: # Something is probably wrong, so just # return the length of the linestring return D h["distance"] = g.apply(compute_dist) # Convert from meters h["distance"] = h["distance"].map(m_to_dist) else: h["distance"] = np.nan # Reset index and compute final stats h = h.reset_index() h["speed"] = h["distance"] / h["duration"] h[["start_time", "end_time"]] = h[["start_time", "end_time"]].applymap( lambda x: hp.timestr_to_seconds(x, inverse=True) ) return h.sort_values(["route_id", "direction_id", "start_time"]) def locate_trips(feed: "Feed", date: str, times: List[str]) -> DataFrame: """ Return the positions of all trips active on the given date and times Parameters ---------- feed : Feed date : string YYYYMMDD date string times : list HH:MM:SS time strings, possibly with HH > 23 Returns ------- DataFrame Columns are: - ``'trip_id'`` - ``'route_id'`` - ``'direction_id'``: all NaNs if ``feed.trips.direction_id`` is missing - ``'time'`` - ``'rel_dist'``: number between 0 (start) and 1 (end) indicating the relative distance of the trip along its path - ``'lon'``: longitude of trip at given time - ``'lat'``: latitude of trip at given time Assume ``feed.stop_times`` has an accurate ``shape_dist_traveled`` column. Notes ----- Assume the following feed attributes are not ``None``: - ``feed.trips`` - Those used in :func:`.stop_times.get_stop_times` - Those used in :func:`.shapes.build_geometry_by_shape` """ if not hp.is_not_null(feed.stop_times, "shape_dist_traveled"): raise ValueError( "feed.stop_times needs to have a non-null shape_dist_traveled " "column. You can create it, possibly with some inaccuracies, " "via feed2 = feed.append_dist_to_stop_times()." ) if "shape_id" not in feed.trips.columns: raise ValueError("feed.trips.shape_id must exist.") # Start with stop times active on date f = feed.get_stop_times(date) f["departure_time"] = f["departure_time"].map(hp.timestr_to_seconds) # Compute relative distance of each trip along its path # at the given time times. # Use linear interpolation based on stop departure times and # shape distance traveled. 
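# Illustration (made-up numbers) of the linear interpolation used below:
# given stop departure times (seconds) and cumulative shape distances at
# those stops, estimate how far along its shape a trip is at sample times.
def _example_interpolate_rel_dist():
    stop_seconds = np.array([0, 300, 900])   # departures at three stops
    stop_dists = np.array([0.0, 1.0, 4.0])   # shape_dist_traveled at stops
    sample_seconds = np.array([150, 600])
    rel_dist = np.interp(sample_seconds, stop_seconds, stop_dists) / stop_dists[-1]
    return rel_dist  # array([0.125, 0.625])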
geometry_by_shape = feed.build_geometry_by_shape(use_utm=False) sample_times = np.array([hp.timestr_to_seconds(s) for s in times]) def compute_rel_dist(group): dists = sorted(group["shape_dist_traveled"].values) times = sorted(group["departure_time"].values) ts = sample_times[ (sample_times >= times[0]) & (sample_times <= times[-1]) ] ds = np.interp(ts, times, dists) return pd.DataFrame({"time": ts, "rel_dist": ds / dists[-1]}) # return f.groupby('trip_id', group_keys=False).\ # apply(compute_rel_dist).reset_index() g = f.groupby("trip_id").apply(compute_rel_dist).reset_index() # Delete extraneous multi-index column del g["level_1"] # Convert times back to time strings g["time"] = g["time"].map(lambda x: hp.timestr_to_seconds(x, inverse=True)) # Merge in more trip info and # compute longitude and latitude of trip from relative distance t = feed.trips.copy() if "direction_id" not in t.columns: t["direction_id"] = np.nan h = pd.merge(g, t[["trip_id", "route_id", "direction_id", "shape_id"]]) if not h.shape[0]: # Return a DataFrame with the promised headers but no data. # Without this check, result below could be an empty DataFrame. h["lon"] = pd.Series() h["lat"] = pd.Series() return h def get_lonlat(group): shape = group["shape_id"].iat[0] linestring = geometry_by_shape[shape] lonlats = [ linestring.interpolate(d, normalized=True).coords[0] for d in group["rel_dist"].values ] group["lon"], group["lat"] = zip(*lonlats) return group return h.groupby("shape_id").apply(get_lonlat) def trip_to_geojson( feed: "Feed", trip_id: str, *, include_stops: bool = False ) -> Dict: """ Return a GeoJSON representation of the given trip, optionally with its stops. Parameters ---------- feed : Feed trip_id : string ID of trip in ``feed.trips`` include_stops : boolean Returns ------- dictionary A (decoded) GeoJSON FeatureCollection comprising a Linestring feature representing the trip's shape. If ``include_stops``, then also include one Point feature for each stop visited by the trip. The Linestring feature will contain as properties all the columns in ``feed.trips`` pertaining to the given trip, and each Point feature will contain as properties all the columns in ``feed.stops`` pertaining to the stop, except the ``stop_lat`` and ``stop_lon`` properties. Return the empty dictionary if the trip has no shape. """ # Get the relevant shapes t = feed.trips.copy() t = t[t["trip_id"] == trip_id].copy() shid = t["shape_id"].iat[0] geometry_by_shape = feed.build_geometry_by_shape( use_utm=False, shape_ids=[shid] ) if not geometry_by_shape: return {} features = [ { "type": "Feature", "properties": json.loads(t.to_json(orient="records"))[0], "geometry": sg.mapping(sg.LineString(geometry_by_shape[shid])), } ] if include_stops: # Get relevant stops and geometrys s = feed.get_stops(trip_id=trip_id) cols = set(s.columns) - set(["stop_lon", "stop_lat"]) s = s[list(cols)].copy() stop_ids = s["stop_id"].tolist() geometry_by_stop = feed.build_geometry_by_stop(stop_ids=stop_ids) features.extend( [ { "type": "Feature", "properties": json.loads( s[s["stop_id"] == stop_id].to_json(orient="records") )[0], "geometry": sg.mapping(geometry_by_stop[stop_id]), } for stop_id in stop_ids ] ) return {"type": "FeatureCollection", "features": features} def map_trips( feed: "Feed", trip_ids: List[str], color_palette: List[str] = cs.COLORS_SET2, *, include_stops: bool = True, ): """ Return a Folium map showing the given trips and (optionally) their stops. 
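# Usage sketch (hypothetical path): serialise the FeatureCollection returned
# by ``trip_to_geojson`` to disk, e.g. for a web map.  ``feed`` and the trip
# ID are assumed to exist; an empty dict means the trip has no shape.
def _example_write_trip_geojson(feed, trip_id, path="trip.geojson"):
    collection = trip_to_geojson(feed, trip_id, include_stops=True)
    if collection:
        with open(path, "w") as f:
            json.dump(collection, f)
    return collection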
Parameters ---------- feed : Feed trip_ids : list IDs of trips in ``feed.trips`` color_palette : list Palette to use to color the routes. If more routes than colors, then colors will be recycled. include_stops : boolean If ``True``, then include stops in the map Returns ------- dictionary A Folium Map depicting the shapes of the trips. If ``include_stops``, then include the stops for each trip. Notes ------ - Requires Folium """ import folium as fl import folium.plugins as fp # Get routes slice and convert to dictionary trips = ( feed.trips.loc[lambda x: x["trip_id"].isin(trip_ids)] .fillna("n/a") .to_dict(orient="records") ) # Create colors n = len(trips) colors = [color_palette[i % len(color_palette)] for i in range(n)] # Initialize map my_map = fl.Map(tiles="cartodbpositron") # Collect route bounding boxes to set map zoom later bboxes = [] # Create a feature group for each route and add it to the map for i, trip in enumerate(trips): collection = feed.trip_to_geojson( trip_id=trip["trip_id"], include_stops=include_stops ) group = fl.FeatureGroup(name="Trip " + trip["trip_id"]) color = colors[i] for f in collection["features"]: prop = f["properties"] # Add stop if f["geometry"]["type"] == "Point": lon, lat = f["geometry"]["coordinates"] fl.CircleMarker( location=[lat, lon], radius=8, fill=True, color=color, weight=1, popup=fl.Popup(hp.make_html(prop)), ).add_to(group) # Add path else: # Path prop["color"] = color path = fl.GeoJson( f, name=trip, style_function=lambda x: { "color": x["properties"]["color"] }, ) path.add_child(fl.Popup(hp.make_html(prop))) path.add_to(group) # Direction arrows, assuming, as GTFS does, that # trip direction equals LineString direction fp.PolyLineTextPath( path, " \u27A4 ", repeat=True, offset=5.5, attributes={"fill": color, "font-size": "18"}, ).add_to(group) bboxes.append(sg.box(*sg.shape(f["geometry"]).bounds)) group.add_to(my_map) fl.LayerControl().add_to(my_map) # Fit map to bounds bounds = so.unary_union(bboxes).bounds bounds2 = [bounds[1::-1], bounds[3:1:-1]] # Folium expects this ordering my_map.fit_bounds(bounds2) return my_map PK!D|}gtfstk/validators.py""" Functions about validation. """ import re import pytz import datetime as dt from typing import Optional, List, Union, TYPE_CHECKING import pycountry import numpy as np import pandas as pd from pandas import DataFrame from . import constants as cs from . import helpers as hp if TYPE_CHECKING: from .feed import Feed TIME_PATTERN1 = re.compile(r"^[0,1,2,3]\d:\d\d:\d\d$") TIME_PATTERN2 = re.compile(r"^\d:\d\d:\d\d$") DATE_FORMAT = "%Y%m%d" TIMEZONES = set(pytz.all_timezones) # ISO639-1 language codes, both lower and upper case LANGS = set( [lang.alpha_2 for lang in pycountry.languages if hasattr(lang, "alpha_2")] ) LANGS |= set(x.upper() for x in LANGS) CURRENCIES = set( [c.alpha_3 for c in pycountry.currencies if hasattr(c, "alpha_3")] ) URL_PATTERN = re.compile( r"^(?:http)s?://" # http:// or https:// r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|" # domain... r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # ...or ip r"(?::\d+)?" # optional port r"(?:/?|[/?]\S+)$", re.IGNORECASE, ) EMAIL_PATTERN = re.compile(r"[^@]+@[^@]+\.[^@]+") COLOR_PATTERN = re.compile(r"(?:[0-9a-fA-F]{2}){3}$") def valid_str(x: str) -> bool: """ Return ``True`` if ``x`` is a non-blank string; otherwise return ``False``. 
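# Illustrative self-check (assumed sample values) of the patterns defined
# above; the valid_* helpers additionally reject non-string inputs.
def _example_patterns():
    assert re.match(TIME_PATTERN1, "08:30:00")
    assert re.match(TIME_PATTERN2, "8:30:00")
    assert re.match(COLOR_PATTERN, "FFFFFF")
    assert not re.match(COLOR_PATTERN, "#FFFFFF")  # leading hash is invalid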
""" if isinstance(x, str) and x.strip(): return True else: return False def valid_time(x: str) -> bool: """ Return ``True`` if ``x`` is a valid H:MM:SS or HH:MM:SS time; otherwise return ``False``. """ if isinstance(x, str) and ( re.match(TIME_PATTERN1, x) or re.match(TIME_PATTERN2, x) ): return True else: return False def valid_date(x: str) -> bool: """ Retrun ``True`` if ``x`` is a valid YYYYMMDD date; otherwise return ``False``. """ try: if x != dt.datetime.strptime(x, DATE_FORMAT).strftime(DATE_FORMAT): raise ValueError return True except ValueError: return False def valid_timezone(x: str) -> bool: """ Retrun ``True`` if ``x`` is a valid human-readable timezone string, e.g. 'Africa/Abidjan'; otherwise return ``False``. """ return x in TIMEZONES def valid_lang(x: str) -> bool: """ Return ``True`` if ``x`` is a valid two-letter ISO 639 language code, e.g. 'aa'; otherwise return ``False``. """ return x in LANGS def valid_currency(x: str) -> bool: """ Return ``True`` if ``x`` is a valid three-letter ISO 4217 currency code, e.g. 'AED'; otherwise return ``False``. """ return x in CURRENCIES def valid_url(x: str) -> bool: """ Return ``True`` if ``x`` is a valid URL; otherwise return ``False``. """ if isinstance(x, str) and re.match(URL_PATTERN, x): return True else: return False def valid_email(x: str) -> bool: """ Return ``True`` if ``x`` is a valid email address; otherwise return ``False``. """ if isinstance(x, str) and re.match(EMAIL_PATTERN, x): return True else: return False def valid_color(x: str) -> bool: """ Return ``True`` if ``x`` a valid hexadecimal color string without the leading hash; otherwise return ``False``. """ if isinstance(x, str) and re.match(COLOR_PATTERN, x): return True else: return False def check_for_required_columns( problems: List, table: str, df: DataFrame ) -> List: """ Check that the given GTFS table has the required columns. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` Returns ------- list The ``problems`` list extended as follows. Check that the DataFrame contains the colums required by GTFS and append to the problems list one error for each column missing. """ r = cs.GTFS_REF req_columns = r.loc[ (r["table"] == table) & r["column_required"], "column" ].values for col in req_columns: if col not in df.columns: problems.append(["error", f"Missing column {col}", table, []]) return problems def check_for_invalid_columns( problems: List, table: str, df: DataFrame ) -> List: """ Check for invalid columns in the given GTFS DataFrame. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. 
A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` Returns ------- list The ``problems`` list extended as follows. Check whether the DataFrame contains extra columns not in the GTFS and append to the problems list one warning for each extra column. """ r = cs.GTFS_REF valid_columns = r.loc[r["table"] == table, "column"].values for col in df.columns: if col not in valid_columns: problems.append( ["warning", f"Unrecognized column {col}", table, []] ) return problems def check_table( problems: List, table: str, df: DataFrame, condition, message: str, type_: str = "error", ) -> List: """ Check the given GTFS table for the given problem condition. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` condition : boolean expression One involving ``df``, e.g.`df['route_id'].map(is_valid_str)`` message : string Problem message, e.g. ``'Invalid route_id'`` type_ : string ``'error'`` or ``'warning'`` indicating the type of problem encountered Returns ------- list The ``problems`` list extended as follows. Record the indices of ``df`` that statisfy the condition. If the list of indices is nonempty, append to the problems the item ``[type_, message, table, indices]``; otherwise do not append anything. """ indices = df.loc[condition].index.tolist() if indices: problems.append([type_, message, table, indices]) return problems def check_column( problems: List, table: str, df: DataFrame, column: str, checker, type_: str = "error", *, column_required: bool = True, ) -> List: """ Check the given column of the given GTFS with the given problem checker. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` column : string A column of ``df`` column_required : boolean ``True`` if and only if ``column`` is required (and not optional) by the GTFS checker : boolean valued unary function Returns ``True`` if and only if no problem is encountered type_ : string ``'error'`` or ``'warning'`` indicating the type of problem encountered Returns ------- list The ``problems`` list extended as follows. Apply the checker to the column entries and record the indices of ``df`` where the checker returns ``False``. If the list of indices of is nonempty, append to the problems the item ``[type_, problem, table, indices]``; otherwise do not append anything. If not ``column_required``, then NaN entries will be ignored before applying the checker. 
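# Toy illustration (made-up table) of how ``check_table`` records problems:
# rows matching the condition have their indices appended together with the
# problem type, message, and table name.
def _example_check_table():
    toy = pd.DataFrame({"route_id": ["r1", " ", "r2"]})
    cond = ~toy["route_id"].map(valid_str)
    # Expected result: [['error', 'Invalid route_id', 'routes', [1]]]
    return check_table([], "routes", toy, cond, "Invalid route_id")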
""" f = df.copy() if not column_required: if column not in f.columns: f[column] = np.nan f = f.dropna(subset=[column]) cond = ~f[column].map(checker) problems = check_table( problems, table, f, cond, f"Invalid {column}; maybe has extra space characters", type_, ) return problems def check_column_id( problems: List, table: str, df: DataFrame, column: str, *, column_required: bool = True, ) -> List: """ A specialization of :func:`check_column`. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` column : string A column of ``df`` column_required : boolean ``True`` if and only if ``column`` is required (and not optional) by the GTFS Returns ------- list The ``problems`` list extended as follows. Record the indices of ``df`` where the given column has duplicated entry or an invalid strings. If the list of indices is nonempty, append to the problems the item ``[type_, problem, table, indices]``; otherwise do not append anything. If not ``column_required``, then NaN entries will be ignored in the checking. """ f = df.copy() if not column_required: if column not in f.columns: f[column] = np.nan f = f.dropna(subset=[column]) cond = ~f[column].map(valid_str) problems = check_table( problems, table, f, cond, f"Invalid {column}; maybe has extra space characters", ) cond = f[column].duplicated() problems = check_table(problems, table, f, cond, f"Repeated {column}") return problems def check_column_linked_id( problems: List, table: str, df: DataFrame, column: str, target_df: DataFrame, target_column: Optional[str] = None, *, column_required: bool = True, ) -> List: """ A modified version of :func:`check_column_id`. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs table : string Name of a GTFS table df : DataFrame The GTFS table corresponding to ``table`` column : string A column of ``df`` column_required : boolean ``True`` if and only if ``column`` is required (and not optional) by the GTFS target_df : DataFrame A GTFS table target_column : string A column of ``target_df``; defaults to ``column_name`` Returns ------- list The ``problems`` list extended as follows. Record indices of ``df`` where the following condition is violated: ``column`` contain IDs that are valid strings and are present in ``target_df`` under the ``target_column`` name. If the list of indices is nonempty, append to the problems the item ``[type_, problem, table, indices]``; otherwise do not append anything. If not ``column_required``, then NaN entries will be ignored in the checking. 
""" if target_column is None: target_column = column f = df.copy() if target_df is None: g = pd.DataFrame() g[target_column] = np.nan else: g = target_df.copy() if target_column not in g.columns: g[target_column] = np.nan if not column_required: if column not in f.columns: f[column] = np.nan f = f.dropna(subset=[column]) g = g.dropna(subset=[target_column]) cond = ~f[column].isin(g[target_column]) problems = check_table(problems, table, f, cond, f"Undefined {column}") return problems def format_problems( problems: List, *, as_df: bool = False ) -> Union[List, DataFrame]: """ Format the given problems list as a DataFrame. Parameters ---------- problems : list A four-tuple containing 1. A problem type (string) equal to ``'error'`` or ``'warning'``; ``'error'`` means the GTFS is violated; ``'warning'`` means there is a problem but it is not a GTFS violation 2. A message (string) that describes the problem 3. A GTFS table name, e.g. ``'routes'``, in which the problem occurs 4. A list of rows (integers) of the table's DataFrame where the problem occurs as_df : boolean Returns ------- list or DataFrame Return ``problems`` if not ``as_df``; otherwise return a DataFrame with the problems as rows and the columns ``['type', 'message', 'table', 'rows']``. """ if as_df: problems = pd.DataFrame( problems, columns=["type", "message", "table", "rows"] ).sort_values(["type", "table"]) return problems def check_agency( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Check that ``feed.agency`` follows the GTFS. Return a list of problems of the form described in :func:`check_table`; the list will be empty if no problems are found. """ table = "agency" problems = [] # Preliminary checks if feed.agency is None: problems.append(["error", "Missing table", table, []]) else: f = feed.agency.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check service_id problems = check_column_id( problems, table, f, "agency_id", column_required=False ) # Check agency_name problems = check_column(problems, table, f, "agency_name", valid_str) # Check agency_url problems = check_column(problems, table, f, "agency_url", valid_url) # Check agency_timezone problems = check_column( problems, table, f, "agency_timezone", valid_timezone ) # Check agency_fare_url problems = check_column( problems, table, f, "agency_fare_url", valid_url, column_required=False ) # Check agency_lang problems = check_column( problems, table, f, "agency_lang", valid_lang, column_required=False ) # Check agency_phone problems = check_column( problems, table, f, "agency_phone", valid_str, column_required=False ) # Check agency_email problems = check_column( problems, table, f, "agency_email", valid_email, column_required=False ) return format_problems(problems, as_df=as_df) def check_calendar( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.calendar``. 
""" table = "calendar" problems = [] # Preliminary checks if feed.calendar is None: return problems f = feed.calendar.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check service_id problems = check_column_id(problems, table, f, "service_id") # Check weekday columns v = lambda x: x in range(2) for col in [ "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", ]: problems = check_column(problems, table, f, col, v) # Check start_date and end_date for col in ["start_date", "end_date"]: problems = check_column(problems, table, f, col, valid_date) if include_warnings: # Check if feed has expired d = f["end_date"].max() if feed.calendar_dates is not None and not feed.calendar_dates.empty: table += "/calendar_dates" d = max(d, feed.calendar_dates["date"].max()) if d < dt.datetime.today().strftime(DATE_FORMAT): problems.append(["warning", "Feed expired", table, []]) return format_problems(problems, as_df=as_df) def check_calendar_dates( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.calendar_dates``. """ table = "calendar_dates" problems = [] # Preliminary checks if feed.calendar_dates is None: return problems f = feed.calendar_dates.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check service_id problems = check_column(problems, table, f, "service_id", valid_str) # Check date problems = check_column(problems, table, f, "date", valid_date) # No duplicate (service_id, date) pairs allowed cond = f[["service_id", "date"]].duplicated() problems = check_table( problems, table, f, cond, "Repeated pair (service_id, date)" ) # Check exception_type v = lambda x: x in [1, 2] problems = check_column(problems, table, f, "exception_type", v) return format_problems(problems, as_df=as_df) def check_fare_attributes( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.calendar_dates``. """ table = "fare_attributes" problems = [] # Preliminary checks if feed.fare_attributes is None: return problems f = feed.fare_attributes.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check fare_id problems = check_column_id(problems, table, f, "fare_id") # Check currency_type problems = check_column( problems, table, f, "currency_type", valid_currency ) # Check payment_method v = lambda x: x in range(2) problems = check_column(problems, table, f, "payment_method", v) # Check transfers v = lambda x: pd.isnull(x) or x in range(3) problems = check_column(problems, table, f, "transfers", v) # Check transfer_duration v = lambda x: x >= 0 problems = check_column( problems, table, f, "transfer_duration", v, column_required=False ) return format_problems(problems, as_df=as_df) def check_fare_rules( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.calendar_dates``. 
""" table = "fare_rules" problems = [] # Preliminary checks if feed.fare_rules is None: return problems f = feed.fare_rules.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check fare_id problems = check_column_linked_id( problems, table, f, "fare_id", feed.fare_attributes ) # Check route_id problems = check_column_linked_id( problems, table, f, "route_id", feed.routes, column_required=False ) # Check origin_id, destination_id, contains_id for col in ["origin_id", "destination_id", "contains_id"]: problems = check_column_linked_id( problems, table, f, col, feed.stops, "zone_id", column_required=False, ) return format_problems(problems, as_df=as_df) def check_feed_info( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.feed_info``. """ table = "feed_info" problems = [] # Preliminary checks if feed.feed_info is None: return problems f = feed.feed_info.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check feed_publisher_name problems = check_column( problems, table, f, "feed_publisher_name", valid_str ) # Check feed_publisher_url problems = check_column( problems, table, f, "feed_publisher_url", valid_url ) # Check feed_lang problems = check_column(problems, table, f, "feed_lang", valid_lang) # Check feed_start_date and feed_end_date cols = ["feed_start_date", "feed_end_date"] for col in cols: problems = check_column( problems, table, f, col, valid_date, column_required=False ) if set(cols) <= set(f.columns): d1, d2 = f.loc[0, ["feed_start_date", "feed_end_date"]].values if pd.notnull(d1) and pd.notnull(d2) and d1 > d1: problems.append( [ "error", "feed_start_date later than feed_end_date", table, [0], ] ) # Check feed_version problems = check_column( problems, table, f, "feed_version", valid_str, column_required=False ) return format_problems(problems, as_df=as_df) def check_frequencies( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.frequencies``. 
""" table = "frequencies" problems = [] # Preliminary checks if feed.frequencies is None: return problems f = feed.frequencies.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check trip_id problems = check_column_linked_id( problems, table, f, "trip_id", feed.trips ) # Check start_time and end_time time_cols = ["start_time", "end_time"] for col in time_cols: problems = check_column(problems, table, f, col, valid_time) for col in time_cols: f[col] = f[col].map(hp.timestr_to_seconds) # Start_time should be earlier than end_time cond = f["start_time"] >= f["end_time"] problems = check_table( problems, table, f, cond, "start_time not earlier than end_time" ) # Headway periods should not overlap f = f.sort_values(["trip_id", "start_time"]) for __, group in f.groupby("trip_id"): a = group["start_time"].values b = group["end_time"].values indices = np.flatnonzero(a[1:] < b[:-1]).tolist() if indices: problems.append( [ "error", "Headway periods for the same trip overlap", table, indices, ] ) # Check headway_secs v = lambda x: x >= 0 problems = check_column(problems, table, f, "headway_secs", v) # Check exact_times v = lambda x: x in range(2) problems = check_column( problems, table, f, "exact_times", v, column_required=False ) return format_problems(problems, as_df=as_df) def check_routes( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.routes``. """ table = "routes" problems = [] # Preliminary checks if feed.routes is None: problems.append(["error", "Missing table", table, []]) else: f = feed.routes.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check route_id problems = check_column_id(problems, table, f, "route_id") # Check agency_id if "agency_id" in f: if "agency_id" not in feed.agency.columns: problems.append( [ "error", "agency_id column present in routes but not in agency", table, [], ] ) else: g = f.dropna(subset=["agency_id"]) cond = ~g["agency_id"].isin(feed.agency["agency_id"]) problems = check_table( problems, table, g, cond, "Undefined agency_id" ) # Check route_short_name and route_long_name for column in ["route_short_name", "route_long_name"]: problems = check_column( problems, table, f, column, valid_str, column_required=False ) cond = ~(f["route_short_name"].notnull() | f["route_long_name"].notnull()) problems = check_table( problems, table, f, cond, "route_short_name and route_long_name both empty", ) # Check route_type v = lambda x: x in range(8) problems = check_column(problems, table, f, "route_type", v) # Check route_url problems = check_column( problems, table, f, "route_url", valid_url, column_required=False ) # Check route_color and route_text_color for col in ["route_color", "route_text_color"]: problems = check_column( problems, table, f, col, valid_color, column_required=False ) if include_warnings: # Check for duplicated (route_short_name, route_long_name) pairs cond = f[["route_short_name", "route_long_name"]].duplicated() problems = check_table( problems, table, f, cond, "Repeated pair (route_short_name, route_long_name)", "warning", ) # Check for routes without trips s = feed.trips["route_id"] cond = ~f["route_id"].isin(s) problems = check_table( problems, table, f, cond, "Route 
has no trips", "warning" ) return format_problems(problems, as_df=as_df) def check_shapes( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.shapes``. """ table = "shapes" problems = [] # Preliminary checks if feed.shapes is None: return problems f = feed.shapes.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check shape_id problems = check_column(problems, table, f, "shape_id", valid_str) # Check shape_pt_lon and shape_pt_lat for column, bound in [("shape_pt_lon", 180), ("shape_pt_lat", 90)]: v = lambda x: pd.notnull(x) and -bound <= x <= bound cond = ~f[column].map(v) problems = check_table( problems, table, f, cond, f"{column} out of bounds {[-bound, bound]}", ) # Check for duplicated (shape_id, shape_pt_sequence) pairs cond = f[["shape_id", "shape_pt_sequence"]].duplicated() problems = check_table( problems, table, f, cond, "Repeated pair (shape_id, shape_pt_sequence)" ) # Check if shape_dist_traveled does decreases on a trip if "shape_dist_traveled" in f.columns: g = f.dropna(subset=["shape_dist_traveled"]) indices = [] prev_sid = None prev_dist = -1 cols = ["shape_id", "shape_dist_traveled"] for i, sid, dist in g[cols].itertuples(): if sid == prev_sid and dist < prev_dist: indices.append(i) prev_sid = sid prev_dist = dist if indices: problems.append( [ "error", "shape_dist_traveled decreases on a trip", table, indices, ] ) return format_problems(problems, as_df=as_df) def check_stops( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.stops``. 
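# Toy illustration (made-up shape) of the decreasing-distance scan used in
# ``check_shapes`` and ``check_stop_times``: within one shape or trip, any
# drop in the cumulative distance marks the offending row indices.
def _example_decreasing_dist_indices():
    g = pd.DataFrame(
        {"shape_id": ["a", "a", "a"], "shape_dist_traveled": [0.0, 2.0, 1.5]}
    )
    indices, prev_sid, prev_dist = [], None, -1
    for i, sid, dist in g[["shape_id", "shape_dist_traveled"]].itertuples():
        if sid == prev_sid and dist < prev_dist:
            indices.append(i)
        prev_sid, prev_dist = sid, dist
    return indices  # [2]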
""" table = "stops" problems = [] # Preliminary checks if feed.stops is None: problems.append(["error", "Missing table", table, []]) else: f = feed.stops.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check stop_id problems = check_column_id(problems, table, f, "stop_id") # Check stop_code, stop_desc, zone_id, parent_station for column in ["stop_code", "stop_desc", "zone_id", "parent_station"]: problems = check_column( problems, table, f, column, valid_str, column_required=False ) # Check stop_name problems = check_column(problems, table, f, "stop_name", valid_str) # Check stop_lon and stop_lat for column, bound in [("stop_lon", 180), ("stop_lat", 90)]: v = lambda x: pd.notnull(x) and -bound <= x <= bound cond = ~f[column].map(v) problems = check_table( problems, table, f, cond, f"{column} out of bounds {[-bound, bound]}", ) # Check stop_url problems = check_column( problems, table, f, "stop_url", valid_url, column_required=False ) # Check location_type v = lambda x: x in range(3) problems = check_column( problems, table, f, "location_type", v, column_required=False ) # Check stop_timezone problems = check_column( problems, table, f, "stop_timezone", valid_timezone, column_required=False, ) # Check wheelchair_boarding v = lambda x: x in range(3) problems = check_column( problems, table, f, "wheelchair_boarding", v, column_required=False ) # Check further location_type and parent_station if "parent_station" in f.columns: if "location_type" not in f.columns: problems.append( [ "error", "parent_station column present but location_type column missing", table, [], ] ) else: # Stations must have location type 1 station_ids = f.loc[ f["parent_station"].notnull(), "parent_station" ] cond = f["stop_id"].isin(station_ids) & (f["location_type"] != 1) problems = check_table( problems, table, f, cond, "A station must have location_type 1" ) # Stations must not lie in stations cond = (f["location_type"] == 1) & f["parent_station"].notnull() problems = check_table( problems, table, f, cond, "A station must not lie in another station", ) if include_warnings: # Check for stops without trips s = feed.stop_times["stop_id"] cond = ~feed.stops["stop_id"].isin(s) problems = check_table( problems, table, f, cond, "Stop has no stop times", "warning" ) return format_problems(problems, as_df=as_df) def check_stop_times( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.stop_times``. """ table = "stop_times" problems = [] # Preliminary checks if feed.stop_times is None: problems.append(["error", "Missing table", table, []]) else: f = feed.stop_times.copy().sort_values(["trip_id", "stop_sequence"]) problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check trip_id problems = check_column_linked_id( problems, table, f, "trip_id", feed.trips ) # Check arrival_time and departure_time v = lambda x: pd.isnull(x) or valid_time(x) for col in ["arrival_time", "departure_time"]: problems = check_column(problems, table, f, col, v) # Check that arrival and departure times exist for the first and last # stop of each trip and for each timepoint. # For feeds with many trips, iterating through the stop time rows is # faster than uisg groupby. 
if "timepoint" not in f.columns: f["timepoint"] = np.nan # This will not mess up later timepoint check indices = [] prev_tid = None prev_atime = 1 prev_dtime = 1 for i, tid, atime, dtime, tp in f[ ["trip_id", "arrival_time", "departure_time", "timepoint"] ].itertuples(): if tid != prev_tid: # Check last stop of previous trip if pd.isnull(prev_atime) or pd.isnull(prev_dtime): indices.append(i - 1) # Check first stop of current trip if pd.isnull(atime) or pd.isnull(dtime): indices.append(i) elif tp == 1 and (pd.isnull(atime) or pd.isnull(dtime)): # Failure at timepoint indices.append(i) prev_tid = tid prev_atime = atime prev_dtime = dtime if indices: problems.append( [ "error", "First/last/time point arrival/departure time missing", table, indices, ] ) # Check stop_id problems = check_column_linked_id( problems, table, f, "stop_id", feed.stops ) # Check for duplicated (trip_id, stop_sequence) pairs cond = f[["trip_id", "stop_sequence"]].dropna().duplicated() problems = check_table( problems, table, f, cond, "Repeated pair (trip_id, stop_sequence)" ) # Check stop_headsign problems = check_column( problems, table, f, "stop_headsign", valid_str, column_required=False ) # Check pickup_type and drop_off_type for col in ["pickup_type", "drop_off_type"]: v = lambda x: x in range(4) problems = check_column( problems, table, f, col, v, column_required=False ) # Check if shape_dist_traveled decreases on a trip if "shape_dist_traveled" in f.columns: g = f.dropna(subset=["shape_dist_traveled"]) indices = [] prev_tid = None prev_dist = -1 for i, tid, dist in g[["trip_id", "shape_dist_traveled"]].itertuples(): if tid == prev_tid and dist < prev_dist: indices.append(i) prev_tid = tid prev_dist = dist if indices: problems.append( [ "error", "shape_dist_traveled decreases on a trip", table, indices, ] ) # Check timepoint v = lambda x: x in range(2) problems = check_column( problems, table, f, "timepoint", v, column_required=False ) if include_warnings: # Check for duplicated (trip_id, departure_time) pairs cond = f[["trip_id", "departure_time"]].duplicated() problems = check_table( problems, table, f, cond, "Repeated pair (trip_id, departure_time)", "warning", ) return format_problems(problems, as_df=as_df) def check_transfers( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.transfers``. """ table = "transfers" problems = [] # Preliminary checks if feed.transfers is None: return problems f = feed.transfers.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check from_stop_id and to_stop_id for col in ["from_stop_id", "to_stop_id"]: problems = check_column_linked_id( problems, table, f, col, feed.stops, "stop_id" ) # Check transfer_type v = lambda x: pd.isnull(x) or x in range(5) problems = check_column( problems, table, f, "transfer_type", v, column_required=False ) # Check min_transfer_time v = lambda x: x >= 0 problems = check_column( problems, table, f, "min_transfer_time", v, column_required=False ) return format_problems(problems, as_df=as_df) def check_trips( feed: "Feed", *, as_df: bool = False, include_warnings: bool = False ) -> List: """ Analog of :func:`check_agency` for ``feed.trips``. 
""" table = "trips" problems = [] # Preliminary checks if feed.trips is None: problems.append(["error", "Missing table", table, []]) else: f = feed.trips.copy() problems = check_for_required_columns(problems, table, f) if problems: return format_problems(problems, as_df=as_df) if include_warnings: problems = check_for_invalid_columns(problems, table, f) # Check trip_id problems = check_column_id(problems, table, f, "trip_id") # Check route_id problems = check_column_linked_id( problems, table, f, "route_id", feed.routes ) # Check service_id g = pd.DataFrame() if feed.calendar is not None: g = pd.concat([g, feed.calendar]) if feed.calendar_dates is not None: g = pd.concat([g, feed.calendar_dates]) problems = check_column_linked_id(problems, table, f, "service_id", g) # Check direction_id v = lambda x: x in range(2) problems = check_column( problems, table, f, "direction_id", v, column_required=False ) # Check block_id if "block_id" in f.columns: v = lambda x: pd.isnull(x) or valid_str(x) cond = ~f["block_id"].map(v) problems = check_table(problems, table, f, cond, "Blank block_id") # Check shape_id problems = check_column_linked_id( problems, table, f, "shape_id", feed.shapes, column_required=False ) # Check wheelchair_accessible and bikes_allowed v = lambda x: x in range(3) for column in ["wheelchair_accessible", "bikes_allowed"]: problems = check_column( problems, table, f, column, v, column_required=False ) # Check for trips with no stop times if include_warnings: s = feed.stop_times["trip_id"] cond = ~f["trip_id"].isin(s) problems = check_table( problems, table, f, cond, "Trip has no stop times", "warning" ) return format_problems(problems, as_df=as_df) def validate( feed: "Feed", *, as_df: bool = True, include_warnings: bool = True ) -> Union[List, DataFrame]: """ Check whether the given feed satisfies the GTFS. Parameters ---------- feed : Feed as_df : boolean If ``True``, then return the resulting report as a DataFrame; otherwise return the result as a list include_warnings : boolean If ``True``, then include problems of types ``'error'`` and ``'warning'``; otherwise, only return problems of type ``'error'`` Returns ------- list or DataFrame Run all the table-checking functions: :func:`check_agency`, :func:`check_calendar`, etc. This yields a possibly empty list of items [problem type, message, table, rows]. If ``as_df``, then format the error list as a DataFrame with the columns - ``'type'``: 'error' or 'warning'; 'error' means the GTFS is violated; 'warning' means there is a problem but it's not a GTFS violation - ``'message'``: description of the problem - ``'table'``: table in which problem occurs, e.g. 'routes' - ``'rows'``: rows of the table's DataFrame where problem occurs Return early if the feed is missing required tables or required columns. Notes ----- - This function interprets the GTFS liberally, classifying problems as warnings rather than errors where the GTFS is unclear. For example if a trip_id listed in the trips table is not listed in the stop times table (a trip with no stop times), then that's a warning and not an error. - Timing benchmark: on a 2.80 GHz processor machine with 16 GB of memory, this function checks `this 31 MB Southeast Queensland feed `_ in 22 seconds, including warnings. 
""" problems = [] # Check for invalid columns and check the required tables checkers = [ "check_agency", "check_calendar", "check_calendar_dates", "check_fare_attributes", "check_fare_rules", "check_feed_info", "check_frequencies", "check_routes", "check_shapes", "check_stops", "check_stop_times", "check_transfers", "check_trips", ] for checker in checkers: problems.extend( globals()[checker](feed, include_warnings=include_warnings) ) # Check calendar/calendar_dates combo if feed.calendar is None and feed.calendar_dates is None: problems.append( ["error", "Missing both tables", "calendar & calendar_dates", []] ) return format_problems(problems, as_df=as_df) PK!ޫ\;;"gtfstk-9.4.0.dist-info/LICENSE.txtThe MIT License (MIT) Copyright (c) 2014 Alexander Raichev Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK!HڽTUgtfstk-9.4.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HКHgtfstk-9.4.0.dist-info/METADATA\]{۶ׯM'-q$=&M&+ "! 