[server]: Optimise transient_data.py

This commit is contained in:
2025-03-09 14:30:29 +00:00
parent f281114eb1
commit 948dca284c

View File

@ -5,27 +5,39 @@ import requests
import os import os
import boto3 import boto3
import time import time
from concurrent.futures import ThreadPoolExecutor
# Create a reusable session for requests
session = requests.Session()
# Setup DynamoDB client
dynamodb = boto3.resource("dynamodb") dynamodb = boto3.resource("dynamodb")
table_name = os.environ.get("DYNAMODB_TABLE", "transient_data")
table = dynamodb.Table(table_name)
timestamp = str(int(time.time())) timestamp = str(int(time.time()))
# API URLs # API URLs
irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/" irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/"
# function to fetch Irish Rail train data
def fetch_trains(): def fetch_trains():
"""
Fetches train data from the Irish Rail API.
Returns:
list: A list of dictionaries containing train data.
"""
print("Fetching Irish Rail data.") print("Fetching Irish Rail data.")
api_function = "getCurrentTrainsXML_WithTrainType?TrainType=" api_function = "getCurrentTrainsXML_WithTrainType?TrainType="
train_types = ["M", "S", "D"] train_types = ["M", "S", "D"]
trains = [] trains = []
for train_type in train_types: for train_type in train_types:
response = requests.get(irishrail_url + api_function + train_type) response = session.get(irishrail_url + api_function + train_type)
response.raise_for_status() response.raise_for_status()
trains_xml = response.text trains_xml = response.text
trains_json = json.loads(json.dumps(xmltodict.parse(trains_xml))) trains_json = xmltodict.parse(trains_xml)
for train in trains_json["ArrayOfObjTrainPositions"]["objTrainPositions"]: for train in trains_json["ArrayOfObjTrainPositions"]["objTrainPositions"]:
trains.append({ trains.append({
@ -34,7 +46,6 @@ def fetch_trains():
"timestamp": timestamp, "timestamp": timestamp,
"latitude": str(train["TrainLatitude"]), "latitude": str(train["TrainLatitude"]),
"longitude": str(train["TrainLongitude"]), "longitude": str(train["TrainLongitude"]),
"trainCode": str(train["TrainCode"]), "trainCode": str(train["TrainCode"]),
"trainType": train_type, "trainType": train_type,
"trainStatus": train["TrainStatus"], "trainStatus": train["TrainStatus"],
@ -45,29 +56,39 @@ def fetch_trains():
return trains return trains
# function to fetch Luas stops data and the forecasted trams associated with each stop
def fetch_luas(): def fetch_luas():
"""
Fetches Luas stop and forecast data.
Returns:
list: A list of dictionaries containing Luas stop and forecast data.
"""
print("Fetching Luas data.") print("Fetching Luas data.")
stops = [] stops = []
stops_tsv = requests.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig') stops_tsv = session.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig')
tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t") tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t")
stops_json = [row for row in tsv_reader]
for stop in stops_json: def fetch_forecast(stop):
response = requests.get("https://luasforecasts.rpa.ie/xml/get.ashx?action=forecast&stop=" + stop["Abbreviation"] + "&encrypt=false") """
Fetches forecast data for a given Luas stop.
Args:
stop (dict): A dictionary containing Luas stop information.
Returns:
dict: A dictionary containing Luas stop and forecast data.
"""
response = session.get(f"https://luasforecasts.rpa.ie/xml/get.ashx?action=forecast&stop={stop['Abbreviation']}&encrypt=false")
response.raise_for_status() response.raise_for_status()
trams_xml = response.text trams_xml = response.text
trams_json = json.loads(json.dumps(xmltodict.parse(trams_xml))) trams_json = xmltodict.parse(trams_xml)
return {
stops.append({
"objectID": "LuasStop-" + stop["Abbreviation"], "objectID": "LuasStop-" + stop["Abbreviation"],
"objectType": "LuasStop", "objectType": "LuasStop",
"timestamp": timestamp, "timestamp": timestamp,
"latitude": str(stop["Latitude"]), "latitude": str(stop["Latitude"]),
"longitude": str(stop["Longitude"]), "longitude": str(stop["Longitude"]),
"luasStopName": stop["Name"], "luasStopName": stop["Name"],
"luasStopIrishName": stop["IrishName"], "luasStopIrishName": stop["IrishName"],
"luasStopID": str(stop["StopID"]), "luasStopID": str(stop["StopID"]),
@ -81,13 +102,32 @@ def fetch_luas():
"luasStopZoneCountB": str(stop["ZoneCountB"]), "luasStopZoneCountB": str(stop["ZoneCountB"]),
"luasStopMessage": str(trams_json["stopInfo"]["message"]), "luasStopMessage": str(trams_json["stopInfo"]["message"]),
"luasStopTrams": str(trams_json["stopInfo"]["direction"]) "luasStopTrams": str(trams_json["stopInfo"]["direction"])
}) }
with ThreadPoolExecutor() as executor:
stops = list(executor.map(fetch_forecast, tsv_reader))
return stops return stops
def fetch_bus_routes():
    """
    Fetch bus route metadata from the permanent data API.

    Returns:
        list: A list of dictionaries containing bus route data.

    Raises:
        KeyError: If the PERMANENT_DATA_API environment variable is unset.
        requests.HTTPError: If the API responds with an error status.
    """
    permanent_data_api = os.environ["PERMANENT_DATA_API"]
    # requests applies no timeout by default; without one a stalled
    # connection would hang until the Lambda itself is killed.
    response = session.get(permanent_data_api + "?objectType=BusRoute", timeout=30)
    response.raise_for_status()
    return response.json()
# function to fetch bus data
def fetch_buses(): def fetch_buses():
"""
Fetches bus data from the National Transport API.
Returns:
list: A list of dictionaries containing bus data.
"""
print("Fetching bus data.") print("Fetching bus data.")
buses = [] buses = []
api_url = "https://api.nationaltransport.ie/gtfsr/v2/Vehicles?format=json" api_url = "https://api.nationaltransport.ie/gtfsr/v2/Vehicles?format=json"
@ -96,7 +136,7 @@ def fetch_buses():
"x-api-key": os.getenv("GTFS_KEY") "x-api-key": os.getenv("GTFS_KEY")
} }
response = requests.get(api_url, headers=headers) response = session.get(api_url, headers=headers)
response.raise_for_status() response.raise_for_status()
buses_json = response.json() buses_json = response.json()
@ -105,59 +145,83 @@ def fetch_buses():
for bus in buses_json["entity"]: for bus in buses_json["entity"]:
busRouteID = str(bus["vehicle"]["trip"]["route_id"]) busRouteID = str(bus["vehicle"]["trip"]["route_id"])
route_info = bus_routes_hashmap.get(busRouteID, {})
buses.append({ buses.append({
"objectID": "Bus-" + bus["id"], "objectID": "Bus-" + bus["id"],
"objectType": "Bus", "objectType": "Bus",
"timestamp": timestamp, "timestamp": timestamp,
"latitude": str(bus["vehicle"]["position"]["latitude"]), "latitude": str(bus["vehicle"]["position"]["latitude"]),
"longitude": str(bus["vehicle"]["position"]["longitude"]), "longitude": str(bus["vehicle"]["position"]["longitude"]),
"busID": str(bus["id"]), "busID": str(bus["id"]),
"busTripID": str(bus["vehicle"]["trip"]["trip_id"]), "busTripID": str(bus["vehicle"]["trip"]["trip_id"]),
"busStartTime": str(bus["vehicle"]["trip"]["start_time"]), "busStartTime": str(bus["vehicle"]["trip"]["start_time"]),
"busStartDate": str(bus["vehicle"]["trip"]["start_date"]), "busStartDate": str(bus["vehicle"]["trip"]["start_date"]),
"busScheduleRelationship": str(bus["vehicle"]["trip"]["schedule_relationship"]), "busScheduleRelationship": str(bus["vehicle"]["trip"]["schedule_relationship"]),
"busRoute": busRouteID, "busRoute": busRouteID,
"busRouteAgencyName": str(bus_routes_hashmap[busRouteID]["busRouteAgencyName"]), "busRouteAgencyName": route_info.get("busRouteAgencyName", ""),
"busRouteLongName": str(bus_routes_hashmap[busRouteID]["busRouteLongName"]), "busRouteLongName": route_info.get("busRouteLongName", ""),
"busRouteShortName": str(bus_routes_hashmap[busRouteID]["busRouteShortName"]), "busRouteShortName": route_info.get("busRouteShortName", ""),
"busDirection": str(bus["vehicle"]["trip"]["direction_id"]), "busDirection": str(bus["vehicle"]["trip"]["direction_id"]),
}) })
return buses return buses
def batch_upload_to_dynamodb(data):
    """
    Write a list of items to the DynamoDB table in batches.

    Args:
        data (list): Dictionaries to upload; each one becomes a DynamoDB item.
    """
    # batch_writer() buffers puts and flushes them in DynamoDB-sized batches.
    with table.batch_writer() as writer:
        for record in data:
            writer.put_item(Item=record)
def lambda_handler(event, context):
    """
    AWS Lambda entry point: fetch transport data and upload it to DynamoDB.

    Args:
        event (dict): Event data passed to the Lambda function (unused).
        context (object): Lambda runtime information (unused).

    Returns:
        dict: Status code and a JSON body describing the outcome.
    """
    print("Lambda handler triggered; fetching data.")
    # The three feeds are independent network calls, so fetch them concurrently.
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(fetch_trains),
            executor.submit(fetch_luas),
            executor.submit(fetch_buses),
        ]
        data = []
        for future in futures:
            # .result() re-raises any exception from the worker thread,
            # so a failed fetch fails the whole invocation loudly.
            data.extend(future.result())

    print(f"Retrieved {len(data)} records.")
    print("Uploading to DynamoDB...")
    # boto3's batch_writer already flushes in 25-item batches internally,
    # so the whole dataset can be handed over in one call — the previous
    # manual 25-item chunk loop only created extra batch writers.
    batch_upload_to_dynamodb(data)
    print("Upload completed.")

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Data uploaded successfully!'})
    }
if __name__ == "__main__":
    # Local entry point: fetch data concurrently and print it as JSON
    # instead of uploading to DynamoDB. (A bare triple-quoted string here
    # would be a no-op statement, not a docstring, so comments are used.)
    # NOTE(review): fetch_luas is included in lambda_handler but skipped
    # here — confirm whether that omission is intentional.
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(fetch_trains),
            executor.submit(fetch_buses),
        ]
        data = []
        for future in futures:
            data.extend(future.result())
    print(json.dumps(data))