[server]: Optimise permanent_data.py
This commit is contained in:
@ -6,23 +6,31 @@ import zipfile
|
|||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import boto3
|
import boto3
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
|
# Create a reusable session for requests
|
||||||
|
session = requests.Session()
|
||||||
|
|
||||||
|
# Setup DynamoDB client for Lambda
|
||||||
dynamodb = boto3.resource("dynamodb")
|
dynamodb = boto3.resource("dynamodb")
|
||||||
|
table_name = os.environ.get("DYNAMODB_TABLE", "permanent_data")
|
||||||
|
table = dynamodb.Table(table_name)
|
||||||
|
|
||||||
# API URLs
|
|
||||||
irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/"
|
irishrail_url = "http://api.irishrail.ie/realtime/realtime.asmx/"
|
||||||
|
|
||||||
# function to fetch Irish Rail station data with types
|
|
||||||
# this function seems to be missing stations -- the API must have some uncategorised stations that it won't return
|
|
||||||
# unfortunately, this is the only way to categorise stations as the API won't return the station's category
|
|
||||||
def fetch_train_stations_with_type():
|
def fetch_train_stations_with_type():
|
||||||
api_function = "getAllStationsXML_WithStationType?StationType="
|
"""
|
||||||
|
Fetch train stations from the Irish Rail API with specific station types.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of dictionaries containing train station data with types.
|
||||||
|
"""
|
||||||
station_types = ["M", "S", "D"]
|
station_types = ["M", "S", "D"]
|
||||||
stations = []
|
stations = []
|
||||||
|
|
||||||
for station_type in station_types:
|
for station_type in station_types:
|
||||||
stations_xml = requests.get(irishrail_url + api_function + station_type).text
|
response = session.get(irishrail_url + f"getAllStationsXML_WithStationType?StationType={station_type}")
|
||||||
stations_json = json.loads(json.dumps(xmltodict.parse(stations_xml)))
|
stations_xml = response.text
|
||||||
|
stations_json = xmltodict.parse(stations_xml)
|
||||||
|
|
||||||
for station in stations_json["ArrayOfObjStation"]["objStation"]:
|
for station in stations_json["ArrayOfObjStation"]["objStation"]:
|
||||||
stations.append({
|
stations.append({
|
||||||
@ -30,169 +38,174 @@ def fetch_train_stations_with_type():
|
|||||||
"objectType": "IrishRailStation",
|
"objectType": "IrishRailStation",
|
||||||
"latitude": station["StationLatitude"],
|
"latitude": station["StationLatitude"],
|
||||||
"longitude": station["StationLongitude"],
|
"longitude": station["StationLongitude"],
|
||||||
|
|
||||||
"trainStationID": station["StationId"],
|
"trainStationID": station["StationId"],
|
||||||
"trainStationCode": station["StationCode"],
|
"trainStationCode": station["StationCode"],
|
||||||
"trainStationAlias": station["StationAlias"],
|
"trainStationAlias": station.get("StationAlias", ""),
|
||||||
"trainStationDesc": station["StationDesc"],
|
"trainStationDesc": station["StationDesc"],
|
||||||
"trainStationType": station_type
|
"trainStationType": station_type
|
||||||
})
|
})
|
||||||
|
|
||||||
return stations
|
return stations
|
||||||
|
|
||||||
# function to fetch Irish Rail station data without types
|
|
||||||
def fetch_train_stations():
|
def fetch_train_stations():
|
||||||
api_function = "getAllStationsXML"
|
"""
|
||||||
stations = []
|
Fetch all train stations from the Irish Rail API.
|
||||||
|
|
||||||
stations_xml = requests.get(irishrail_url + api_function).text
|
|
||||||
stations_json = json.loads(json.dumps(xmltodict.parse(stations_xml)))
|
|
||||||
|
|
||||||
for station in stations_json["ArrayOfObjStation"]["objStation"]:
|
|
||||||
stations.append({
|
|
||||||
"objectID": "IrishRailStation-" + station["StationCode"],
|
|
||||||
"objectType": "IrishRailStation",
|
|
||||||
"latitude": station["StationLatitude"],
|
|
||||||
"longitude": station["StationLongitude"],
|
|
||||||
|
|
||||||
"trainStationID": station["StationId"],
|
|
||||||
"trainStationCode": station["StationCode"],
|
|
||||||
"trainStationAlias": station["StationAlias"],
|
|
||||||
"trainStationDesc": station["StationDesc"],
|
|
||||||
})
|
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of dictionaries containing train station data.
|
||||||
|
"""
|
||||||
|
response = session.get(irishrail_url + "getAllStationsXML")
|
||||||
|
stations_xml = response.text
|
||||||
|
stations_json = xmltodict.parse(stations_xml)
|
||||||
|
stations = [{
|
||||||
|
"objectID": "IrishRailStation-" + station["StationCode"],
|
||||||
|
"objectType": "IrishRailStation",
|
||||||
|
"latitude": station["StationLatitude"],
|
||||||
|
"longitude": station["StationLongitude"],
|
||||||
|
"trainStationID": station["StationId"],
|
||||||
|
"trainStationCode": station["StationCode"],
|
||||||
|
"trainStationAlias": station.get("StationAlias", ""),
|
||||||
|
"trainStationDesc": station["StationDesc"]
|
||||||
|
} for station in stations_json["ArrayOfObjStation"]["objStation"]]
|
||||||
return stations
|
return stations
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# function to fetch Luas stops data
|
|
||||||
def fetch_luas():
|
def fetch_luas():
|
||||||
stops = []
|
"""
|
||||||
|
Fetch Luas stops from the TII dataset.
|
||||||
|
|
||||||
stops_tsv = requests.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt").content.decode('utf-8-sig')
|
Returns:
|
||||||
|
list: A list of dictionaries containing Luas stop data.
|
||||||
|
"""
|
||||||
|
response = session.get("https://data.tii.ie/Datasets/Luas/StopLocations/luas-stops.txt")
|
||||||
|
stops_tsv = response.content.decode('utf-8-sig')
|
||||||
tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t")
|
tsv_reader = csv.DictReader(stops_tsv.splitlines(), delimiter="\t")
|
||||||
stops_json = [row for row in tsv_reader]
|
stops = [{
|
||||||
|
"objectID": "LuasStop-" + stop["Abbreviation"],
|
||||||
for stop in stops_json:
|
"objectType": "LuasStop",
|
||||||
stops.append({
|
"latitude": stop["Latitude"],
|
||||||
"objectID": "LuasStop-" + stop["Abbreviation"],
|
"longitude": stop["Longitude"],
|
||||||
"objectType": "LuasStop",
|
"luasStopName": stop["Name"],
|
||||||
"latitude": stop["Latitude"],
|
"luasStopIrishName": stop["IrishName"],
|
||||||
"longitude": stop["Longitude"],
|
"luasStopID": stop["StopID"],
|
||||||
|
"luasStopCode": stop["Abbreviation"],
|
||||||
"luasStopName": stop["Name"],
|
"luasStopLineID": stop["LineID"],
|
||||||
"luasStopIrishName": stop["IrishName"],
|
"luasStopSortOrder": stop["SortOrder"],
|
||||||
"luasStopID": stop["StopID"],
|
"luasStopIsEnabled": stop["IsEnabled"],
|
||||||
"luasStopCode": stop["Abbreviation"],
|
"luasStopIsParkAndRide": stop["IsParkAndRide"],
|
||||||
"luasStopLineID": stop["LineID"],
|
"luasStopIsCycleAndRide": stop["IsCycleAndRide"],
|
||||||
"luasStopSortOrder": stop["SortOrder"],
|
"luasStopZoneCountA": stop["ZoneCountA"],
|
||||||
"luasStopIsEnabled": stop["IsEnabled"],
|
"luasStopZoneCountB": stop["ZoneCountB"]
|
||||||
"luasStopIsParkAndRide": stop["IsParkAndRide"],
|
} for stop in tsv_reader]
|
||||||
"luasStopIsCycleAndRide": stop["IsCycleAndRide"],
|
|
||||||
"luasStopZoneCountA": stop["ZoneCountA"],
|
|
||||||
"luasStopZoneCountB": stop["ZoneCountB"],
|
|
||||||
})
|
|
||||||
|
|
||||||
return stops
|
return stops
|
||||||
|
|
||||||
|
|
||||||
def fetch_gtfs():
|
def fetch_gtfs():
|
||||||
data = []
|
"""
|
||||||
|
Fetch GTFS data from the Transport for Ireland dataset.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of dictionaries containing GTFS data.
|
||||||
|
"""
|
||||||
url = "https://www.transportforireland.ie/transitData/Data/GTFS_All.zip"
|
url = "https://www.transportforireland.ie/transitData/Data/GTFS_All.zip"
|
||||||
zip_file = requests.get(url).content
|
zip_file = session.get(url).content
|
||||||
|
data = []
|
||||||
|
|
||||||
with zipfile.ZipFile(io.BytesIO(zip_file)) as zip:
|
with zipfile.ZipFile(io.BytesIO(zip_file)) as zip:
|
||||||
# will need to access the list of agencies for later objects, so keeping separate
|
|
||||||
agencies = []
|
|
||||||
|
|
||||||
# extract agencies data
|
|
||||||
if "agency.txt" in zip.namelist():
|
if "agency.txt" in zip.namelist():
|
||||||
with zip.open("agency.txt") as file:
|
with zip.open("agency.txt") as file:
|
||||||
agencies_csv = file.read().decode('utf-8')
|
agencies_csv = file.read().decode('utf-8')
|
||||||
csv_reader = csv.DictReader(agencies_csv.splitlines(), delimiter=",")
|
agencies = [{
|
||||||
agencies_json = [row for row in csv_reader]
|
"objectID": "BusAgency" + agency["agency_id"],
|
||||||
|
"objectType": "BusAgency",
|
||||||
|
"busAgencyID": agency["agency_id"],
|
||||||
|
"busAgencyName": agency["agency_name"],
|
||||||
|
"busAgencyURL": agency["agency_url"]
|
||||||
|
} for agency in csv.DictReader(agencies_csv.splitlines())]
|
||||||
|
data.extend(agencies)
|
||||||
|
|
||||||
for agency in agencies_json:
|
|
||||||
agencies.append({
|
|
||||||
"objectID": "BusAgency" + agency["agency_id"],
|
|
||||||
"objectType": "BusAgency",
|
|
||||||
# no latitude or longitude
|
|
||||||
|
|
||||||
"busAgencyID": agency["agency_id"],
|
|
||||||
"busAgencyName": agency["agency_name"],
|
|
||||||
"busAgencyURL": agency["agency_url"]
|
|
||||||
})
|
|
||||||
|
|
||||||
data += agencies
|
|
||||||
|
|
||||||
# extract routes data
|
|
||||||
if "routes.txt" in zip.namelist():
|
if "routes.txt" in zip.namelist():
|
||||||
with zip.open("routes.txt") as file:
|
with zip.open("routes.txt") as file:
|
||||||
routes_csv = file.read().decode('utf-8')
|
routes_csv = file.read().decode('utf-8')
|
||||||
csv_reader = csv.DictReader(routes_csv.splitlines(), delimiter=",")
|
data.extend([{
|
||||||
routes_json = [row for row in csv_reader]
|
"objectID": "BusRoute-" + route["route_id"],
|
||||||
|
"objectType": "BusRoute",
|
||||||
|
"busRouteID": route["route_id"],
|
||||||
|
"busRouteAgencyID": route["agency_id"],
|
||||||
|
"busRouteShortName": route["route_short_name"],
|
||||||
|
"busRouteLongName": route["route_long_name"],
|
||||||
|
"busRouteAgencyName": next((agency['busAgencyName'] for agency in data if agency['busAgencyID'] == route["agency_id"]), None)
|
||||||
|
} for route in csv.DictReader(routes_csv.splitlines())])
|
||||||
|
|
||||||
for route in routes_json:
|
|
||||||
data.append({
|
|
||||||
"objectID": "BusRoute-" + route["route_id"],
|
|
||||||
"objectType": "BusRoute",
|
|
||||||
# no latitude or longitude
|
|
||||||
|
|
||||||
"busRouteID": route["route_id"],
|
|
||||||
"busRouteAgencyID": route["agency_id"],
|
|
||||||
"busRouteAgencyName": next((agency['busAgencyName'] for agency in agencies if agency['busAgencyID'] == route["agency_id"]), None),
|
|
||||||
"busRouteShortName": route["route_short_name"],
|
|
||||||
"busRouteLongName": route["route_long_name"]
|
|
||||||
})
|
|
||||||
|
|
||||||
# extract stops data
|
|
||||||
if "stops.txt" in zip.namelist():
|
if "stops.txt" in zip.namelist():
|
||||||
with zip.open("stops.txt") as file:
|
with zip.open("stops.txt") as file:
|
||||||
stops_csv = file.read().decode('utf-8')
|
stops_csv = file.read().decode('utf-8')
|
||||||
csv_reader = csv.DictReader(stops_csv.splitlines(), delimiter=",")
|
data.extend([{
|
||||||
stops_json = [row for row in csv_reader]
|
"objectID": "BusStop-" + stop["stop_id"],
|
||||||
|
"objectType": "BusStop",
|
||||||
for stop in stops_json:
|
"latitude": stop["stop_lat"],
|
||||||
data.append({
|
"longitude": stop["stop_lon"],
|
||||||
"objectID": "BusStop-" + stop["stop_id"],
|
"busStopID": stop["stop_id"],
|
||||||
"objectType": "BusStop",
|
"busStopCode": stop.get("stop_code", ""),
|
||||||
"latitude": stop["stop_lat"],
|
"busStopName": stop["stop_name"]
|
||||||
"longitude": stop["stop_lon"],
|
} for stop in csv.DictReader(stops_csv.splitlines())])
|
||||||
|
|
||||||
"busStopID": stop["stop_id"],
|
|
||||||
"busStopCode": stop["stop_code"],
|
|
||||||
"busStopName": stop["stop_name"]
|
|
||||||
})
|
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def batch_upload_to_dynamodb(data):
|
||||||
|
"""
|
||||||
|
Batch upload data to DynamoDB.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (list): A list of dictionaries containing data to be uploaded.
|
||||||
|
"""
|
||||||
|
with table.batch_writer() as batch:
|
||||||
|
for item in data:
|
||||||
|
batch.put_item(Item=item)
|
||||||
|
|
||||||
def lambda_handler(event, context):
|
def lambda_handler(event, context):
|
||||||
|
"""
|
||||||
|
AWS Lambda handler to fetch data and upload it to DynamoDB.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (dict): Event data passed to the Lambda function.
|
||||||
|
context (object): Runtime information of the Lambda function.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A dictionary containing the status code and message.
|
||||||
|
"""
|
||||||
print("Lambda Handler invoked! Retrieving data...")
|
print("Lambda Handler invoked! Retrieving data...")
|
||||||
data = fetch_train_stations() + fetch_luas() + fetch_gtfs()
|
|
||||||
print("Data retrieved successfully")
|
|
||||||
|
|
||||||
table_name = os.environ.get("DYNAMODB_TABLE", "permanent_data")
|
with ThreadPoolExecutor() as executor:
|
||||||
table = dynamodb.Table(table_name)
|
futures = [
|
||||||
|
executor.submit(fetch_train_stations),
|
||||||
|
executor.submit(fetch_luas),
|
||||||
|
executor.submit(fetch_gtfs)
|
||||||
|
]
|
||||||
|
data = []
|
||||||
|
for future in futures:
|
||||||
|
data.extend(future.result())
|
||||||
|
|
||||||
print("Attempting to batch upload retrieved data")
|
print(f"Retrieved {len(data)} records.")
|
||||||
|
print("Uploading to DynamoDB...")
|
||||||
|
chunk_size = 25
|
||||||
|
for i in range(0, len(data), chunk_size):
|
||||||
|
batch_upload_to_dynamodb(data[i:i + chunk_size])
|
||||||
|
print("Upload completed.")
|
||||||
|
|
||||||
try:
|
return {
|
||||||
with table.batch_writer() as batch:
|
'statusCode': 200,
|
||||||
for record in data:
|
'body': json.dumps({'message': 'Data uploaded successfully!'})
|
||||||
batch.put_item(Item=record)
|
}
|
||||||
|
|
||||||
print("done uploading")
|
if __name__ == "__main__":
|
||||||
|
"""
|
||||||
|
Main function to fetch data and print it locally.
|
||||||
|
"""
|
||||||
|
with ThreadPoolExecutor() as executor:
|
||||||
|
futures = [
|
||||||
|
executor.submit(fetch_train_stations),
|
||||||
|
executor.submit(fetch_luas),
|
||||||
|
executor.submit(fetch_gtfs)
|
||||||
|
]
|
||||||
|
data = []
|
||||||
|
for future in futures:
|
||||||
|
data.extend(future.result())
|
||||||
|
|
||||||
return {
|
|
||||||
'statusCode': 200,
|
|
||||||
'body': json.dumps({'message': 'Data inserted successfully!'})
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
return {"statusCode": 500, "error": str(e)}
|
|
||||||
|
|
||||||
|
|
||||||
if "__main__" == __name__:
|
|
||||||
data = fetch_train_stations() + fetch_luas() + fetch_gtfs()
|
|
||||||
print(json.dumps(data))
|
print(json.dumps(data))
|
Reference in New Issue
Block a user