From d28cf0c036b9b90ec7ea1338b271f6f7e9b64596 Mon Sep 17 00:00:00 2001
From: Andrew
Date: Sun, 9 Mar 2025 14:00:43 +0000
Subject: [PATCH] [server]: Optimise permanent_data.py

---
 .../permanent_data/permanent_data.py          | 279 +++++++++---------
 1 file changed, 146 insertions(+), 133 deletions(-)

diff --git a/server/src/functions/permanent_data/permanent_data.py b/server/src/functions/permanent_data/permanent_data.py
index d4c84fd..b204f77 100644
--- a/server/src/functions/permanent_data/permanent_data.py
+++ b/server/src/functions/permanent_data/permanent_data.py
@@ -6,23 +6,31 @@ import zipfile
 import io
 import os
 import boto3
+from concurrent.futures import ThreadPoolExecutor
 
+# Create a reusable requests session so HTTP connections are pooled
+session = requests.Session()
+
+# Set up the DynamoDB table resource at module load so warm invocations reuse it
 dynamodb = boto3.resource("dynamodb")
+table_name = os.environ.get("DYNAMODB_TABLE", "permanent_data")
+table = dynamodb.Table(table_name)
 
-# API URLs
 irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/"
 
-# function to fetch Irish Rail station data with types
-# this function seems to be missing stations -- the API must have some uncategorised stations that it won't return
-# unfortunately, this is the only way to categorise stations as the API won't return the station's category
 def fetch_train_stations_with_type():
-    api_function = "getAllStationsXML_WithStationType?StationType="
+    """
+    Fetch train stations from the Irish Rail API with specific station types.
+
+    Returns:
+        list: A list of dictionaries containing train station data with types.
+    """
     station_types = ["M", "S", "D"]
    stations = []
-
     for station_type in station_types:
-        stations_xml = requests.get(irishrail_url + api_function + station_type).text
-        stations_json = json.loads(json.dumps(xmltodict.parse(stations_xml)))
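+        # xmltodict.parse() already returns a dict, so the old
+        # json.loads(json.dumps(...)) round-trip is unnecessary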
+        response = session.get(irishrail_url + f"getAllStationsXML_WithStationType?StationType={station_type}")
+        stations_xml = response.text
+        stations_json = xmltodict.parse(stations_xml)
 
         for station in stations_json["ArrayOfObjStation"]["objStation"]:
             stations.append({
@@ -30,169 +38,174 @@ def fetch_train_stations_with_type():
                 "objectType": "IrishRailStation",
                 "latitude": station["StationLatitude"],
                 "longitude": station["StationLongitude"],
-
                 "trainStationID": station["StationId"],
                 "trainStationCode": station["StationCode"],
-                "trainStationAlias": station["StationAlias"],
+                "trainStationAlias": station.get("StationAlias", ""),
                 "trainStationDesc": station["StationDesc"],
                 "trainStationType": station_type
             })
-
     return stations
 
-# function to fetch Irish Rail station data without types
 def fetch_train_stations():
-    api_function = "getAllStationsXML"
-    stations = []
-
-    stations_xml = requests.get(irishrail_url + api_function).text
-    stations_json = json.loads(json.dumps(xmltodict.parse(stations_xml)))
-
-    for station in stations_json["ArrayOfObjStation"]["objStation"]:
-        stations.append({
-            "objectID": "IrishRailStation-" + station["StationCode"],
-            "objectType": "IrishRailStation",
-            "latitude": station["StationLatitude"],
-            "longitude": station["StationLongitude"],
-
-            "trainStationID": station["StationId"],
-            "trainStationCode": station["StationCode"],
-            "trainStationAlias": station["StationAlias"],
-            "trainStationDesc": station["StationDesc"],
-        })
+    """
+    Fetch all train stations from the Irish Rail API.
 
+    Returns:
+        list: A list of dictionaries containing train station data.
+    """
+    response = session.get(irishrail_url + "getAllStationsXML")
+    stations_xml = response.text
+    stations_json = xmltodict.parse(stations_xml)
+    stations = [{
+        "objectID": "IrishRailStation-" + station["StationCode"],
+        "objectType": "IrishRailStation",
+        "latitude": station["StationLatitude"],
+        "longitude": station["StationLongitude"],
+        "trainStationID": station["StationId"],
+        "trainStationCode": station["StationCode"],
+        "trainStationAlias": station.get("StationAlias", ""),
+        "trainStationDesc": station["StationDesc"]
+    } for station in stations_json["ArrayOfObjStation"]["objStation"]]
     return stations
-
-
-# function to fetch Luas stops data
 def fetch_luas():
-    stops = []
+    """
+    Fetch Luas stops from the TII dataset.
 
-    stops_tsv = requests.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig')
+    Returns:
+        list: A list of dictionaries containing Luas stop data.
+    """
+    response = session.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt")
+    stops_tsv = response.content.decode('utf-8-sig')
     tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t")
-    stops_json = [row for row in tsv_reader]
-
-    for stop in stops_json:
-        stops.append({
-            "objectID": "LuasStop-" + stop["Abbreviation"],
-            "objectType": "LuasStop",
-            "latitude": stop["Latitude"],
-            "longitude": stop["Longitude"],
-
-            "luasStopName": stop["Name"],
-            "luasStopIrishName": stop["IrishName"],
-            "luasStopID": stop["StopID"],
-            "luasStopCode": stop["Abbreviation"],
-            "luasStopLineID": stop["LineID"],
-            "luasStopSortOrder": stop["SortOrder"],
-            "luasStopIsEnabled": stop["IsEnabled"],
-            "luasStopIsParkAndRide": stop["IsParkAndRide"],
-            "luasStopIsCycleAndRide": stop["IsCycleAndRide"],
-            "luasStopZoneCountA": stop["ZoneCountA"],
-            "luasStopZoneCountB": stop["ZoneCountB"],
-        })
-
+    stops = [{
+        "objectID": "LuasStop-" + stop["Abbreviation"],
+        "objectType": "LuasStop",
+        "latitude": stop["Latitude"],
+        "longitude": stop["Longitude"],
+        "luasStopName": stop["Name"],
+        "luasStopIrishName": stop["IrishName"],
+        "luasStopID": stop["StopID"],
+        "luasStopCode": stop["Abbreviation"],
+        "luasStopLineID": stop["LineID"],
+        "luasStopSortOrder": stop["SortOrder"],
+        "luasStopIsEnabled": stop["IsEnabled"],
+        "luasStopIsParkAndRide": stop["IsParkAndRide"],
+        "luasStopIsCycleAndRide": stop["IsCycleAndRide"],
+        "luasStopZoneCountA": stop["ZoneCountA"],
+        "luasStopZoneCountB": stop["ZoneCountB"]
+    } for stop in tsv_reader]
     return stops
-
 def fetch_gtfs():
-    data = []
+    """
+    Fetch GTFS data from the Transport for Ireland dataset.
+
+    Returns:
+        list: A list of dictionaries containing GTFS data.
+    """
     url = "https://www.transportforireland.ie/transitData/Data/GTFS_All.zip"
-    zip_file = requests.get(url).content
+    zip_file = session.get(url).content
+    data = []
 
     with zipfile.ZipFile(io.BytesIO(zip_file)) as zip:
-        # will need to access the list of agencies for later objects, so keeping separate
-        agencies = []
-
-        # extract agencies data
         if "agency.txt" in zip.namelist():
             with zip.open("agency.txt") as file:
                 agencies_csv = file.read().decode('utf-8')
-                csv_reader = csv.DictReader(agencies_csv.splitlines(), delimiter=",")
-                agencies_json = [row for row in csv_reader]
+                agencies = [{
+                    "objectID": "BusAgency" + agency["agency_id"],
+                    "objectType": "BusAgency",
+                    "busAgencyID": agency["agency_id"],
+                    "busAgencyName": agency["agency_name"],
+                    "busAgencyURL": agency["agency_url"]
+                } for agency in csv.DictReader(agencies_csv.splitlines())]
+                data.extend(agencies)
 
-                for agency in agencies_json:
-                    agencies.append({
-                        "objectID": "BusAgency" + agency["agency_id"],
-                        "objectType": "BusAgency",
-                        # no latitude or longitude
-
-                        "busAgencyID": agency["agency_id"],
-                        "busAgencyName": agency["agency_name"],
-                        "busAgencyURL": agency["agency_url"]
-                    })
-
-                data += agencies
-
-        # extract routes data
         if "routes.txt" in zip.namelist():
             with zip.open("routes.txt") as file:
                 routes_csv = file.read().decode('utf-8')
-                csv_reader = csv.DictReader(routes_csv.splitlines(), delimiter=",")
-                routes_json = [row for row in csv_reader]
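+                # agencies were appended to data first, so data doubles as the
+                # lookup table for busRouteAgencyName below; routes.txt must
+                # therefore be processed after agency.txt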
+                data.extend([{
+                    "objectID": "BusRoute-" + route["route_id"],
+                    "objectType": "BusRoute",
+                    "busRouteID": route["route_id"],
+                    "busRouteAgencyID": route["agency_id"],
+                    "busRouteShortName": route["route_short_name"],
+                    "busRouteLongName": route["route_long_name"],
+                    "busRouteAgencyName": next((agency['busAgencyName'] for agency in data if agency['busAgencyID'] == route["agency_id"]), None)
+                } for route in csv.DictReader(routes_csv.splitlines())])
 
-                for route in routes_json:
-                    data.append({
-                        "objectID": "BusRoute-" + route["route_id"],
-                        "objectType": "BusRoute",
-                        # no latitude or longitude
-
-                        "busRouteID": route["route_id"],
-                        "busRouteAgencyID": route["agency_id"],
-                        "busRouteAgencyName": next((agency['busAgencyName'] for agency in agencies if agency['busAgencyID'] == route["agency_id"]), None),
-                        "busRouteShortName": route["route_short_name"],
-                        "busRouteLongName": route["route_long_name"]
-                    })
-
-        # extract stops data
         if "stops.txt" in zip.namelist():
             with zip.open("stops.txt") as file:
                 stops_csv = file.read().decode('utf-8')
-                csv_reader = csv.DictReader(stops_csv.splitlines(), delimiter=",")
-                stops_json = [row for row in csv_reader]
-
-                for stop in stops_json:
-                    data.append({
-                        "objectID": "BusStop-" + stop["stop_id"],
-                        "objectType": "BusStop",
-                        "latitude": stop["stop_lat"],
-                        "longitude": stop["stop_lon"],
-
-                        "busStopID": stop["stop_id"],
-                        "busStopCode": stop["stop_code"],
-                        "busStopName": stop["stop_name"]
-                    })
-
+                data.extend([{
+                    "objectID": "BusStop-" + stop["stop_id"],
+                    "objectType": "BusStop",
+                    "latitude": stop["stop_lat"],
+                    "longitude": stop["stop_lon"],
+                    "busStopID": stop["stop_id"],
+                    "busStopCode": stop.get("stop_code", ""),
+                    "busStopName": stop["stop_name"]
+                } for stop in csv.DictReader(stops_csv.splitlines())])
     return data
 
+def batch_upload_to_dynamodb(data):
+    """
+    Batch upload data to DynamoDB.
+
+    Args:
+        data (list): A list of dictionaries containing data to be uploaded.
+    """
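+    # batch_writer() buffers puts into BatchWriteItem calls of up to 25 items
+    # and automatically resends any unprocessed items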
+    with table.batch_writer() as batch:
+        for item in data:
+            batch.put_item(Item=item)
 
 def lambda_handler(event, context):
+    """
+    AWS Lambda handler to fetch data and upload it to DynamoDB.
+
+    Args:
+        event (dict): Event data passed to the Lambda function.
+        context (object): Runtime information of the Lambda function.
+
+    Returns:
+        dict: A dictionary containing the status code and message.
+    """
     print("Lambda Handler invoked! Retrieving data...")
 
-    data = fetch_train_stations() + fetch_luas() + fetch_gtfs()
-    print("Data retrieved successfully")
-
-    table_name = os.environ.get("DYNAMODB_TABLE", "permanent_data")
-    table = dynamodb.Table(table_name)
+    with ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(fetch_train_stations),
+            executor.submit(fetch_luas),
+            executor.submit(fetch_gtfs)
+        ]
+        data = []
+        for future in futures:
+            data.extend(future.result())
 
-    print("Attempting to batch upload retrieved data")
+    print(f"Retrieved {len(data)} records.")
+    print("Uploading to DynamoDB...")
+    chunk_size = 25
+    for i in range(0, len(data), chunk_size):
+        batch_upload_to_dynamodb(data[i:i + chunk_size])
+    print("Upload completed.")
 
-    try:
-        with table.batch_writer() as batch:
-            for record in data:
-                batch.put_item(Item=record)
+    return {
+        'statusCode': 200,
+        'body': json.dumps({'message': 'Data uploaded successfully!'})
+    }
 
-        print("done uploading")
+if __name__ == "__main__":
+    """
+    Main function to fetch data and print it locally.
+    """
+    with ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(fetch_train_stations),
+            executor.submit(fetch_luas),
+            executor.submit(fetch_gtfs)
+        ]
+        data = []
+        for future in futures:
+            data.extend(future.result())
 
-        return {
-            'statusCode': 200,
-            'body': json.dumps({'message': 'Data inserted successfully!'})
-        }
-
-    except Exception as e:
-        return {"statusCode": 500, "error": str(e)}
-
-
-if "__main__" == __name__:
-    data = fetch_train_stations() + fetch_luas() + fetch_gtfs()
-    print(json.dumps(data))
+    print(json.dumps(data))
\ No newline at end of file