From 948dca284c6876406631c057984930edb04952d3 Mon Sep 17 00:00:00 2001
From: Andrew
Date: Sun, 9 Mar 2025 14:30:29 +0000
Subject: [PATCH] [server]: Optimise transient_data.py

---
 .../transient_data/transient_data.py | 162 ++++++++++++------
 1 file changed, 113 insertions(+), 49 deletions(-)

diff --git a/server/src/functions/transient_data/transient_data.py b/server/src/functions/transient_data/transient_data.py
index d70c3b0..932a853 100644
--- a/server/src/functions/transient_data/transient_data.py
+++ b/server/src/functions/transient_data/transient_data.py
@@ -5,27 +5,39 @@ import requests
 import os
 import boto3
 import time
+from concurrent.futures import ThreadPoolExecutor
 
+# Create a reusable session for requests
+session = requests.Session()
+
+# Setup DynamoDB client
 dynamodb = boto3.resource("dynamodb")
+table_name = os.environ.get("DYNAMODB_TABLE", "transient_data")
+table = dynamodb.Table(table_name)
+
 timestamp = str(int(time.time()))
 
 # API URLs
 irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/"
 
-
-# function to fetch Irish Rail train data
 def fetch_trains():
+    """
+    Fetches train data from the Irish Rail API.
+
+    Returns:
+        list: A list of dictionaries containing train data.
+    """
     print("Fetching Irish Rail data.")
 
     api_function = "getCurrentTrainsXML_WithTrainType?TrainType="
     train_types = ["M", "S", "D"]
     trains = []
 
     for train_type in train_types:
-        response = requests.get(irishrail_url + api_function + train_type)
+        response = session.get(irishrail_url + api_function + train_type)
         response.raise_for_status()
 
         trains_xml = response.text
-        trains_json = json.loads(json.dumps(xmltodict.parse(trains_xml)))
+        trains_json = xmltodict.parse(trains_xml)
 
         for train in trains_json["ArrayOfObjTrainPositions"]["objTrainPositions"]:
             trains.append({
@@ -34,7 +46,6 @@ def fetch_trains():
                 "timestamp": timestamp,
                 "latitude": str(train["TrainLatitude"]),
                 "longitude": str(train["TrainLongitude"]),
-
                 "trainCode": str(train["TrainCode"]),
                 "trainType": train_type,
                 "trainStatus": train["TrainStatus"],
@@ -45,29 +56,39 @@ def fetch_trains():
     return trains
 
 
-# function to fetch Luas stops data and the forecasted trams associated with each stop
 def fetch_luas():
+    """
+    Fetches Luas stop and forecast data.
+
+    Returns:
+        list: A list of dictionaries containing Luas stop and forecast data.
+    """
     print("Fetching Luas data.")
 
     stops = []
-    stops_tsv = requests.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig')
+    stops_tsv = session.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig')
     tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t")
-    stops_json = [row for row in tsv_reader]
 
-    for stop in stops_json:
-        response = requests.get("https://luasforecasts.rpa.ie/xml/get.ashx?action=forecast&stop=" + stop["Abbreviation"] + "&encrypt=false")
+    def fetch_forecast(stop):
+        """
+        Fetches forecast data for a given Luas stop.
+
+        Args:
+            stop (dict): A dictionary containing Luas stop information.
+
+        Returns:
+            dict: A dictionary containing Luas stop and forecast data.
+        """
+        response = session.get(f"https://luasforecasts.rpa.ie/xml/get.ashx?action=forecast&stop={stop['Abbreviation']}&encrypt=false")
         response.raise_for_status()
 
-        trams_xml = response.text
-        trams_json = json.loads(json.dumps(xmltodict.parse(trams_xml)))
-
-        stops.append({
+        trams_json = xmltodict.parse(response.text)
+        return {
             "objectID": "LuasStop-" + stop["Abbreviation"],
             "objectType": "LuasStop",
             "timestamp": timestamp,
             "latitude": str(stop["Latitude"]),
             "longitude": str(stop["Longitude"]),
-
             "luasStopName": stop["Name"],
             "luasStopIrishName": stop["IrishName"],
             "luasStopID": str(stop["StopID"]),
@@ -81,13 +102,32 @@ def fetch_luas():
             "luasStopZoneCountB": str(stop["ZoneCountB"]),
             "luasStopMessage": str(trams_json["stopInfo"]["message"]),
             "luasStopTrams": str(trams_json["stopInfo"]["direction"])
-        })
+        }
+
+    with ThreadPoolExecutor() as executor:
+        stops = list(executor.map(fetch_forecast, tsv_reader))
 
     return stops
 
 
+def fetch_bus_routes():
+    """
+    Fetches bus route data from the permanent data API.
+
+    Returns:
+        list: A list of dictionaries containing bus route data.
+    """
+    permanent_data_api = os.environ["PERMANENT_DATA_API"]
+    response = session.get(permanent_data_api + "?objectType=BusRoute")
+    response.raise_for_status()
+    return response.json()
 
-# function to fetch bus data
 def fetch_buses():
+    """
+    Fetches bus data from the National Transport API.
+
+    Returns:
+        list: A list of dictionaries containing bus data.
+    """
     print("Fetching bus data.")
 
     buses = []
     api_url = "https://api.nationaltransport.ie/gtfsr/v2/Vehicles?format=json"
@@ -96,7 +136,7 @@ def fetch_buses():
     headers = {
         "Cache-Control": "no-cache",
         "x-api-key": os.getenv("GTFS_KEY")
     }
 
-    response = requests.get(api_url, headers=headers)
+    response = session.get(api_url, headers=headers)
     response.raise_for_status()
 
     buses_json = response.json()
 
@@ -105,59 +145,83 @@ def fetch_buses():
 
     for bus in buses_json["entity"]:
         busRouteID = str(bus["vehicle"]["trip"]["route_id"])
-
+        route_info = bus_routes_hashmap.get(busRouteID, {})
 
         buses.append({
             "objectID": "Bus-" + bus["id"],
             "objectType": "Bus",
             "timestamp": timestamp,
             "latitude": str(bus["vehicle"]["position"]["latitude"]),
             "longitude": str(bus["vehicle"]["position"]["longitude"]),
-
             "busID": str(bus["id"]),
-
             "busTripID": str(bus["vehicle"]["trip"]["trip_id"]),
             "busStartTime": str(bus["vehicle"]["trip"]["start_time"]),
             "busStartDate": str(bus["vehicle"]["trip"]["start_date"]),
             "busScheduleRelationship": str(bus["vehicle"]["trip"]["schedule_relationship"]),
             "busRoute": busRouteID,
-            "busRouteAgencyName": str(bus_routes_hashmap[busRouteID]["busRouteAgencyName"]),
-            "busRouteLongName": str(bus_routes_hashmap[busRouteID]["busRouteLongName"]),
-            "busRouteShortName": str(bus_routes_hashmap[busRouteID]["busRouteShortName"]),
+            "busRouteAgencyName": route_info.get("busRouteAgencyName", ""),
+            "busRouteLongName": route_info.get("busRouteLongName", ""),
+            "busRouteShortName": route_info.get("busRouteShortName", ""),
             "busDirection": str(bus["vehicle"]["trip"]["direction_id"]),
         })
 
     return buses
 
 
-# function to fetch bus route data
-def fetch_bus_routes():
-    permanent_data_api = os.environ["PERMANENT_DATA_API"]
-    routes = requests.get(permanent_data_api + "?objectType=BusRoute").json()
-
-    return routes
+def batch_upload_to_dynamodb(data):
+    """
+    Uploads data to DynamoDB in batches.
 
+    Args:
+        data (list): A list of dictionaries containing data to be uploaded.
+    """
+    with table.batch_writer() as batch:
+        for item in data:
+            batch.put_item(Item=item)
 
 def lambda_handler(event, context):
+    """
+    AWS Lambda handler function to fetch and upload data.
+
+    Args:
+        event (dict): Event data passed to the Lambda function.
+        context (object): Runtime information of the Lambda function.
+
+    Returns:
+        dict: A dictionary containing the status code and message.
+    """
     print("Lambda handler triggered; fetching data.")
 
-    data = fetch_trains() + fetch_buses()
-    print("Data retrieved successfully.")
+    with ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(fetch_trains),
+            executor.submit(fetch_luas),
+            executor.submit(fetch_buses)
+        ]
+        data = []
+        for future in futures:
+            data.extend(future.result())
 
-    table_name = os.environ.get("DYNAMODB_TABLE", "transient_data")
-    table = dynamodb.Table(table_name)
+    print(f"Retrieved {len(data)} records.")
 
+    print("Uploading to DynamoDB...")
+    chunk_size = 25
+    for i in range(0, len(data), chunk_size):
+        batch_upload_to_dynamodb(data[i:i + chunk_size])
+    print("Upload completed.")
 
-    print("Attempting to batch upload retrieved data to DynamoDB.")
+    return {
+        'statusCode': 200,
+        'body': json.dumps({'message': 'Data uploaded successfully!'})
+    }
 
-    try:
-        with table.batch_writer() as batch:
-            for record in data:
-                batch.put_item(Item=record)
+if __name__ == "__main__":
+    # When run directly (outside the Lambda runtime), fetch the
+    # train and bus data and print them to stdout. This is useful
+    # for testing the fetch functions locally.
+    with ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(fetch_trains),
+            executor.submit(fetch_buses)
+        ]
+        data = []
+        for future in futures:
+            data.extend(future.result())
 
-        print("Completed data upload.")
-
-        return {
-            'statusCode': 200,
-            'body': json.dumps({'message': 'Data inserted successfully!'})
-        }
-    except Exception as e:
-        return {"statusCode": 500, "error": str(e)}
-
-lambda_handler("event", "context")
+    print(json.dumps(data))
\ No newline at end of file