# package(s) related to time, space and id
import bisect
import scipy as sc
from shapely.geometry import Point, LineString, MultiLineString
from shapely.ops import linemerge
import shapely
import pandas as pd
from matplotlib import pyplot as plt, dates
import math
import networkx as nx
import datetime
import pytz
import pyproj
# mathematical packages
import bisect
import math
import scipy as sc
# packages for data handling
import numpy as np
import pandas as pd
import xarray as xr
import pickle
# import netCDF4
from shapely import reverse
from shapely.ops import transform
from opentnsim import core
from opentnsim import model
from opentnsim.graph.mixins import get_length_of_edge, get_geometry_of_edge, determine_length_of_edge_geometry
from opentnsim.graph import mixins
from opentnsim.vessel_traffic_service.hydrodanamic_data_manager import HydrodynamicDataManager
# spatial libraries
import networkx as nx
import shapely
import shapely.ops
from shapely.geometry import MultiPolygon, Point, Polygon
[docs]
class VesselTrafficService(mixins.HasMultiDiGraph):
"""Class: a collection of functions that processes requests of vessels regarding the nautical processes on ow to enter the port safely"""
def __init__(
self,
graph,
crs_m = "EPSG:4087",
hydrodynamic_information_path=None,
vessel_speed_information_path=None,
hydrodynamic_information=None,
vessel_speed_information=None,
hydrodynamic_start_time=pd.Timedelta(seconds=0),
*args,
**kwargs,
):
self.crs_m = crs_m
self.hydrodynamic_information = hydrodynamic_information
self.hydrodynamic_information_path = hydrodynamic_information_path
self.graph = graph
if isinstance(hydrodynamic_information, xr.Dataset):
self.hydrodynamic_information_path = False
if isinstance(vessel_speed_information, xr.Dataset):
self.vessel_speeds = vessel_speed_information
global vertical_tidal_restrictions_condition_df
self.vertical_tidal_restrictions_condition_df = pd.DataFrame()
global horizontal_tidal_restrictions_condition_df
self.horizontal_tidal_restrictions_condition_df = pd.DataFrame()
global restricted_vessel_speeds
self.restricted_vessel_speeds = pd.DataFrame()
global edges_info
self.edges_info = self.get_edges_info()
for edge in self.graph.edges:
length = determine_length_of_edge_geometry(graph, edge, crs_meter = self.crs_m)
self.graph.edges[edge]["length_m"] = length
for node in graph.nodes:
node_info = graph.nodes[node]
if 'Horizontal tidal restriction' in node_info.keys():
specification_df = graph.nodes[node]['Horizontal tidal restriction']['Specifications']
specification_df['Node'] = node
self.horizontal_tidal_restrictions_condition_df = pd.concat(
[self.horizontal_tidal_restrictions_condition_df, specification_df]
)
if 'Vertical tidal restriction' in node_info.keys():
specification_df = graph.nodes[node]['Vertical tidal restriction']['Specifications']
specification_df['Node'] = node
self.vertical_tidal_restrictions_condition_df = pd.concat(
[self.vertical_tidal_restrictions_condition_df, specification_df]
)
self.horizontal_tidal_restrictions_condition_df = self.horizontal_tidal_restrictions_condition_df.reset_index(drop=True)
self.vertical_tidal_restrictions_condition_df = self.vertical_tidal_restrictions_condition_df.reset_index(drop=True)
if self.hydrodynamic_information_path is not None:
hydro_manager = HydrodynamicDataManager()
if isinstance(hydrodynamic_information_path,str):
hydro_manager.hydrodynamic_data = netCDF4.Dataset(self.hydrodynamic_information_path)
else:
hydro_manager.hydrodynamic_data = hydrodynamic_information
self.hydrodynamic_start_time = hydrodynamic_start_time
if isinstance(hydrodynamic_information_path, str):
self.hydrodynamic_times = hydro_manager.hydrodynamic_times = (
hydro_manager.hydrodynamic_data["TIME"][:].data.astype("timedelta64[m]") + hydrodynamic_start_time
)
else:
self.hydrodynamic_times = hydro_manager.hydrodynamic_data["TIME"][:]
if isinstance(vessel_speed_information_path, str):
with open(vessel_speed_information_path, "rb") as file:
self.restricted_vessel_speeds = pickle.load(file)
super().__init__(*args, **kwargs)
[docs]
def get_edges_info(self):
graph = self.multidigraph
edges_info = pd.DataFrame(columns=["Edge", "Distance", "MBL"])
for edge in graph.edges:
edge_info = graph.edges[edge]
index = len(edges_info)
edges_info.loc[index, "Edge"] = edge
edges_info.loc[index, "Distance"] = get_length_of_edge(graph, edge)
if "MBL" in edge_info.keys():
edges_info.loc[index, "MBL"] = edge_info["MBL"]
else:
edges_info.loc[index, "MBL"] = 999.0
return edges_info.set_index("Edge")
[docs]
def read_tidal_periods(self,hydrodynamic_data,tidal_period_type,station_index):
if 'tidal_period_type' not in hydrodynamic_data.variables:
return None
data = hydrodynamic_data[tidal_period_type][station_index, :, :]
for tide_index, (time_start, tide) in enumerate(data):
if time_start == 'nan':
new_time = 'NaT'
else:
new_time = time_start
data[tide_index] = (np.datetime64(new_time), tide)
return data
[docs]
def provide_waiting_time_for_inbound_tidal_window(self, vessel, route, time_start=None, time_stop=None, delay=0, plot=False):
"""Function: calculates the time that a vessel has to wait depending on the available tidal windows
Input:
- vessel: an identity which is Identifiable, Movable, and Routeable, and has VesselProperties
- route: a list of strings that resemble the route of the vessel (can be different than the vessel.route)
- delay: a delay that can be included to calculate a future situation
- plot: bool that specifies if a plot is requested or not
"""
# Create sub-routes based on anchorage areas on the route
if not time_start:
time_start = pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now)).to_datetime64()
if not time_stop:
time_stop = pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now + vessel.metadata['max_waiting_time'])).to_datetime64()
_,tidal_windows = self.provide_tidal_windows(vessel,route,time_start,time_stop,delay,plot=plot)
waiting_time = pd.Timedelta(vessel.metadata["max_waiting_time"], "s")
for window in tidal_windows:
if time_start > window[1]:
continue
if time_start >= window[0]:
waiting_time = pd.Timedelta(0, "s")
else:
waiting_time = window[0] - time_start
break
waiting_time = waiting_time.total_seconds()
return waiting_time
[docs]
def provide_waiting_time_for_outbound_tidal_window(self,vessel,route,delay=0,plot=False):
vessel.bound = 'outbound'
vessel._T += vessel.metadata['loading'][0]
waiting_time = self.provide_waiting_time_for_inbound_tidal_window(vessel,route=route,delay=delay, plot=plot)
vessel._T -= vessel.metadata['loading'][0]
vessel.bound = 'inbound'
return waiting_time
[docs]
def provide_speed_over_edge(self,vessel,edge):
v = vessel.v
restricted_vessel_speeds_edge = self.restricted_vessel_speeds[self.restricted_vessel_speeds.index.isin([edge])]
if not restricted_vessel_speeds_edge.empty:
v = restricted_vessel_speeds_edge.Speed.iloc[0]
if math.isnan(v):
v = vessel.v
if 'restricted_speed' in dir(vessel):
v = vessel.restricted_speed
return v
[docs]
def provide_speed_over_route(self,vessel,route,edges=[]):
"""
Provides the speed along a vessel's route
Parameters
----------
vessel : type
a type including the following parent-classes: Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
route : list of str
str resemble node names that have to be in the graph
edges : list of tuple
tuples resemble edges with: a start_node [u] as str, end_node (v) as str, and identifier (k) as int
Returns
vessel_speed_over_route : pd.DataFrame
vessel speed per edge of the route
-------
"""
# get edges of route
if not edges:
for idx, (u, v) in enumerate(zip(route[:-1], route[1:])):
k = sorted(self.multidigraph[u][v], key=lambda x: get_length_of_edge(self.multidigraph,(u, v, x)))[0]
edges.append((u,v,k))
# construct dataframe of speed information per edge
vessel_speed_over_route = pd.DataFrame(columns=['Speed'],index=edges)
# predefined speed restrictions should be added to the dataframe
if not self.restricted_vessel_speeds.empty:
vessel_speed_over_route = self.restricted_vessel_speeds[self.restricted_vessel_speeds.index.isin(edges)]
vessel_speed_over_route = vessel_speed_over_route.reindex(edges)
# missing speeds should be replaced with the vessel target speed
vessel_speed_over_route[vessel_speed_over_route.Speed.isna() | (vessel_speed_over_route.Speed == 0)] = vessel.v
# speeds overruled by lock masters should be obeyed to
if 'overruled_speed' in dir(vessel):
for edge,overruled_speed_limit in vessel.overruled_speed.iterrows():
vessel_speed_over_route.loc[edge,'Speed'] = overruled_speed_limit
return vessel_speed_over_route
[docs]
def provide_heading(self, vessel, edge):
distance = []
origin_location = vessel.multidigraph.nodes[edge[0]]["geometry"]
k = sorted(vessel.multidigraph[edge[0]][edge[1]], key=lambda x: get_length_of_edge(self.multidigraph,(edge[0], edge[1], x)))[0]
edge_geometry = vessel.multidigraph.edges[edge[0], edge[1], k]["Info"]["geometry"]
for coord in edge_geometry.coords:
distance.append(origin_location.distance(Point(coord)))
if np.argmin(distance):
edge_geometry = shapely.ops.transform(self.reverse_geometry, edge_geometry)
heading = np.degrees(
math.atan2(
edge_geometry.coords[0][0] - edge_geometry.coords[-1][0], edge_geometry.coords[0][1] - edge_geometry.coords[-1][1]
)
)
return heading
[docs]
def reverse_geometry(self, geometry):
reversed_geometry = reverse(geometry)
return reversed_geometry
[docs]
def provide_trajectory(self,node_1,node_2):
geometry = None
route = nx.dijkstra_path(self.multidigraph, node_1, node_2)
for node_I, node_II in zip(route[:-1], route[1:]):
k = sorted(self.multidigraph[node_I][node_II], key=lambda x: get_length_of_edge(self.multidigraph,(node_I, node_II, x)))[0]
edge = (node_I, node_II, k)
edge_geometry = self.multidigraph.edges[edge]['geometry']
aligned = self.check_if_geometry_is_aligned_with_edge(edge)
if not aligned:
edge_geometry = self.reverse_geometry(edge_geometry)
if geometry:
geometry = shapely.ops.linemerge(MultiLineString([geometry, edge_geometry]))
else:
geometry = edge_geometry
return geometry
[docs]
def provide_distance_over_network_to_location(self,node_1,node_2,location,tolerance=0.0001):
geod = pyproj.Geod(ellps="WGS84")
geometry = self.provide_trajectory(node_1,node_2)
geometries = shapely.ops.split(shapely.ops.snap(geometry, location, tolerance=tolerance), location).geoms
distance_sailed = 0
distance_to_go = 0
if len(geometries) < 2:
if self.multidigraph.nodes[node_1]['geometry'] == location:
distance_to_go = geod.geometry_length(geometries[0])
elif self.multidigraph.nodes[node_2]['geometry'] == location:
distance_sailed = geod.geometry_length(geometries[0])
elif self.multidigraph.nodes[node_1]['geometry'].distance(location) > self.multidigraph.nodes[node_2]['geometry'].distance(location):
distance_sailed = geod.geometry_length(geometries[0])
else:
distance_to_go = geod.geometry_length(geometries[0])
else:
distance_sailed = geod.geometry_length(geometries[0])
distance_to_go = geod.geometry_length(geometries[1])
return distance_sailed,distance_to_go
[docs]
def check_if_geometry_is_aligned_with_edge(self, edge):
node_start = edge[0]
node_stop = edge[1]
edge_geometry = get_geometry_of_edge(self.multidigraph, edge)
first_point = Point(edge_geometry.coords[0])
distance_to_edge_nodes = {}
for node in [node_start, node_stop]:
node_geometry = self.multidigraph.nodes[node]["geometry"]
distance_to_edge_nodes[node] = first_point.distance(node_geometry)
closest_node = min(distance_to_edge_nodes, key=distance_to_edge_nodes.get)
aligned = closest_node == node_start
return aligned
[docs]
def get_closest_location_on_edge_to_point(self, graph, edge, point):
edge_geometry = graph.edges[edge]["geometry"]
point_on_edge = edge_geometry.interpolate(edge_geometry.project(point))
return point_on_edge
[docs]
def provide_location_over_edges(self,node_1,node_2,interpolation_length):
geometry = self.provide_trajectory(node_1, node_2)
if geometry is None or geometry.is_empty:
return None
geometry_m = self.transform_geometry(geometry, epsg_out = self.crs_m)
interpolation_point_m = geometry_m.line_interpolate_point(interpolation_length)
interpolation_point = self.transform_geometry(interpolation_point_m, epsg_in = self.crs_m, epsg_out = "EPSG:4326")
return interpolation_point
[docs]
def provide_distance_from_location_over_edge(self,edge,location,tolerance=0.0001):
geod = pyproj.Geod(ellps="WGS84")
if len(edge) == 2:
k = sorted(self.multidigraph[edge[0]][edge[1]],key=lambda x: get_length_of_edge(self.multidigraph,(edge[0], edge[1], x)))[0]
edge = (edge[0],edge[1],k)
geometry = self.multidigraph.edges[(edge[0],edge[1],edge[2])]['geometry']
distance_sailed = 0
distance_to_go = 0
if geometry.coords[0] == location.coords[0]:
distance_to_go = self.multidigraph.edges[(edge[0],edge[1],edge[2])]['length']
elif geometry.coords[-1] == location.coords[0]:
distance_sailed = self.multidigraph.edges[(edge[0],edge[1],edge[2])]['length']
else:
lines = shapely.ops.split(shapely.ops.snap(geometry, location, tolerance), location).geoms
for index, line in enumerate(lines):
distance = 0
for point_I, point_II in zip(line.coords[:-1], line.coords[1:]):
sub_edge_geometry = LineString([Point(point_I), Point(point_II)])
distance += geod.geometry_length(sub_edge_geometry)
if not index:
distance_sailed = distance
else:
distance_to_go = distance
return distance_sailed, distance_to_go
[docs]
def provide_edge_by_distance_from_node(self,env,node_1,node_2,distance):
route = nx.dijkstra_path(self.multidigraph, node_1, node_2)
total_length = 0
for node_I, node_II in zip(route[:-1], route[1:]):
k = sorted(self.multidigraph[node_I][node_II],key=lambda x: get_length_of_edge(self.multidigraph,(node_I,node_II,x)))[0]
edge_length = self.multidigraph.edges[node_I,node_II,k]['length']
total_length += edge_length
if total_length < distance:
continue
break
return (node_I,node_II,k)
[docs]
def provide_sailing_distance(self,edge,k=None):
"""
Calculates sailing distance of edge
Parameters
----------
edge : tuple
tuple resembles an edge with: a start_node [u] as str, end_node (v) as str
k : int [optional]
identifier of the edge (used in networkx MultiDiGraph to distinguish between edges between the same pair of nodes)
Returns
-------
sailing_distance : float
sailing distance along the edge in [m]
"""
if k is None:
k = sorted(self.env.multidigraph[edge[0]][edge[1]], key=lambda x: get_length_of_edge(self.multidigraph,(edge[0],edge[1],x)))[0]
sailing_distance = self.env.multidigraph.edges[edge[0], edge[1], k]['length']
return sailing_distance
[docs]
def provide_sailing_distance_over_route(self, route, edges=None):
"""
Calculates sailing distance of a route
Parameters
----------
vessel :
a type including the following parent-classes: Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
edge : tuple
tuple resembles an edge with: a start_node [u] as str, end_node (v) as str
Returns
-------
sailing_distance_over_route : float
sailing distance along the route in [m]
"""
# get edge info
edges_info = self.edges_info
# get edges of route
if not edges:
edges = []
for idx, (u, v) in enumerate(zip(route[:-1], route[1:])):
k = sorted(self.multidigraph[u][v], key=lambda x: get_length_of_edge(self.multidigraph,(u, v, x)))[0]
edges.append((u,v,k))
# calculate sailing distance along route
sailing_distance_over_route = edges_info[edges_info.index.isin(edges)]
return sailing_distance_over_route
[docs]
def provide_sailing_time(self, vessel, route, edges=None):
"""
Calculates sailing time of vessel
Parameters
----------
vessel :
a type including the following parent-classes: Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
route : list of str
str resemble node names that have to be in the graph
edges : list of tuples
tuples resemble edges with: a start_node [u] as str, end_node (v) as str, and identifier (k) as int
Returns
-------
sailing_time_over_route : pd.DataFrame
dataframe with edges as (multi)index and the following column-information: Speed, Distance, Time
"""
# determine list of edges that the vessel passes given its route
if not edges:
edges = []
for idx, (u, v) in enumerate(zip(route[:-1], route[1:])):
k = sorted(self.multidigraph[u][v], key=lambda x: get_length_of_edge(self.multidigraph,(u, v, x)))[0]
edges.append((u,v,k))
# calculate sailing distance over route
sailing_distance_over_route_df = self.provide_sailing_distance_over_route(route, edges)
# obtain dataframe with information of sailing speed along route
sailing_information_df = self.provide_speed_over_route(vessel, route, edges)
# add distance and duraction information to the sailing information dataframe
sailing_information_df['Distance'] = sailing_distance_over_route_df['Distance']
sailing_information_df['Time'] = sailing_information_df['Distance']/sailing_information_df['Speed']
return sailing_information_df
[docs]
def provide_sailing_time_distance_on_edge_to_distance_on_another_edge(self, vessel, route, distance_sailed_on_first_edge=0., distance_sailed_on_last_edge=0., edges=None):
"""
Calculates the distance from a location along an edge A to another location along an edge B
Parameters
----------
vessel : type
a type including the following parent-classes: Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
route : list of str
str resemble node names that have to be in the graph
distance_sailed_on_first_edge : float
distance that is already covered on the edge at which the vessel is currently sailing
distance_sailed_on_last_edge : float
distance on the last edge that the vessel has to sail to reach its location of interest
edges : list of tuples
tuples resemble edges with: a start_node [u] as str, end_node (v) as str, and identifier (k) as int
Returns
-------
sailing_information_df : pd.DataFrame
dataframe with edges as (multi)index and the following column-information: Speed, Distance, Time
"""
# obtain dataframe with information of sailing speed, distance and time along route
sailing_information_df = self.provide_sailing_time(vessel=vessel, route=route, edges=edges)
# determine indexes of first and last edges
index_first_edge = pd.Index([sailing_information_df.iloc[0].name])
index_last_edge = pd.Index([sailing_information_df.iloc[-1].name])
# determine distance that must still be sailed on the current edge of the vessel
distance_to_sail_on_first_edge = (sailing_information_df.loc[index_first_edge, 'Distance']-distance_sailed_on_first_edge)
# adjust information of the sailing distance and sailing time on the first and last edges
sailing_information_df.loc[index_first_edge, 'Time'] = sailing_information_df.loc[index_first_edge, 'Time']*(distance_to_sail_on_first_edge/sailing_information_df.loc[index_first_edge, 'Distance'])
sailing_information_df.loc[index_first_edge, 'Distance'] = distance_to_sail_on_first_edge
sailing_information_df.loc[index_last_edge, 'Time'] = sailing_information_df.loc[index_last_edge, 'Time']*(distance_sailed_on_last_edge/sailing_information_df.loc[index_last_edge, 'Distance'])
sailing_information_df.loc[index_last_edge, 'Distance'] = distance_sailed_on_last_edge
return sailing_information_df
[docs]
def provide_nearest_anchorage_area(self, vessel, node):
nodes_of_anchorages = []
capacity_of_anchorages = []
users_of_anchorages = []
sailing_times_to_anchorages = []
# Loop over the nodes of the network and identify all the anchorage areas:
for node_anchorage in vessel.multidigraph.nodes:
if "Anchorage" in vessel.multidigraph.nodes[node_anchorage]:
# Determine if the anchorage area can be reached
anchorage_reachable = True
route_to_anchorage = nx.dijkstra_path(vessel.multidigraph, node, node_anchorage)
for node_on_route in route_to_anchorage:
station_index = list(HydrodynamicDataManager().hydrodynamic_data["STATION"][:]).index(node_on_route)
min_water_level = np.min(HydrodynamicDataManager().hydrodynamic_data["Water level"][:, station_index].data)
_, _, _, required_water_depth, _, MBL = self.provide_ukc_clearance(vessel,node)
if min_water_level - MBL < required_water_depth:
anchorage_reachable = False
break
if not anchorage_reachable:
continue
# Extract information over the individual anchorage areas:
# capacity, users, and the sailing distance to the anchorage area
# from the designated terminal the vessel is planning to call
nodes_of_anchorages.append(node_anchorage)
capacity_of_anchorages.append(vessel.multidigraph.nodes[node_anchorage]["Anchorage"][0].resource.capacity)
users_of_anchorages.append(len(vessel.multidigraph.nodes[node_anchorage]["Anchorage"][0].resource.users))
route_from_anchorage = nx.dijkstra_path(vessel.multidigraph, node_anchorage, vessel.route[-1])
sailing_time_to_anchorage = vessel.env.vessel_traffic_service.provide_sailing_time(vessel, route_from_anchorage)[
"Time"
].sum()
sailing_times_to_anchorages.append(sailing_time_to_anchorage)
# Sort the lists based on the sailing distance to the anchorage area from the designated terminal
# the vessel is planning to call
sorted_nodes_anchorages = [nodes for (distances, nodes) in sorted(zip(sailing_times_to_anchorages, nodes_of_anchorages))]
sorted_users_of_anchorages = [nodes for (distances, nodes) in sorted(zip(sailing_times_to_anchorages, users_of_anchorages))]
sorted_capacity_of_anchorages = [
nodes for (distances, nodes) in sorted(zip(sailing_times_to_anchorages, capacity_of_anchorages))
]
# Take the anchorage area that is closest to the designated terminal the vessel is planning to call if there
# is sufficient capacity:
node_anchorage = 0
for anchorage_index, node_anchorage in enumerate(sorted_nodes_anchorages):
if sorted_users_of_anchorages[anchorage_index] < sorted_capacity_of_anchorages[anchorage_index]:
# node anchorage is found
break
return node_anchorage
[docs]
def provide_governing_current_velocity(self,vessel,node,time_start_index,time_end_index):
hydromanager = HydrodynamicDataManager()
station_index = list(hydromanager.hydrodynamic_data["STATION"][:]).index(node)
times = hydromanager.hydrodynamic_times[time_start_index:time_end_index]
start_time = times[0]
end_time = times[-1]
time_step = times[1]-times[0]
relative_layer_height = hydromanager.hydrodynamic_data["LAYER"][:].data
current_velocity = hydromanager.hydrodynamic_data["Primary current velocity"][
time_start_index:time_end_index, station_index
].data
def depth_averaged_current_velocity(interpolation_depth, times, relative_layer_height, current_velocity, station_index):
layer_boundaries = []
average_current_velocity = []
number_of_layers = len(relative_layer_height)
water_depth = (
-1 * hydromanager.hydrodynamic_data["MBL"][:, station_index].data
+ hydromanager.hydrodynamic_data["Water level"][:, station_index].data
)
relative_water_depth = np.outer(water_depth, relative_layer_height)
cumulative_water_depth = np.cumsum(relative_water_depth, axis=1)
for ti in range(len(times)):
layer_boundaries.append(
np.interp(interpolation_depth, cumulative_water_depth[ti], np.arange(0, number_of_layers, 1))
)
layer_boundary = np.floor(layer_boundaries)
relative_boundary_layer_thickness = layer_boundaries - layer_boundary
for ti in range(len(times)):
if int(layer_boundary[ti]) + 2 < len(relative_layer_height):
rel_layer_heights = relative_layer_height[0 : int(layer_boundary[ti]) + 2].copy()
rel_layer_heights[-1] = rel_layer_heights[-1] * relative_boundary_layer_thickness[ti]
average_current_velocity.append(
np.average(current_velocity[ti][0 : int(layer_boundary[ti]) + 2], weights=rel_layer_heights)
)
elif int(layer_boundary[ti]) == 0:
average_current_velocity = current_velocity[ti]
else:
average_current_velocity.append(np.average(current_velocity[ti], weights=relative_layer_height))
return average_current_velocity
if "LAYER" in list(hydromanager.hydrodynamic_data["Current velocity"].dimensions):
if vessel._T <= 5:
current_velocity = depth_averaged_current_velocity(5, times, relative_layer_height, current_velocity, station_index)
elif vessel._T <= 15:
current_velocity = depth_averaged_current_velocity(
vessel._T, times, relative_layer_height, current_velocity, station_index
)
else:
current_velocity = [np.average(current_velocity[t], weights=relative_layer_height) for t in range(len(times))]
if len(current_velocity) > 2:
current_governing_current_velocity = current_velocity[2]
else:
current_governing_current_velocity = current_velocity[-1]
return current_velocity, current_governing_current_velocity
# Functions used to calculate the sail-in-times for a specific vessel
[docs]
def provide_tidal_window_restriction(self, vessel, route, node, delay, restriction_type='Vertical'):
"""Function: determines which tidal window restriction applies to the vessel at the specific node
Input:
- vessel: an identity which is Identifiable, Movable, and Routable, and has VesselProperties
- route: a list of strings of node names that resemble the route that the vessel is planning
to sail (can be different than vessel.route)
- specifications: the specific data regarding the properties for which the restriction holds
- node: a string that defines the node of the tidal window restriction
"""
# Predefined bool
restriction = []
boolean = True
no_tidal_window = True
node_index = route.index(node)
# Vessel parameters
previous_terminal_of_call = np.nan
terminal_of_call = np.nan
if len(dict.get(vessel.output, 'visited_terminals')):
previous_terminal_of_call = dict.get(vessel.output, 'visited_terminals')[-1]
if len(dict.get(vessel.metadata, 'terminal_of_call')):
terminal_of_call = dict.get(vessel.metadata, 'terminal_of_call')[0]
route_before_node = route[:node_index]
route_after_node = route[node_index:]
ukc = 0.
if restriction_type == 'Horizontal':
restriction_condition_df = self.horizontal_tidal_restrictions_condition_df
ukc, _, _, _, _, _ = self.provide_ukc_clearance(vessel, node, delay)
elif restriction_type == 'Vertical':
restriction_condition_df = self.vertical_tidal_restrictions_condition_df
node_mask = restriction_condition_df.Node == node
length_mask = ((restriction_condition_df.min_ge_Length <= vessel.L)&
(restriction_condition_df.min_gt_Length < vessel.L)&
(restriction_condition_df.max_lt_Length > vessel.L) &
(restriction_condition_df.max_le_Length >= vessel.L))
draught_mask = ((restriction_condition_df.min_ge_Draught <= vessel.T)&
(restriction_condition_df.min_gt_Draught < vessel.T)&
(restriction_condition_df.max_lt_Draught > vessel.T)&
(restriction_condition_df.max_le_Draught >= vessel.T))
ukc_mask = ((restriction_condition_df.min_ge_UKC <= ukc)&
(restriction_condition_df.min_gt_UKC < ukc)&
(restriction_condition_df.max_lt_UKC > ukc)&
(restriction_condition_df.max_le_UKC >= ukc))
from_node_mask = (restriction_condition_df.bound_from.isin(route_before_node)|
(restriction_condition_df.bound_from.isna()))
to_node_mask = (restriction_condition_df.bound_to.isin(route_before_node)|
(restriction_condition_df.bound_to.isna()))
terminal_mask = ((restriction_condition_df.terminal == terminal_of_call)|
(restriction_condition_df.terminal.isna()))
previous_terminal_mask = ((restriction_condition_df.visited_terminal == previous_terminal_of_call)|
(restriction_condition_df.visited_terminal.isna()))
restriction_mask = node_mask&from_node_mask&to_node_mask&length_mask&draught_mask&terminal_mask&ukc_mask&terminal_mask&previous_terminal_mask
restriction_condition_df = restriction_condition_df[restriction_mask]
if restriction_condition_df.empty:
return restriction, no_tidal_window
no_tidal_window = False
# conditions_df = restriction_condition_df[restriction_mask]
restriction = restriction_condition_df.iloc[0]
return restriction, no_tidal_window
[docs]
def provide_water_depth(self,vessel,node,delay=0):
hydromanager = HydrodynamicDataManager()
node_index = list(hydromanager.hydrodynamic_data["STATION"][:]).index(node)
time_index = np.absolute(
hydromanager.hydrodynamic_times - pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now + delay)).to_datetime64()
).argmin()
water_level = hydromanager.hydrodynamic_data["Water level"][time_index, node_index].data
MBL = hydromanager.hydrodynamic_data["MBL"][time_index, node_index].data
available_water_depth = water_level - MBL
return MBL,water_level,available_water_depth
[docs]
def provide_ukc_clearance(self,vessel,node,delay=0):
"""Function: calculates the sail-in-times for a specific vssel with certain properties and a pre-determined route and provides this information to the vessel
Input:
- vessel: an identity which is Identifiable, Movable, and Routable, and has VesselProperties
- node:
- components_calc:
"""
MBL,water_level,available_water_depth = self.provide_water_depth(vessel,node,delay)
ukc_s, ukc_p, ukc_r, fwa = np.zeros(4)
ship_related_factors = {'ukc_s': ukc_s, 'ukc_p': ukc_p, 'ukc_r': ukc_r, 'fwa': fwa,'extra_ukc': vessel.metadata['ukc']}
if 'Vertical tidal restriction' in vessel.multidigraph.nodes[node].keys():
restrictions, _ = self.provide_tidal_window_restriction(vessel, [node], node, delay)
if restrictions.empty:
return [], [], available_water_depth, 0., ship_related_factors, MBL
specifications = self.graph.nodes['A']['Vertical tidal restriction']['Specifications']
specifications = self.graph.nodes["A"]["Vertical tidal restriction"]["Specifications"]
ukcs_s = []
ukcs_p = []
ukcs_r = []
fwas = []
for _,specs in specifications.iterrows():
ukcs_s.append(specs.Restriction.ukc_s)
ukcs_p.append(specs.Restriction.ukc_p)
ukcs_r.append(specs.Restriction.ukc_r)
fwas.append(specs.Restriction.fwa)
# Determine which restriction applies to vessel
restriction_index = 0
# Calculate ukc policy based on the applied restriction
ukc_s = ukcs_s[restriction_index]
ukc_p = ukcs_p[restriction_index] * vessel.T
ukc_r = ukcs_r[restriction_index][0] * (vessel.T - ukcs_r[restriction_index][1])
fwa = fwas[restriction_index] * vessel.T
ship_related_factors = {"ukc_s": ukc_s, "ukc_p": ukc_p, "ukc_r": ukc_r, "fwa": fwa, "extra_ukc": vessel.metadata["ukc"]}
required_water_depth = vessel.T + sum(ship_related_factors.values())
net_ukc = available_water_depth - required_water_depth
gross_ukc = available_water_depth - vessel.T
return net_ukc, gross_ukc, available_water_depth, required_water_depth, ship_related_factors, MBL
[docs]
def provide_minimum_available_water_depth_along_route(self,vessel, route, time_start, time_end, delay=0):
"""Function: calculates the minimum available water depth (predicted/modelled/measured water level minus the local maintained bed level) along the route over time,
subtracted with the difference between the gross ukc and net ukc (hence: subtracted with additional safety margins consisting of vessel-related factors
and water level factors). The bottom-related factors are already accounted for in the use of the MBL instead of the actual depth.
Input:
- vessel: an identity which is Identifiable, Movable, and Routable, and has VesselProperties
- route: a list of strings of node names that resemble the route that the vessel is planning
to sail (can be different than vessel.route)
- delay:
"""
hydromanager = HydrodynamicDataManager()
time_start_index = np.max(
[0, np.absolute(hydromanager.hydrodynamic_times - (time_start + np.timedelta64(int(delay), "s"))).argmin() - 2]
)
time_end_index = np.absolute(hydromanager.hydrodynamic_times - (time_end + np.timedelta64(int(delay), "s"))).argmin()
net_ukc = pd.DataFrame()
times = hydromanager.hydrodynamic_times[time_start_index:time_end_index]
start_time = times[0]
end_time = times[-1]
t_step = times[1] - times[0]
time_range = np.arange(start_time, end_time + t_step, t_step)
nodes_of_interest = route.copy()
for node in route:
if self.graph.nodes[node]['LAT']-self.graph.nodes[node]['MBL'] >= vessel.T+np.max([1.0,vessel.T*1.125]):
nodes_of_interest.remove(node)
if not nodes_of_interest:
net_ukc['min_net_ukc'] = 0.0
net_ukc['station'] = ''
net_ukc.loc[time_start,:] = [1.0,route[0]]
net_ukc.loc[time_end,:] = [1.0, route[-1]]
return net_ukc
# Start of calculation by looping over the nodes of the route
for route_index, node_name in enumerate(nodes_of_interest):
station_index = list(hydromanager.hydrodynamic_data["STATION"][:]).index(node_name)
sailing_time_to_next_node = vessel.env.vessel_traffic_service.provide_sailing_time(vessel,route[:(route_index+1)])
time_correction_index = int(np.round(sailing_time_to_next_node['Time'].sum() / (t_step/np.timedelta64(1, 's'))))
water_level = hydromanager.hydrodynamic_data["Water level"][time_start_index:time_end_index, station_index].data
MBL = hydromanager.hydrodynamic_data["MBL"][time_start_index:time_end_index, station_index].data
_, _, _, required_water_depth, _, _ = self.provide_ukc_clearance(vessel,node_name,delay)
water_depth = water_level - MBL
net_ukc = pd.concat(
[
net_ukc,
pd.DataFrame(
[available_water_depth - required_water_depth for available_water_depth in water_depth],
index=[t - time_correction_index * t_step for t in times],
columns=[node_name],
),
],
axis=1,
)
# Pick the minimum of the water depths for each time and each node
net_ukc = net_ukc.dropna(axis=0)
net_ukc["min_net_ukc"] = net_ukc.min(axis=1)
return net_ukc
[docs]
def provide_vertical_tidal_windows(self, vessel, route, time_start, time_end, delay=0, plot=False):
"""Function: calculates the windows available to sail-in and -out of the port given the
vertical tidal restrictions according to the tidal window policy.
Input:
- vessel: an identity which is Identifiable, Movable, and Routable, and has VesselProperties
- route: a list of strings of node names that resemble the route that the vessel is planning
to sail (can be different than vessel.route)
- sailing_time_correction: a bool that indicates whether the calculation should correct for
sailing_speed (dynamic calculation) or not (static calculation)
"""
hydromanager = HydrodynamicDataManager()
vertical_tidal_accessibility = pd.DataFrame(columns=['Tidal period', 'Period number', 'Limit', 'Accessibility'])
time_start_index = np.max(
[0, np.absolute(hydromanager.hydrodynamic_times - (time_start + np.timedelta64(int(delay), "s"))).argmin() - 2]
)
time_end_index = np.absolute(hydromanager.hydrodynamic_times - (time_end + np.timedelta64(int(delay), "s"))).argmin()
net_ukcs = self.provide_minimum_available_water_depth_along_route(vessel, route, time_start, time_end, delay)
new_net_ukcs = pd.DataFrame()
for station in list(dict.fromkeys(net_ukcs['station'])):
station_index = list(hydromanager.hydrodynamic_data["STATION"][:]).index(station)
tidal_periods = self.read_tidal_periods(hydromanager.hydrodynamic_data, "Vertical tidal periods", station_index)
if tidal_periods == None:
tidal_periods = [
[hydromanager.hydrodynamic_times[0], "Flood start"],
[hydromanager.hydrodynamic_times[-1], "Flood stop"],
]
tidal_periods = pd.DataFrame(tidal_periods, columns=['Period start', 'Tidal period'])
tidal_periods = tidal_periods.reset_index(names='Period number')
tidal_periods = tidal_periods.set_index('Period start')
net_ukc = net_ukcs[net_ukcs.station == station]
data = pd.concat([net_ukc, tidal_periods])
data = data.sort_index()
data[['Period number','Tidal period']] = data[['Period number','Tidal period']].ffill()
data[['Period number','Tidal period']] = data[['Period number','Tidal period']].bfill()
data = data.dropna()
new_net_ukcs = pd.concat([new_net_ukcs,data])
net_ukcs = new_net_ukcs.sort_index()
net_ukcs['Period number'] = net_ukcs['Period number'].astype(int)
net_ukcs['Tidal period'] = [tidal_period.split(' ')[0] for tidal_period in net_ukcs['Tidal period']]
net_ukcs = net_ukcs[['Tidal period', 'Period number', 'min_net_ukc']]
# Determine zero crossings
zero_crossings = np.where(np.diff(np.sign(net_ukcs['min_net_ukc'])))[0]
for iloc in zero_crossings:
net_ukc = net_ukcs.iloc[iloc]
if net_ukc['min_net_ukc'] > 0:
vertical_tidal_accessibility.loc[net_ukc.name] = [net_ukc['Tidal period'], net_ukc['Period number'], 0, 'Inaccessible']
else:
vertical_tidal_accessibility.loc[net_ukc.name] = [net_ukc['Tidal period'], net_ukc['Period number'], 0, 'Accessible']
# Default values
if net_ukcs.iloc[0]['min_net_ukc'] < 0:
vertical_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_start.replace(tzinfo=None))] = [np.nan, np.nan, 0,'Inaccessible']
else:
vertical_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_start.replace(tzinfo=None))] = [np.nan, np.nan, 0, 'Accessible']
if vertical_tidal_accessibility.iloc[-1,-1] == 'Accessible':
vertical_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_stop.replace(tzinfo=None))] = [np.nan, np.nan, 0,'Inaccessible']
else:
vertical_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_stop.replace(tzinfo=None))] = [np.nan, np.nan, 0,'Accessible']
vertical_tidal_accessibility = vertical_tidal_accessibility.sort_index()
vertical_tidal_accessibility[['Period number', 'Tidal period']] = vertical_tidal_accessibility[['Period number', 'Tidal period']].ffill()
vertical_tidal_accessibility[['Period number', 'Tidal period']] = vertical_tidal_accessibility[['Period number', 'Tidal period']].bfill()
# Return the sail-in or -out-times given the vertical tidal restrictions over the route of the vessel
vertical_tidal_accessibility['Condition'] = 'Water level'
vertical_tidal_accessibility = vertical_tidal_accessibility[~(vertical_tidal_accessibility['Accessibility'] == vertical_tidal_accessibility['Accessibility'].shift(1))]
vertical_tidal_windows = [
[window_start[0], window_end[0]]
for window_start, window_end in zip(
vertical_tidal_accessibility.iloc[:-1].iterrows(), vertical_tidal_accessibility.iloc[1:].iterrows()
)
if window_start[1].Accessibility == "Accessible"
]
if plot:
# Create figure
fig, ax = plt.subplots(figsize=[16 * 2 / 3, 6])
# Plot vertical tidal windows
for window in vertical_tidal_windows:
(vertical_tidal_window,) = ax.fill(
[window[0], window[0], window[1], window[1]],
[-1.5, 1.5, 1.5, -1.5],
facecolor="C0",
alpha=0.25,
edgecolor="none",
)
# Plot net UKC
# net_ukc = self.provide_minimum_available_water_depth_along_route(vessel,route,time_start, time_end, delay)
(net_UKC,) = ax.plot(net_ukcs["min_net_ukc"], color="C0", linewidth=2)
ax.axhline(0, color="k", linewidth=0.5)
# Figure bounds
ax.set_xlim(hydromanager.hydrodynamic_times[time_start_index], hydromanager.hydrodynamic_times[time_end_index - 36])
ax.set_ylim(-1.5, 1.5)
# Figure ticks
ax.set_xticks(ax.get_xticks())
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")
ax.xaxis.set_major_formatter(dates.DateFormatter("%Y-%m-%d %H:%M"))
# Figure axes
ax.set_xlabel("Date")
ax.set_ylabel("Net UKC [m]")
# Legend
ax.legend(
[net_UKC, vertical_tidal_window],
["Net UKC", "Vertical tidal windows"],
frameon=False,
loc="upper left",
bbox_to_anchor=(1.0, 1.0),
)
fig.tight_layout()
plt.show()
return vertical_tidal_accessibility, vertical_tidal_windows, net_ukcs
[docs]
def provide_horizontal_tidal_windows(self, vessel, route, time_start, time_end, delay=0, plot=False):
hydromanager = HydrodynamicDataManager()
def calculate_horizontal_tidal_window(vessel, time_start_index, time_end_index, current_velocity_station, critical_limits, restriction_type, flood=True, ebb=True,decreasing=False):
station_index = list(hydromanager.hydrodynamic_data["STATION"][:]).index(current_velocity_station)
time_step = hydromanager.hydrodynamic_times[1] - hydromanager.hydrodynamic_times[0]
time_start_index = np.max([0,time_start_index-int(np.timedelta64(12,'h')/time_step)])
currents_time = hydromanager.hydrodynamic_times[time_start_index:time_end_index]
currents_data,_ = self.provide_governing_current_velocity(vessel,current_velocity_station,time_start_index,time_end_index)
tidal_periods = self.read_tidal_periods(hydromanager.hydrodynamic_data, "Horizontal tidal periods", station_index)
tidal_periods = [condition for condition in tidal_periods if condition[0] <= currents_time[-1] and condition[0] >= currents_time[0]]
currents = pd.DataFrame({'Current velocity': currents_data}, index=currents_time)
currents = abs(currents)
tidal_periods = pd.DataFrame(tidal_periods,columns=['Period start','Tidal period'])
tidal_periods = tidal_periods.reset_index(names='Period number')
tidal_periods = tidal_periods.set_index('Period start')
currents = pd.concat([currents,tidal_periods])
currents = currents.sort_index()
for column_name in ['Tidal period','Period number']:
currents[column_name] = currents[column_name].fillna(method='ffill')
currents[column_name] = currents[column_name].fillna(method='bfill')
currents['Tidal period'] = [tidal_period.split(' ')[0] for tidal_period in currents['Tidal period']]
currents['Current velocity'] = currents['Current velocity'].interpolate()
currents["Period number"] = currents["Period number"].astype(int)
roots_cv = [
root for root in roots if root >= currents_time[0].astype(float) and root <= currents_time[-1].astype(float)
]
times_horizontal_tidal_period = []
for root in roots_cv:
root = pd.Timestamp(root).to_datetime64()
index_current_root = bisect.bisect_right(currents_time, root) - 2
if index_current_root == -1:
index_current_root = index_current_root + 1
if len(currents_data[index_prev_root:index_current_root]) == 0:
continue
cvel_diff_cross = currents_data[index_current_root + 1] - currents_data[index_current_root - 1]
if cvel_diff_cross < 0:
times_horizontal_tidal_period.append([root, "Ebb Start"])
index_prev_root = index_current_root
elif cvel_diff_cross > 0:
times_horizontal_tidal_period.append([root, "Flood Start"])
index_prev_root = index_current_root
tidal_periods = [condition for condition in times_horizontal_tidal_period if condition[0] <= currents_time[-1]]
currents_time = np.append(
currents_time, np.array([tide[0] for tide in tidal_periods if tide[0] not in currents_time], dtype="datetime64[ns]")
)
currents_data = [abs(value) for value in currents_data]
currents_data = np.append(currents_data, -999 * np.ones(len(tidal_periods)))
currents_time, currents_data = [np.array(data) for data in zip(*sorted(zip(currents_time, currents_data)))]
# Find the intersection points with critical current velocity
current_intersections = []
if isinstance(cross_current_limit_dataframe, pd.DataFrame) and not cross_current_limit_dataframe.empty:
critical_limit = np.interp(
currents_time.astype("float"),
cross_current_limit_dataframe.index.to_numpy().astype("float"),
cross_current_limit_dataframe.Limit.to_numpy(),
)
idx = np.argwhere(np.diff(np.sign(critical_limit - currents_data))).flatten()
roots = currents_time[idx]
current_intersections.extend(roots.astype(dtype="datetime64[ns]"))
critical_current_velocity = np.interp(
np.array(current_intersections).astype("float"), currents_time.astype("float"), critical_limit
)
horizontal_tidal_accessibility = pd.DataFrame(
data=critical_current_velocity, columns=["Limit"], index=current_intersections
)
elif isinstance(critical_limits, list):
critical_current_velocity = []
for critical_limit in critical_limits:
idx = np.argwhere(
np.diff(np.sign([current_velocity - critical_limit for current_velocity in currents_data]))
).flatten()
roots = currents_time[idx]
current_intersections.extend(roots.astype(dtype="datetime64[ns]"))
critical_current_velocity.extend(np.ones(len(roots)) * critical_limit)
horizontal_tidal_accessibility = pd.DataFrame(
data=critical_current_velocity, columns=["Limit"], index=current_intersections
)
horizontal_tidal_accessibility = horizontal_tidal_accessibility.sort_index()
# Determine the tidal period of the found interpolation points
horizontal_tidal_accessibility["Period"] = ""
horizontal_tidal_accessibility["Period_nr"] = -999
for period_nr, (tidal_period_start, tidal_period_end) in enumerate(zip(tidal_periods[:-1], tidal_periods[1:])):
tidal_period = tidal_period_start[1].split(" ")[0]
if tidal_period == "Rising":
tidal_period = "Flood"
if tidal_period == "Falling":
tidal_period = "Ebb"
tidal_period_start = tidal_period_start[0]
tidal_period_end = tidal_period_end[0]
horizontal_tidal_accessibility.loc[
horizontal_tidal_accessibility[
(horizontal_tidal_accessibility.index >= tidal_period_start)
& (horizontal_tidal_accessibility.index <= tidal_period_end)
].index,
"Period",
] = tidal_period
horizontal_tidal_accessibility.loc[
horizontal_tidal_accessibility[
(horizontal_tidal_accessibility.index >= tidal_period_start)
& (horizontal_tidal_accessibility.index <= tidal_period_end)
].index,
"Period_nr",
] = period_nr
horizontal_tidal_accessibility["Period_nr"] = horizontal_tidal_accessibility["Period_nr"].astype(int)
# Filter the found interpolation points: remove from errors
# (multiple interpolated numbers and flood/ebb values if not required)
if decreasing:
selected_horizontal_tidal_accessibility = pd.DataFrame(columns=horizontal_tidal_accessibility.columns)
if flood:
selected_horizontal_tidal_accessibility = pd.concat(
[
selected_horizontal_tidal_accessibility,
horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period == "Flood"],
]
)
if ebb:
selected_horizontal_tidal_accessibility = pd.concat(
[
selected_horizontal_tidal_accessibility,
horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period == "Ebb"],
]
)
horizontal_tidal_accessibility = selected_horizontal_tidal_accessibility.sort_index()
horizontal_tidal_accessibility = horizontal_tidal_accessibility.loc[
horizontal_tidal_accessibility.index.drop_duplicates(keep=False)
]
# Correct the found interpolation points
if decreasing:
tide_number = horizontal_tidal_accessibility.iloc[0]["Period_nr"]
number_of_tidal_periods = horizontal_tidal_accessibility.iloc[-1]["Period_nr"]
end_time_windows = []
for period_nr in [
idx for idx, count in horizontal_tidal_accessibility.value_counts("Period_nr").items() if count == 2
]:
sub_df = horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period_nr == period_nr]
end_time_windows.append(tidal_periods[period_nr + 1][0] - sub_df.iloc[-1].name)
mean_end_time_window = np.mean(end_time_windows)
missing_tides = set(list(np.arange(tide_number, number_of_tidal_periods, 2))) - set(
list(dict.fromkeys(horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period == "Flood"].Period_nr))
)
for tide_index in missing_tides:
starting_time = np.datetime64(tidal_periods[tide_index][0])
closing_time = np.datetime64(tidal_periods[tide_index + 1][0] - mean_end_time_window)
next_index = bisect.bisect_right(currents_time, closing_time)
previous_index = next_index - 1
current_velocity = np.interp(
closing_time,
[currents_time[previous_index], currents_time[next_index]],
[currents_data[previous_index], currents_data[next_index]],
)
horizontal_tidal_accessibility.loc[starting_time, :] = [0, "Flood", tide_index]
horizontal_tidal_accessibility.loc[closing_time, :] = [current_velocity, "Flood", tide_index]
for period_nr, count in [
(idx, count) for idx, count in horizontal_tidal_accessibility.value_counts("Period_nr").items() if count != 2
]:
if count == 1:
for iloc, (loc, info) in enumerate(
horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period_nr == period_nr].iterrows()
):
if not iloc % 2 and info.Limit != np.max(critical_limits):
starting_time = np.datetime64(tidal_periods[info.Period_nr][0])
horizontal_tidal_accessibility.loc[starting_time] = info
horizontal_tidal_accessibility.loc[starting_time, "Limit"] = np.max(critical_limits)
break
else:
horizontal_tidal_accessibility = horizontal_tidal_accessibility.drop(
horizontal_tidal_accessibility[
(horizontal_tidal_accessibility.Period_nr == period_nr)
& (horizontal_tidal_accessibility.Limit == np.max(critical_limits))
].index[:-1]
)
horizontal_tidal_accessibility = horizontal_tidal_accessibility.drop(
horizontal_tidal_accessibility[
(horizontal_tidal_accessibility.Period_nr == period_nr)
& (horizontal_tidal_accessibility.Limit == np.min(critical_limits))
].index[:-1]
)
if len(horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period_nr == period_nr]) < 2:
for iloc, (loc, info) in enumerate(
horizontal_tidal_accessibility[horizontal_tidal_accessibility.Period_nr == period_nr].iterrows()
):
if not iloc % 2 and info.Limit != np.max(critical_limits):
starting_time = np.datetime64(tidal_periods[info.Period_nr][0])
horizontal_tidal_accessibility.loc[starting_time] = info
horizontal_tidal_accessibility.loc[starting_time, "Limit"] = np.max(critical_limits)
break
else:
for period_nr in [
idx for idx, count in horizontal_tidal_accessibility.value_counts("Period_nr").items() if count % 2
]:
for loc, info in horizontal_tidal_accessibility[
horizontal_tidal_accessibility.Period_nr == period_nr
].iterrows():
if (
currents_data[list(currents_time).index(loc) - 1] < info.Limit
and currents_data[list(currents_time).index(loc) + 1] < info.Limit
):
horizontal_tidal_accessibility[loc + np.timedelta64(1, "ns")] = info
break
# Add fully (in)accessible tides
for tide in tides:
critical_limit = critical_limits[tide]
if critical_limit == tidal_window_constructor.accessibility.accessible.value:
for period_time,period_info in tidal_periods.iterrows():
if period_info['Tidal period'].find(tide)+1:
horizontal_tidal_accessibility.loc[period_time] = [critical_limit, 'Accessible', 'Current velocity', tide, period_info['Period number']]
elif critical_limit == tidal_window_constructor.accessibility.inaccessible.value:
for period_time,period_info in tidal_periods.iterrows():
if period_info['Tidal period'].find(tide)+1:
horizontal_tidal_accessibility.loc[period_time] = [critical_limit, 'Inaccessible', 'Current velocity', tide, period_info['Period number']]
last_tidal_period = tidal_periods.iloc[-1]
horizontal_tidal_accessibility.loc[last_tidal_period.name] = [0.0, 'Inaccessible', 'Current velocity', last_tidal_period['Tidal period'], last_tidal_period['Period number']]
return horizontal_tidal_accessibility
# Start calculation
horizontal_tidal_restriction_nodes = []
horizontal_tidal_restriction_stations = []
restrictions = pd.DataFrame()
horizontal_tidal_accessibility = pd.DataFrame(columns=['Limit', 'Condition', 'Accessibility','Period_nr'])
horizontal_tidal_window = False
time_start_index = np.max([0, np.absolute(hydromanager.hydrodynamic_times - (time_start)).argmin() - 2])
time_end_index = np.absolute(hydromanager.hydrodynamic_times - (time_end)).argmin()
for route_index, node_name in enumerate(route):
if 'Horizontal tidal restriction' in vessel.multidigraph.nodes[node_name].keys():
sailing_time_to_next_node = self.provide_sailing_time(vessel, route[:(route_index + 1)])
restriction, no_tidal_window = self.provide_tidal_window_restriction(vessel, route, node_name,sailing_time_to_next_node.Time.sum(),restriction_type='Horizontal')
if no_tidal_window:
continue
restrictions = pd.concat([restrictions,pd.DataFrame([restriction])])
time_start_index = np.max(
[0, np.absolute(hydromanager.hydrodynamic_times - (time_start + np.timedelta64(int(delay), "s"))).argmin() - 2]
)
time_end_index = np.absolute(
hydromanager.hydrodynamic_times - (time_end + np.timedelta64(int(delay), "s"))
).argmin()
horizontal_tidal_window = True
current_velocity_station = restriction.Data
cross_current_limit = {}
cross_current_limit['Flood'] = restriction.Restriction.current_velocity_values['Flood']
cross_current_limit['Ebb'] = restriction.Restriction.current_velocity_values['Ebb']
if restriction.Restriction.window_method == 'Maximum':
next_horizontal_tidal_accessibility = calculate_horizontal_tidal_window(vessel,time_start_index,time_end_index,current_velocity_station,cross_current_limit,restriction.Restriction)
if restriction.Restriction.window_method == 'Point-based':
if isinstance(restriction.Restriction.current_velocity_values['Flood'], list) and restriction.Restriction.current_velocity_values['Ebb'] == -999:
next_horizontal_tidal_accessibility = calculate_horizontal_tidal_window(vessel,time_start_index,time_end_index,current_velocity_station,cross_current_limit,restriction.Restriction,ebb=False,decreasing=True)
elif isinstance(restriction.Restriction.current_velocity_values['Ebb'], list) and restriction.Restriction.current_velocity_values['Flood'] == -999:
next_horizontal_tidal_accessibility = calculate_horizontal_tidal_window(vessel,time_start_index,time_end_index,current_velocity_station,cross_current_limit,restriction.Restriction,flood=False,decreasing=True)
else:
next_horizontal_tidal_accessibility = calculate_horizontal_tidal_window(vessel,time_start_index,time_end_index,current_velocity_station,cross_current_limit,restriction.Restriction,decreasing=True)
horizontal_tidal_restriction_nodes.append(node_name)
horizontal_tidal_restriction_stations.append(current_velocity_station)
next_horizontal_tidal_accessibility_time_correction = np.timedelta64(int(sailing_time_to_next_node['Time'].sum()), 's')
next_horizontal_tidal_accessibility.index -= next_horizontal_tidal_accessibility_time_correction
next_horizontal_tidal_accessibility = next_horizontal_tidal_accessibility.sort_index()
next_horizontal_tidal_accessibility = next_horizontal_tidal_accessibility[~(next_horizontal_tidal_accessibility['Accessibility'] == next_horizontal_tidal_accessibility['Accessibility'].shift(1))]
if horizontal_tidal_accessibility.empty:
horizontal_tidal_accessibility = next_horizontal_tidal_accessibility
else:
horizontal_tidal_accessibility = self.combine_tidal_windows(
horizontal_tidal_accessibility, next_horizontal_tidal_accessibility, current_velocity_windows=True
)
horizontal_tidal_restriction_nodes.append(node_name)
horizontal_tidal_restriction_stations.append(station)
next_horizontal_tidal_accessibility_time_correction = np.timedelta64(
int(sailing_time_to_next_node["Time"].sum()), "s"
)
next_horizontal_tidal_accessibility.index -= next_horizontal_tidal_accessibility_time_correction
if horizontal_tidal_accessibility.empty:
horizontal_tidal_accessibility = next_horizontal_tidal_accessibility
else:
horizontal_tidal_accessibility = self.combine_tidal_windows(
horizontal_tidal_accessibility, next_horizontal_tidal_accessibility
)
if horizontal_tidal_accessibility.empty or not horizontal_tidal_window:
horizontal_tidal_accessibility = pd.DataFrame(columns=["Limit", "Condition", "Accessibility"])
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_start.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Accessible",
]
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_stop.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Inaccessible",
]
else:
if horizontal_tidal_accessibility.iloc[0].Accessibility == "Inaccessible":
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_start.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Accessible",
]
else:
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_start.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Inaccessible",
]
if horizontal_tidal_accessibility.iloc[-1].Accessibility == "Inaccessible":
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_stop.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Accessible",
]
else:
horizontal_tidal_accessibility.loc[np.datetime64(vessel.env.simulation_stop.replace(tzinfo=None)), :] = [
0,
"Current velocity",
"Inaccessible",
]
horizontal_tidal_accessibility = horizontal_tidal_accessibility.sort_index()
horizontal_tidal_windows = [
[window_start[0], window_end[0]]
for window_start, window_end in zip(
horizontal_tidal_accessibility.iloc[:-1].iterrows(), horizontal_tidal_accessibility.iloc[1:].iterrows()
)
if window_start[1].Accessibility == "Accessible"
]
if plot:
# Create figure
fig, ax = plt.subplots(figsize=[16 * 2 / 3, 6])
# Plot vertical tidal windows
for window in horizontal_tidal_windows:
(horizontal_tidal_window,) = ax.fill(
[window[0], window[0], window[1], window[1]],
[-1.5, 1.5, 1.5, -1.5],
facecolor="firebrick",
alpha=0.25,
edgecolor="none",
)
# Plot governing current velocity
for node,station in zip(horizontal_tidal_restriction_nodes,horizontal_tidal_restriction_stations):
governing_current_velocity,_ = self.provide_governing_current_velocity(vessel,station,time_start_index,time_end_index)
horizontal_tidal_accessibility_time_correction = np.timedelta64(int(self.provide_sailing_time(vessel, route[:(route.index(node) + 1)])['Time'].sum() + delay),'s')
(current_velocity,) = ax.plot(
[
time - horizontal_tidal_accessibility_time_correction
for time in hydromanager.hydrodynamic_times[time_start_index:time_end_index]
],
governing_current_velocity,
color="firebrick",
linewidth=2,
)
ax.axhline(0, color='k', linewidth=1)
# Figure bounds
ax.set_xlim(hydromanager.hydrodynamic_times[time_start_index], hydromanager.hydrodynamic_times[time_end_index - 36])
ax.set_ylim(-1.5, 1.5)
# Figure ticks
ax.set_xticks(ax.get_xticks())
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right")
ax.xaxis.set_major_formatter(dates.DateFormatter("%Y-%m-%d %H:%M"))
# Figure axes
ax.set_xlabel("Date")
ax.set_ylabel("Current velocity [m/s]")
# Legend
ax.legend(
[current_velocity, horizontal_tidal_window],
["Current velocity", "Horizontal tidal windows"],
frameon=False,
loc="upper left",
bbox_to_anchor=(1.0, 1.0),
)
fig.tight_layout()
plt.show()
return horizontal_tidal_accessibility, horizontal_tidal_windows, restrictions
[docs]
def combine_tidal_windows(self, tidal_window_1, tidal_window_2):
accessibility_1 = tidal_window_1.iloc[
[bisect.bisect_right(tidal_window_1.index, index) - 1 for index in tidal_window_2.index]
].Accessibility.to_numpy()
accessibility_2 = tidal_window_2.iloc[
[bisect.bisect_right(tidal_window_2.index, index) - 1 for index in tidal_window_1.index]
].Accessibility.to_numpy()
accessibility_1_interpolated = tidal_window_1.copy()
accessibility_2_interpolated = tidal_window_2.copy()
for key, value in zip(tidal_window_2.index, accessibility_1):
accessibility_1_interpolated.loc[key, "Accessibility"] = value
for key, value in zip(tidal_window_1.index, accessibility_2):
accessibility_2_interpolated.loc[key, "Accessibility"] = value
accessibility_1_interpolated = accessibility_1_interpolated.sort_index()
accessibility_2_interpolated = accessibility_2_interpolated.sort_index()
tidal_accessibility = pd.concat([accessibility_1_interpolated, accessibility_2_interpolated], axis=1)
tidal_accessibility = tidal_accessibility.sort_index()
tidal_accessibility_limit = [
limit_1 if not math.isnan(limit_1) else limit_2 for limit_1, limit_2 in tidal_accessibility.Limit.to_numpy()
]
tidal_accessibility_condition = [
condition_1 if isinstance(condition_1, str) else condition_2
for condition_1, condition_2 in tidal_accessibility.Condition.to_numpy()
]
tidal_accessibility_accessibility = [
"Accessible" if accessibility_1 == accessibility_2 and accessibility_1 == "Accessible" else "Inaccessible"
for accessibility_1, accessibility_2 in tidal_accessibility.Accessibility.to_numpy()
]
tidal_accessibility = tidal_accessibility.drop(["Limit", "Condition", "Accessibility"], axis=1)
tidal_accessibility["Limit"] = tidal_accessibility_limit
tidal_accessibility["Condition"] = tidal_accessibility_condition
tidal_accessibility["Accessibility"] = tidal_accessibility_accessibility
accessible_indexes = [
idx for idx, accessibility in enumerate((tidal_accessibility.Accessibility == "Accessible").to_numpy()) if accessibility
]
inaccessible_indexes = [
idx
for idx, inaccessibility in enumerate((tidal_accessibility.Accessibility == "Inaccessible").to_numpy())
if inaccessibility
]
accessible_indexes = np.array(
[indexes[-1] for indexes in np.split(accessible_indexes, np.nonzero(np.diff(accessible_indexes) != 1) + 1)], dtype=int
)
inaccessible_indexes = np.array(
[indexes[0] for indexes in np.split(inaccessible_indexes, np.nonzero(np.diff(inaccessible_indexes) != 1) + 1)],
dtype=int,
)
tidal_window_indexes = np.sort(np.append(accessible_indexes, inaccessible_indexes))
tidal_accessibility = tidal_accessibility.iloc[tidal_window_indexes]
return tidal_accessibility
[docs]
def provide_tidal_windows(self,vessel,route,time_start,time_end,ax_left=None,ax_right=None,delay=0,plot=False):
hydromanager = HydrodynamicDataManager()
time_start_index = np.max(
[0, np.absolute(hydromanager.hydrodynamic_times - (time_start + np.timedelta64(int(delay), "s"))).argmin() - 2]
)
time_end_index = np.absolute(hydromanager.hydrodynamic_times - (time_end + np.timedelta64(int(delay), "s"))).argmin()
vertical_tidal_accessibility,vertical_tidal_windows,net_ukcs = self.provide_vertical_tidal_windows(vessel, route, time_start, time_end, delay)
horizontal_tidal_accessibility,horizontal_tidal_windows,horizontal_tidal_restrictions = self.provide_horizontal_tidal_windows(vessel, route, time_start, time_end, delay)
if not horizontal_tidal_accessibility.empty:
tidal_accessibility = self.combine_tidal_windows(vertical_tidal_accessibility,horizontal_tidal_accessibility)
else:
tidal_accessibility = vertical_tidal_accessibility
tidal_windows = [[window_start[0], window_end[0]] for window_start, window_end in zip(tidal_accessibility.iloc[:-1].iterrows(), tidal_accessibility.iloc[1:].iterrows()) if window_start[1].Accessibility == 'Accessible']
# Plot
if plot:
# Create figure
if not ax_left:
_, ax_left = plt.subplots(figsize=[16 * 2 / 3, 6])
ax_right = ax_left.twinx()
# Plot net UKC
# net_ukc = self.provide_minimum_available_water_depth_along_route(vessel, route, time_start, time_end, delay)
(net_UKC,) = ax_left.plot(net_ukcs["min_net_ukc"], color="C0", linewidth=2, zorder=1)
minimum_required_net_ukc = ax_left.axhline(0, color="C0", linestyle="--", linewidth=2)
# Plot governing current velocity
horizontal_restriction_type = None
for index,restriction_info in horizontal_tidal_restrictions.iterrows():
horizontal_restriction_type = restriction_info.Restriction.window_method
governing_current_velocity,_ = self.provide_governing_current_velocity(vessel,restriction_info.Data,time_start_index, time_end_index)
horizontal_tidal_accessibility_time_correction = np.timedelta64(int(self.provide_sailing_time(vessel, route[:(route.index(restriction_info.Node) + 1)])['Time'].sum()), 's')
(current_velocity,) = ax_right.plot(
[
time - horizontal_tidal_accessibility_time_correction
for time in hydromanager.hydrodynamic_times[time_start_index:time_end_index]
],
governing_current_velocity,
color="firebrick",
linewidth=2,
zorder=1,
)
if horizontal_restriction_type == 'Maximum':
critical_current_velocity = ax_right.axhline(restriction_info.Restriction.current_velocity_values['Flood'], color='firebrick',linestyle='--', linewidth=2)
ax_right.axhline(-1 * restriction_info.Restriction.current_velocity_values['Ebb'], color='firebrick',linestyle='--', linewidth=2)
ax_right.set_ylim(np.floor(np.min(governing_current_velocity)),np.ceil(np.max(governing_current_velocity)))
# Figure bounds
ax_left.set_xlim(
hydromanager.hydrodynamic_times[time_start_index], hydromanager.hydrodynamic_times[time_end_index - 36]
)
ax_left.set_ylim(
np.min([np.floor(np.min(net_ukcs["min_net_ukc"].to_numpy())), -1.0]),
np.max([np.ceil(np.max(net_ukcs["min_net_ukc"])), 1.0]),
)
# Calculate vertical and horizontal tidal windows
vertical_tidal_window_polygons = []
for window in vertical_tidal_windows:
vertical_tidal_window_polygons.append(
Polygon(
[
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
]
)
)
horizontal_tidal_window_polygons = []
for window in horizontal_tidal_windows:
horizontal_tidal_window_polygons.append(
Polygon(
[
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
]
)
)
tidal_window_polygons = []
for window in tidal_windows:
tidal_window_polygons.append(
Polygon(
[
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
Point((window[0] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[1]),
Point((window[1] - np.datetime64("1970-01-01")) / np.timedelta64(1, "s"), ax_left.get_ylim()[0]),
]
)
)
if not isinstance(horizontal_tidal_window_polygons, Polygon):
horizontal_tidal_window_polygons = MultiPolygon(horizontal_tidal_window_polygons)
vertical_tidal_window_polygons = MultiPolygon(vertical_tidal_window_polygons).difference(
horizontal_tidal_window_polygons
)
vertical_tidal_window_polygons = vertical_tidal_window_polygons.difference(MultiPolygon(tidal_window_polygons))
if not isinstance(vertical_tidal_window_polygons, Polygon):
vertical_tidal_window_polygons = MultiPolygon(vertical_tidal_window_polygons)
horizontal_tidal_window_polygons = MultiPolygon(horizontal_tidal_window_polygons).difference(
vertical_tidal_window_polygons
)
horizontal_tidal_window_polygons = horizontal_tidal_window_polygons.difference(MultiPolygon(tidal_window_polygons))
# Plot vertical tidal windows
if not isinstance(vertical_tidal_window_polygons, Polygon):
for polygon in vertical_tidal_window_polygons.geoms:
polygon_x = []
for timestamp in polygon.exterior.xy[0]:
polygon_x.append(datetime.datetime.fromtimestamp(timestamp,tz=pytz.utc))
polygon_y = list(polygon.exterior.xy[1])
(vertical_tidal_window,) = ax_left.fill(
polygon_x, polygon_y, facecolor="C0", alpha=0.25, edgecolor="none", zorder=0
)
elif isinstance(vertical_tidal_window_polygons, Polygon):
polygon = vertical_tidal_window_polygons
polygon_x = []
for timestamp in polygon.exterior.xy[0]:
polygon_x.append(datetime.datetime.fromtimestamp(timestamp,tz=pytz.utc))
polygon_y = list(polygon.exterior.xy[1])
(vertical_tidal_window,) = ax_left.fill(
polygon_x, polygon_y, facecolor="C0", alpha=0.25, edgecolor="none", zorder=0
)
# Plot horizontal tidal windows
if not isinstance(horizontal_tidal_window_polygons, Polygon):
for polygon in horizontal_tidal_window_polygons.geoms:
polygon_x = []
for timestamp in polygon.exterior.xy[0]:
polygon_x.append(datetime.datetime.fromtimestamp(timestamp,tz=pytz.utc))
polygon_y = list(polygon.exterior.xy[1])
(horizontal_tidal_window,) = ax_left.fill(
polygon_x, polygon_y, facecolor="firebrick", alpha=0.25, edgecolor="none", zorder=0
)
elif isinstance(horizontal_tidal_window_polygons, Polygon):
polygon = horizontal_tidal_window_polygons
polygon_x = []
for timestamp in polygon.exterior.xy[0]:
polygon_x.append(datetime.datetime.fromtimestamp(timestamp,tz=pytz.utc))
polygon_y = list(polygon.exterior.xy[1])
(horizontal_tidal_window,) = ax_left.fill(
polygon_x, polygon_y, facecolor="firebrick", alpha=0.25, edgecolor="none", zorder=0
)
# Plot tidal windows
for window in tidal_windows:
(tidal_window,) = ax_left.fill(
[window[0], window[0], window[1], window[1]],
[ax_left.get_ylim()[0], ax_left.get_ylim()[1], ax_left.get_ylim()[1], ax_left.get_ylim()[0]],
facecolor="limegreen",
alpha=0.25,
edgecolor="none",
zorder=0,
)
if not tidal_windows:
tidal_window = ax_left.fill(
[0, 0, 0, 0],
[ax_left.get_ylim()[0], ax_left.get_ylim()[1], ax_left.get_ylim()[1], ax_left.get_ylim()[0]],
facecolor="limegreen",
alpha=0.25,
edgecolor="none",
zorder=0,
)
# Figure ticks
ax_left.set_xticks(ax_left.get_xticks())
ax_left.set_xticklabels(ax_left.get_xticklabels(), rotation=45, ha="right")
ax_left.xaxis.set_major_formatter(dates.DateFormatter("%Y-%m-%d %H:%M"))
# Figure axes
ax_left.set_xlabel("Date")
ax_left.set_ylabel("Net UKC [m]")
ax_right.set_ylabel("Current velocity [m/s]")
# Legend and title
if horizontal_restriction_type:
if horizontal_restriction_type == "Maximum":
ax_left.legend(
[
net_UKC,
minimum_required_net_ukc,
current_velocity,
critical_current_velocity,
vertical_tidal_window,
horizontal_tidal_window,
tidal_window,
],
[
"Net UKC",
"Required net UKC",
"Current velocity",
"Vertical tidal windows",
"Horizontal tidal windows",
"Resulting tidal windows",
],
frameon=False,
loc="upper left",
bbox_to_anchor=(1.1, 1.0),
)
else:
ax_left.legend(
[
net_UKC,
minimum_required_net_ukc,
current_velocity,
vertical_tidal_window,
horizontal_tidal_window,
tidal_window,
],
[
"Net UKC",
"Required net UKC",
"Current velocity",
"Vertical tidal windows",
"Horizontal tidal windows",
"Resulting tidal windows",
],
frameon=False,
loc="upper left",
bbox_to_anchor=(1.1, 1.0),
)
else:
ax_left.legend(
[net_UKC, minimum_required_net_ukc, vertical_tidal_window, tidal_window],
["Net UKC", "Required net UKC", "Vertical tidal windows", "Resulting tidal windows"],
frameon=False,
loc="upper left",
bbox_to_anchor=(1.05, 1.0),
)
ax_left.set_title(
f"Accessibility of {vessel.type}-class vessel with name {vessel.name} "
f"and {np.round(vessel.T, 2)}m draught and a length of {np.round(vessel.L)}m, sailing {vessel.bound}"
)
plt.show()
return tidal_accessibility, tidal_windows, ax_left, ax_right
return tidal_accessibility, tidal_windows