"""Contains the mixin for lock chambers. Also contains the parent class LockChamberOperatior."""
import simpy
import numpy as np
import pandas as pd
import datetime
import math
import functools
from opentnsim.core import HasResource, Identifiable, Log, HasLength, ExtraMetadata
from opentnsim.utils import time_to_numpy
from opentnsim.lock.calculations import calculate_z, levelling_time_equation
from opentnsim.lock.utils import _get_lock_operation_to_and_from_node
from opentnsim.vessel_traffic_service.hydrodanamic_data_manager import HydrodynamicDataManager
from opentnsim.output import HasOutput
from opentnsim.graph.mixins import HasMultiDiGraph, get_length_of_edge
from opentnsim.constants import knots
[docs]
class IsLockChamberOperator:
"""The lock chamber operator operates one chamber of the lock.
The operator communicates with the lock master through self.lock_master
to coordinate vessel operations and lock state changes.
"""
def __init__(self, lock_master, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock_master = lock_master
def _get_appropriate_waiting_area(self, direction):
"""Get appropriate waiting area through lock master.
Parameters
----------
direction : int
the direction of the vessel: 0 (A -> B) or 1 (B -> A)
"""
if self.lock_master:
return self.lock_master._get_appropriate_waiting_area(direction)
else:
raise AttributeError("No lock master set for accessing waiting areas")
@property
def closing_doors_in_between_arrivals(self):
"""Get policy on closing doors in between arrivals through lock master."""
if self.lock_master:
return self.lock_master.closing_doors_in_between_arrivals
else:
raise AttributeError("No lock master set for accessing door closing policy")
@property
def closing_doors_in_between_operations(self):
"""Get policy on closing doors in between operations through lock master."""
if self.lock_master:
return self.lock_master.closing_doors_in_between_operations
else:
raise AttributeError("No lock master set for accessing door closing policy")
[docs]
def close_doors_before_vessel_is_laying_still(self, operation_index):
"""Get policy on closing doors before vessel is laying still through lock master.
Parameters
----------
operation_index : int
index of the lock operation
"""
if self.lock_master:
return self.lock_master.close_doors_before_vessel_is_laying_still(operation_index)
else:
raise AttributeError("No lock master set for accessing door closing before vessel laying still policy")
@property
def _distance_to_lock(self):
"""Get distance to lock through lock master."""
if self.lock_master:
return self.lock_master._distance_to_lock
else:
raise AttributeError("No lock master set for accessing distance to lock")
@property
def vessel_planning(self):
"""Get vessel planning through lock master."""
if self.lock_master:
return self.lock_master.vessel_planning
else:
raise AttributeError("No lock master set for accessing vessel planning")
@property
def operation_planning(self):
"""Get operation planning through lock master."""
if self.lock_master:
return self.lock_master.operation_planning
else:
raise AttributeError("No lock master set for accessing operation planning")
@property
def waiting_area_B(self):
"""Get waiting area B through lock master."""
if self.lock_master:
return self.lock_master.waiting_area_B
else:
raise AttributeError("No lock master set for accessing waiting area B")
@property
def waiting_area_A(self):
"""Get waiting area A through lock master."""
if self.lock_master:
return self.lock_master.waiting_area_A
else:
raise AttributeError("No lock master set for accessing waiting area A")
[docs]
def register_vessel(self, vessel):
"""Register vessel through lock master.
Parameters
----------
vessel : type
a type including the following mixins: PassesLockComplex,Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
"""
if self.lock_master:
yield from self.lock_master.register_vessel(vessel)
[docs]
def calculate_sailing_time_to_waiting_area(self, vessel, direction, current_node=None, prognosis=False, overwrite=True):
"""Calculate sailing time to waiting area through lock master.
Parameters
----------
vessel : type
a type including the following mixins: PassesLockComplex,Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
direction : int
the direction of the vessel: 0 (A -> B) or 1 (B -> A)
current_node : str
node name (that has to be in the graph) on which the vessel is currently sailing, to navigate an edge should form an edge with the origin)
prognosis : bool
if the sailing time is calculated for prognosis purposes (True) or for actual sailing (False)
overwrite : bool
if existing sailing time in the vessel planning should be overwritten (True) or not (False)
"""
if self.lock_master:
return self.lock_master.calculate_sailing_time_to_waiting_area(vessel, direction, current_node, prognosis, overwrite)
[docs]
def calculate_vessel_departure_start_delay(self, vessel, operation_index, direction, prognosis=False):
"""Calculate vessel departure start delay through lock master.
Parameters
----------
vessel : type
a type including the following mixins: PassesLockComplex,Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
operation_index : int
index of the lock operation
direction : int
the direction of the vessel: 0 (A -> B) or 1 (B -> A)
prognosis : bool
if the delay is calculated for prognosis purposes (True) or for actual sailing (False)
overwrite : bool
if existing delay in the vessel planning should be overwritten (True) or not (False)
"""
if self.lock_master:
return self.lock_master.calculate_vessel_departure_start_delay(vessel, operation_index, direction, prognosis)
[docs]
def initiate_levelling(self, origin, destination, vessel=None, k=0):
"""
Initiates levelling process as function that can be added to a vessel TODO: preferably you don't want to add this process to the vessel but let the lock master / operator handle this
Parameters
----------
origin : str
node name (that has to be in the graph) on which the vessel is currently sailing, to navigate an edge should form an edge with the origin)
destination : str
node name (that has to be in the graph) on which the vessel is currently sailing to, to navigate an edge (should form an edge with the origin)
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
k : int
identifier of the edge between two nodes in a multidigraph network
"""
# TODO: Moeten de origin en destination hier naast elkaar liggen? Zoja, toevoegen in documentatie.
# determine if there is a lock on the edge
if "Lock" not in vessel.multidigraph.edges[origin, destination, k].keys():
return
# get the lock complex object
lock = vessel.multidigraph.edges[origin, destination, k]["Lock"][0]
# unpack the lock complex master's vessel and lock operation plannings
vessel_planning = lock.vessel_planning
operation_planning = lock.operation_planning
# determine the index of the vessel and the lock operation to which it is assigned to and the index of this operation
vessel_planning_index = vessel_planning[vessel_planning.id == vessel.id].iloc[-1].name
operation_index = vessel_planning.loc[vessel_planning_index, "operation_index"]
this_operation = operation_planning.loc[operation_index]
# determine the direction to the lock chamber is currently levelled to, and to which node the lock chamber will level
current_node = lock.node_open
if current_node == lock.start_node:
direction = 0
next_node = lock.end_node
else:
direction = 1
next_node = lock.start_node
# determine the vessels that are assigned to the lock operation to which the vessel is assigned
vessels = this_operation.vessels
# initiate levelling if vessel is the last assigned vessel in the lock
if vessel == vessels[-1]:
# liberate the vessels that were requested to wait for the last vessel
for other_vessel in vessels[:-1]:
terminate_waiting_time_for_other_vessel = False
while not terminate_waiting_time_for_other_vessel:
try:
yield lock.wait_for_other_vessels.put(other_vessel)
terminate_waiting_time_for_other_vessel = True
except simpy.Interrupt as e:
terminate_waiting_time_for_other_vessel = False
# Wait for other vessels to lay still
delay = (operation_planning.loc[operation_index].time_door_closing_start.round("s").to_pydatetime().timestamp() - lock.env.now)
if delay > 0:
yield lock.env.timeout(delay)
# Convert lock chamber
close_doors = True
if (lock.close_doors_before_vessel_is_laying_still
and this_operation.time_door_closing_start < vessel_planning.loc[vessel_planning_index, "time_lock_entry_stop"]):
close_doors = False
lock.operation_planning.loc[operation_index, 'status'] = 'unavailable'
yield from lock.convert_chamber(next_node, direction, operation_index=operation_index, vessel=vessel, close_doors=close_doors)
# Liberate waiting vessels in lock chamber
for other_vessel in vessels[:-1]:
terminate_levelling_for_other_vessel = False
while not terminate_levelling_for_other_vessel:
try:
yield lock.wait_for_levelling.put(other_vessel)
terminate_levelling_for_other_vessel = True
except simpy.Interrupt as e:
terminate_levelling_for_other_vessel = False
# If vessel is not the last assigned vessel
else:
# Wait for last assigned vessel of lock operation
waiting_for_other_vessels = True
last_location = vessel.logbook[-1]["Geometry"]
vessel.log_entry_v0("Waiting for other vessels in lock start", self.env.now, self.output.copy(), last_location)
while waiting_for_other_vessels:
try:
yield lock.wait_for_other_vessels.get(filter=(lambda request: request.id == vessel.id))
waiting_for_other_vessels = False
except simpy.Interrupt as e:
waiting_for_other_vessels = True
vessel.log_entry_v0("Waiting for other vessels in lock stop", self.env.now, self.output.copy(),last_location)
# Follow the converting lock chamber
vessel.log_entry_v0(
"Levelling start",
vessel.env.now,
vessel.output.copy(),
vessel.position_in_lock,
)
waiting_for_levelling = True
while waiting_for_levelling:
try:
yield lock.wait_for_levelling.get(filter=(lambda request: request.id == vessel.id))
waiting_for_levelling = False
except simpy.Interrupt as e:
waiting_for_levelling = True
vessel.log_entry_v0(
"Levelling stop",
vessel.env.now,
vessel.output.copy(),
vessel.position_in_lock,
)
# determine and yield sailing out delay
sailing_out_delay = lock.calculate_vessel_departure_start_delay(vessel, operation_index, direction).total_seconds()
delay_start = vessel.env.now
while sailing_out_delay:
try:
yield vessel.env.timeout(sailing_out_delay)
sailing_out_delay = 0
except simpy.Interrupt as e:
sailing_out_delay -= vessel.env.now - delay_start
[docs]
def prepare_next_lock_operation(self, lock, operation_index, direction, vessel):
"""Lock operator checks and (if required) initiates an empty lock operation or closes the doors if there is sufficient time with respect to the next operation's start time
Parameters
----------
lock : object
the lock chamber object generated with IsLockChamber
operation_index : int
index of the lock operation
direction : int
the direction of the lock operation: 0 (A -> B) or 1 (B -> A)
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
"""
# get variables of the last lock operation: do nothing if it is not the last vessel that is sailing out of the lock
operation_planning = self.lock_master.operation_planning
last_operation = operation_planning.loc[operation_index]
vessels_in_last_operation = last_operation.vessels
is_last_vessel_sailing_out = vessels_in_last_operation[-1] == vessel
if not is_last_vessel_sailing_out:
return
# get the current time, and the information of the next operation
current_time = pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now))
_, to_node = _get_lock_operation_to_and_from_node(self, 1 - direction)
next_operations = operation_planning[operation_planning.index > operation_index]
# determine if the doors can be closed after the considered vessel has sailed out of the lock
doors_can_be_closed = lock.determine_if_door_can_be_closed(vessel, direction, operation_index)
# determine if the next operation is empty
next_lockage_is_empty = False
if not next_operations.empty:
next_operation = next_operations.iloc[0]
if not len(next_operation.vessels):
next_lockage_is_empty = True
# an action should be done if the doors can be closed in between operations, or if the next lock operation is empty
if doors_can_be_closed and lock.closing_doors_in_between_operations:
door_closing_start_time = last_operation.time_potential_lock_door_closure_start
delay = np.max([self.sailing_time_before_closing_lock_doors, (door_closing_start_time - current_time).total_seconds()])
# close the doors with the correct delay
vessel.env.process(lock.close_door(delay=delay))
elif next_lockage_is_empty:
door_closing_start_time = next_operation.time_door_closing_start
closing_delay = np.max([self.sailing_time_before_closing_lock_doors, (door_closing_start_time - current_time).total_seconds()])
# if there is an empty lock operation and no policy that doors are closed in between operations is active -> close doors and convert chamber afterwards
if not lock.closing_doors_in_between_operations:
convert_chamber_delay = closing_delay
closing_doors = True
# if there is an empty lock operation but the policy that doors are closed in between operations is active -> close doors and convert chamber later, or convert chamber immediately if there is insufficient time
else:
next_operation = next_operations.iloc[1]
door_opening_start_time = next_operation.time_potential_lock_door_opening_stop
lock_operation_duration = self.determine_time_to_open_door(operation_index = vessel_operation_index + 1,
direction =1 - direction,
doors_required_to_be_open = door_opening_start_time)
opening_delay = (np.max([0, (door_opening_start_time - current_time).total_seconds()]) - lock_operation_duration.total_seconds())
if opening_delay > (closing_delay + self.lock_master.doors_closing_time):
convert_chamber_delay = opening_delay
closing_doors = False
vessel.env.process(lock.close_door(delay=closing_delay))
else:
convert_chamber_delay = closing_delay
closing_doors = True
# convert the lock chamber with the correct delay and if the doors should first be closed
vessel.env.process(lock.convert_chamber(operation_index = operation_index + 1,
new_level = to_node,
vessel = None,
close_doors = closing_doors,
delay = convert_chamber_delay,
direction = 1 - direction))
[docs]
def allow_vessel_to_sail_out_of_lock(self, origin, destination, vessel=None, k=0):
"""Allows the vessel to sail out of the lock chamber
Parameters
----------
origin : str
node name (that has to be in the graph) on which the vessel is currently sailing, to navigate an edge should form an edge with the origin)
destination : str
node name (that has to be in the graph) on which the vessel is currently sailing to, to navigate an edge (should form an edge with the origin)
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
k : int
identifier of the edge between two nodes in a multidigraph network
Yields
------
Vessel to sail to the end of the edge at which the lock chamber is located, and initiates new processes: i.e. closing doors or empty lock operation
"""
# checks if lock is present on the edge
if "Lock" not in vessel.multidigraph.edges[origin, destination, k].keys():
return
# unpacks the lock and vessel and operation planning
lock = vessel.multidigraph.edges[origin, destination, k]["Lock"][0]
vessel_planning = lock.vessel_planning
# determines information of the lock operation
vessel_planning_index = vessel_planning[vessel_planning.id == vessel.id].iloc[-1].name
operation_index = vessel_planning.loc[vessel_planning_index, "operation_index"]
direction = vessel_planning.loc[vessel_planning_index, "direction"]
# determines the distance from the vessel to the lock doors that have to be passed
distance_in_lock_from_position = lock.lock_length - vessel.distance_position_from_first_lock_doors
# determines the geometry objects of the lock based on the direction of the vessel TODO: function?
if not direction:
second_lock_doors_position = lock.location_lock_doors_B
remaining_distance = lock.distance_from_end_node_to_lock_doors_B
exit_geom = vessel.env.graph.nodes[lock.end_node]["geometry"]
else:
second_lock_doors_position = lock.location_lock_doors_A
remaining_distance = lock.distance_from_start_node_to_lock_doors_A
exit_geom = vessel.env.graph.nodes[lock.start_node]["geometry"]
# releasing the length of the lock TODO: only the yield statement should be kept: now it is prevented that vessels need to wait to put back their length, but in principle this should not occur although it sometimes occurs due to bugs
release_lock_access = False
while not release_lock_access:
try:
yield lock.length.put(vessel.L)
release_lock_access = True
except simpy.Interrupt as e:
release_lock_access = True
# determine the waiting time to sail out of the lock TODO: have another algorithm to determine this time: now the vessel planning is used, but this should be prevented -> the lock planning might be off by a few seconds/minutes due to uncertainties/errors in predictions and unforeseen circumstances
waiting_to_sail_out_time = (vessel_planning.loc[vessel_planning_index, "time_lock_departure_start"] -
pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now))).total_seconds()
# let the vessel wait to sail out of the lock (vessels may have to give other vessels priority to sail out to later sail out of the lock in a safe manner with sufficient distance to the vessel ahead -> i.e., if they sailed into the lock ahead of the considered vessel blocking the sailing out path)
waiting_to_sail_out_time_start = vessel.env.now
while waiting_to_sail_out_time > 0:
try:
yield vessel.env.timeout(waiting_to_sail_out_time)
waiting_to_sail_out_time = 0
except simpy.Interrupt as e:
waiting_to_sail_out_time -= vessel.env.now - waiting_to_sail_out_time_start
# log that the vessel can start sailing out of the lock (up to the lock doors)
vessel.log_entry_v0("Sailing to second lock doors start", vessel.env.now, vessel.output.copy(), vessel.position_in_lock,)
# determine the process of sailing to the lock doors that have to be passed (distance to these doors divided by the sailing out speed of the vessel)
vessel_speed = lock.vessel_sailing_speed_out_lock(vessel)
sailing_out_time = distance_in_lock_from_position / vessel_speed
sailing_out_start = vessel.env.now
while sailing_out_time:
try:
yield vessel.env.timeout(sailing_out_time)
sailing_out_time = 0
except simpy.Interrupt as e:
sailing_out_time -= vessel.env.now - sailing_out_start
# log that the vessel can stops sailing out of the lock (up to the lock doors)
vessel.log_entry_v0("Sailing to second lock doors stop", vessel.env.now, vessel.output.copy(), second_lock_doors_position,)
# remove functions specific to passing the lock chamber
remove_functions = [lock.allow_vessel_to_sail_into_lock, lock.initiate_levelling, lock.allow_vessel_to_sail_out_of_lock]
remove_on_pass_edge_functions = []
for index, function in enumerate(vessel.on_pass_edge_functions):
if isinstance(function, functools.partial):
if function.func in remove_functions:
remove_on_pass_edge_functions.append(function)
elif function in remove_functions:
remove_on_pass_edge_functions.append(function)
for function in remove_on_pass_edge_functions:
vessel.on_pass_edge_functions.remove(function)
# determine if the lock has to be levelled
self.prepare_next_lock_operation(lock, operation_index, direction, vessel)
# log that sailing out of the lock complex is starting
vessel.log_entry_v0("Sailing to lock complex exit start", vessel.env.now, vessel.output.copy(), second_lock_doors_position)
# let the vessel sail to the end of the lock complex
vessel_speed = lock.vessel_sailing_out_speed(vessel, direction)
sailing_out_time = remaining_distance / vessel_speed
sailing_out_start = vessel.env.now
while sailing_out_time:
try:
yield vessel.env.timeout(sailing_out_time)
sailing_out_time = 0
except simpy.Interrupt as e:
sailing_out_time -= vessel.env.now - sailing_out_start
remaining_sailing_distance = vessel_speed * sailing_out_time
sailing_out_time = remaining_sailing_distance / vessel.current_speed
# log that sailing out of the lock complex is stopping and set that no distance has to be sailed along the edge (vessel is at end of lock complex)
vessel.log_entry_v0("Sailing to lock complex exit stop", vessel.env.now, vessel.output.copy(), exit_geom,)
vessel.distance_left_on_edge = 0
[docs]
def allow_vessel_to_sail_into_lock(self, origin, destination, vessel=None, k=0):
"""Allows the vessel to sail into the lock chamber
Parameters
----------
origin : str
node name (that has to be in the graph) on which the vessel is currently sailing, to navigate an edge should form an edge with the origin)
destination : str
node name (that has to be in the graph) on which the vessel is currently sailing to, to navigate an edge (should form an edge with the origin)
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
k : int
identifier of the edge between two nodes in a multidigraph network
"""
# checks if lock is present on the edge
if "Lock" not in vessel.multidigraph.edges[origin, destination, k].keys():
return
# unpacks the lock and vessel and operation planning
lock = vessel.multidigraph.edges[origin, destination, k]["Lock"][0]
vessel_planning = lock.vessel_planning
# determines information of the lock operation
vessel_planning_index = vessel_planning[vessel_planning.id == vessel.id].iloc[-1].name
operation_index = vessel_planning.loc[vessel_planning_index, "operation_index"]
direction = vessel_planning.loc[vessel_planning_index, "direction"]
current_time = vessel.env.now
# determines the geometry objects of the lock based on the direction of the vessel TODO: function?
waiting_area = lock._get_appropriate_waiting_area(direction)
distance_to_lock = lock._distance_to_lock(direction)
if not direction:
first_lock_door_position = lock.location_lock_doors_A
else:
first_lock_door_position = lock.location_lock_doors_B
# correct the distance to the lock doors if the vessel is in the waiting area, located at the same edge of the lock
lock_start_node, lock_end_node = _get_lock_operation_to_and_from_node(self, direction)
if (lock_start_node, lock_end_node) == waiting_area.edge:
distance_to_lock -= waiting_area.distance_from_edge_start
# log the start of sailing to the lock doors
last_position_vessel = vessel.logbook[-1]["Geometry"]
vessel.log_entry_v0("Sailing to first lock doors start", vessel.env.now, vessel.output.copy(), last_position_vessel,)
# let vessel sail to the lock doors
vessel_speed = lock.vessel_sailing_in_speed(vessel, direction)
remaining_sailing_time = distance_to_lock / vessel_speed
while remaining_sailing_time > 0:
try:
yield vessel.env.timeout(remaining_sailing_time)
remaining_sailing_time = 0
except simpy.Interrupt as e:
remaining_sailing_time -= vessel.env.now - current_time
remaining_sailing_distance = vessel_speed * remaining_sailing_time
remaining_sailing_time = remaining_sailing_distance / vessel.current_speed
# vessel entering now the lock -> delete the overruled speeds imposed on the vessel
vessel.overruled_speed = vessel.overruled_speed.iloc[0:0]
# claim the lock length (this should not lead to waiting time)
yield lock.length.get(vessel.L)
# log the stop of sailing to the lock doors
vessel.log_entry_v0("Sailing to first lock doors stop", vessel.env.now, vessel.output.copy(), first_lock_door_position,)
# Checks if door should be closed intermediately
vessel_planning_index = vessel_planning[vessel_planning.id == vessel.id].iloc[-1].name
# calculate delay to close doors
current_time = pd.Timestamp(datetime.datetime.fromtimestamp(vessel.env.now))
delay_to_close_doors = vessel_planning.loc[vessel_planning_index, "time_potential_lock_door_closure_start"] - current_time
# close doors if doors can be closed in between vessel arrivals or if vessel is last vessel to enter the lock
doors_can_be_closed_between_vessel_arrivals = lock.determine_if_door_can_be_closed(vessel, direction, operation_index, between_arrivals=True)
if lock.close_doors_before_vessel_is_laying_still and doors_can_be_closed_between_vessel_arrivals:
vessel.env.process(lock.close_door(delay=delay_to_close_doors.total_seconds()))
# log the start of sailing to the position within the lock chamber
vessel.log_entry_v0("Sailing to position in lock start", vessel.env.now, vessel.output.copy(), first_lock_door_position, )
# determine position in the lock chamber and distance to sail to this location
vessel.distance_position_from_first_lock_doors = lock.length.level + 0.5 * vessel.L
if not direction:
distance_to_position_in_lock = lock.distance_from_start_node_to_lock_doors_A + vessel.distance_position_from_first_lock_doors
else:
distance_to_position_in_lock = lock.distance_from_end_node_to_lock_doors_B + vessel.distance_position_from_first_lock_doors
vessel.position_in_lock = vessel.env.vessel_traffic_service.provide_location_over_edges(lock_start_node, lock_end_node, distance_to_position_in_lock)
# let vessel sail to the assigned location in the lock chamber
vessel_speed = lock.vessel_sailing_speed_in_lock(vessel)
remaining_sailing_time = vessel.distance_position_from_first_lock_doors / vessel_speed
while remaining_sailing_time > 0:
try:
yield vessel.env.timeout(remaining_sailing_time)
remaining_sailing_time = 0
except simpy.Interrupt as e:
remaining_sailing_time -= vessel.env.now - start_sailing
# log the stop of the sailing event to the assigned locaiton in the lock chamber
vessel.log_entry_v0("Sailing to position in lock stop", vessel.env.now, vessel.output.copy(), vessel.position_in_lock,)
# close doors if doors can be closed between vessel arrivals and doors have not already been closed before
doors_can_be_closed_between_vessel_arrivals = lock.determine_if_door_can_be_closed(vessel, direction, operation_index, between_arrivals=True)
if not lock.close_doors_before_vessel_is_laying_still and doors_can_be_closed_between_vessel_arrivals:
vessel.env.process(lock.close_door())
[docs]
def convert_chamber(self, new_level, direction, operation_index=None, vessel=None, close_doors=True, delay=0.0):
"""
Converts the lock chamber and logs this event. TODO: attribute for lock operator
Parameters
----------
new_level : str
node that represents the side at which the lock is currently levelled
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
close_doors : bool
if the doors have to be closed: yes (True) or no (False)
delay : float
a delay before lock conversion [s]
direction : int
the direction of the vessel: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
Yields
------
The conversion of the lock chamber
"""
if operation_index is not None:
self.lock_master.operation_planning.loc[operation_index, "status"] = "unavailable"
# if there is a delay -> yield time out
start_delay = self.env.now
while delay > 0:
try:
yield self.env.timeout(delay)
delay = 0
except simpy.Interrupt as e:
delay -= self.env.now - start_delay
# close the doors or make sure that lock is not performing another process
if close_doors:
yield from self.close_door(delay=delay)
else:
hold_door_A = self.door_A.request()
hold_levelling = self.levelling.request()
hold_door_B = self.door_B.request()
yield hold_door_A
yield hold_levelling
yield hold_door_B
self.door_A.release(hold_door_A)
self.levelling.release(hold_levelling)
self.door_B.release(hold_door_B)
# level lock and open the doors afterwards
yield from self.level_lock(new_level, direction, operation_index=operation_index, vessel=vessel)
yield from self.open_door()
[docs]
def close_door(self, delay=0.0):
"""
Lock operator closes the lock doors TODO: attribute for lock operator
Parameters
----------
delay : float
a delay before door opening [s]
Yields
------
The closing of the door
"""
# if there is a delay -> yield time out
start_delay = self.env.now
while delay > 0:
try:
yield self.env.timeout(delay)
delay = 0
except simpy.Interrupt as e:
delay -= self.env.now - start_delay
# make sure that all lock elements are requested, so only one process is occurring
hold_door_A = self.door_A.request()
hold_levelling = self.levelling.request()
hold_door_B = self.door_B.request()
yield hold_door_A
yield hold_levelling
yield hold_door_B
# log the start of the event
self.log_entry_v0("Lock doors closing start", self.env.now, self.output.copy(), self.node_open)
# timeout event of the doors closing
remaining_doors_closing_time = self.doors_closing_time
start_time_closing = self.env.now
while remaining_doors_closing_time:
try:
yield self.env.timeout(remaining_doors_closing_time)
remaining_doors_closing_time = 0
except simpy.Interrupt as e:
remaining_doors_closing_time -= self.env.now - start_time_closing
# set water level to the side at which the door has been closed
time = np.datetime64(datetime.datetime.fromtimestamp(self.env.now))
if self.node_open == self.start_node:
node = self.start_node
else:
node = self.end_node
time_index = HydrodynamicDataManager()._get_time_index_of_hydrodynamic_data(
self.env.vessel_traffic_service.hydrodynamic_information_path, time
)
new_water_level = HydrodynamicDataManager()._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, time, node, "Water level"
)
self.water_level[time_index:] = new_water_level
# log the end of the event
self.log_entry_v0("Lock doors closing stop", self.env.now, self.output.copy(), self.node_open)
if self.node_open == self.start_node:
self.door_A_open = False
else:
self.door_B_open = False
# release all lock elements that were requested, so the next process can start
self.door_A.release(hold_door_A)
self.levelling.release(hold_levelling)
self.door_B.release(hold_door_B)
[docs]
def level_lock(self, new_level, direction, operation_index=None, vessel=None):
"""
Lock operator levels the water level of the lock chamber to the harbour side of the direction of the lock operation TODO: attribute for lock operator
new_level : str
node of the edge of lock complex to which the lock chamber is levelling
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
direction : int
the direction of the vessel: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
same_direction : bool
Yields
------
Levelling of the lock chamber
"""
# make sure that all lock elements are requested, so only one process is occurring
hold_door_A = self.door_A.request()
hold_levelling = self.levelling.request()
hold_door_B = self.door_B.request()
yield hold_door_A
yield hold_levelling
yield hold_door_B
# determine the levelling time
levelling_time, _, _ = self.determine_levelling_time(t_start=self.env.now, direction=direction, operation_index=operation_index)
# log the start of the event
if vessel is not None:
vessel.log_entry_v0(
"Levelling start",
vessel.env.now,
vessel.output.copy(),
vessel.position_in_lock,
)
self.log_entry_v0(
"Lock chamber converting start",
self.env.now,
self.output.copy(),
self.node_open,
)
# set new node to which the doors will be opened
self.node_open = new_level
# timeout
remaining_levelling_time = levelling_time
start_levelling = self.env.now
while remaining_levelling_time:
try:
yield self.env.timeout(remaining_levelling_time)
remaining_levelling_time = 0
except simpy.Interrupt as e:
remaining_levelling_time -= self.env.now - start_levelling
# log the end of the event
self.log_entry_v0(
"Lock chamber converting stop",
self.env.now,
self.output.copy(),
self.node_open,
)
if vessel is not None:
vessel.log_entry_v0(
"Levelling stop",
vessel.env.now,
vessel.output.copy(),
vessel.position_in_lock,
)
# release all lock elements that were requested, so the next process can start
self.door_A.release(hold_door_A)
self.levelling.release(hold_levelling)
self.door_B.release(hold_door_B)
[docs]
def open_door(self, to_level=None, vessel=None, delay=0.0):
"""
Lock operator opens the lock doors TODO: attribute for lock operator
Parameters
----------
to_level : str
node of the edge of lock complex to which the lock chamber opens
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
delay : float
a delay before door opening
Yields
------
The opening of the door
"""
# if there is a delay -> yield time out
start_delay = self.env.now
while delay > 0:
try:
yield self.env.timeout(delay)
delay = 0
except (
simpy.Interrupt
) as e: # if there is a delay -> yield time out with new delay (remaining delay added with a delay equal to the exception)
delay -= self.env.now - start_delay
if vessel is not None:
if e.cause is not None:
delay += float(e.cause)
# delete attribute as form of communication of the vessel TODO: a bit complex, better do it in another way
if vessel is not None:
delattr(vessel, "door_open_request")
hydromanager = HydrodynamicDataManager()
# determine the water level in the lock chamber
time = np.datetime64(datetime.datetime.fromtimestamp(self.env.now))
time_index = hydromanager._get_time_index_of_hydrodynamic_data(
self.env.vessel_traffic_service.hydrodynamic_information_path, time
)
wlev_chamber = self.water_level[time_index]
# determine to_level
if to_level is None:
to_level = self.node_open
# determine the water level in the harbour
wlev_harbour = hydromanager._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, time, to_level, "Water level"
)
# determine the direction to which the vessels are sailing out
if to_level == self.start_node:
direction = 1
else:
direction = 0
# if the water levels in the chamber and harbour are not aligned -> level lock again
if not math.isnan(wlev_chamber) and not math.isnan(wlev_harbour) and np.abs(wlev_chamber - wlev_harbour) >= 0.1:
yield from self.level_lock(to_level, direction=direction)
else:
self.node_open = to_level
time = np.datetime64(datetime.datetime.fromtimestamp(self.env.now))
time_index = hydromanager._get_time_index_of_hydrodynamic_data(
self.env.vessel_traffic_service.hydrodynamic_information_path, time
)
wlev_series_node_door_open = hydromanager._get_hydrodynamic_data_series(
self.env.vessel_traffic_service.hydrodynamic_information_path, time, self.node_open, "Water level"
)
self.water_level[time_index:] = wlev_series_node_door_open
# make sure that all lock elements are requested, so only one process is occurring
hold_door_A = self.door_A.request()
hold_levelling = self.levelling.request()
hold_door_B = self.door_B.request()
yield hold_door_A
yield hold_levelling
yield hold_door_B
# log the process start
self.log_entry_v0("Lock doors opening start", self.env.now, self.output.copy(), self.node_open)
# timeout
remaining_doors_opening_time = self.doors_opening_time
start_time_opening = self.env.now
while remaining_doors_opening_time:
try:
yield self.env.timeout(remaining_doors_opening_time)
remaining_doors_opening_time = 0
except simpy.Interrupt as e:
remaining_doors_opening_time -= self.env.now - start_time_opening
# log the process stop
self.log_entry_v0(
"Lock doors opening stop",
self.env.now,
self.output.copy(),
self.node_open,
)
# determine which side the door is open to
if self.node_open == self.start_node:
self.door_A_open = True
else:
self.door_B_open = True
# release all lock elements that were requested, so the next process can start
self.door_A.release(hold_door_A)
self.levelling.release(hold_levelling)
self.door_B.release(hold_door_B)
[docs]
def minimum_delay_to_close_doors(self):
"""
Calculates the time delay (in seconds) between when the last vessel has entered the lock and when the lock doors can be closed
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
Returns
-------
minimum_delay_to_close_doors : pd.Timedelta
the minimum time delay that the lock doors can be closed after a vessel has entered the lock
"""
minimum_delay_to_close_doors = pd.Timedelta(seconds=self.sailing_time_before_closing_lock_doors)
return minimum_delay_to_close_doors
[docs]
def minimum_advance_to_open_doors(self):
"""
Determines the minimum time in advance that a lock door should be opened
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
direction : int
the direction of the vessel: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
Returns
-------
minimum_advance_to_open_doors : pd.Timedelta
the minimum time in advance that a lock door should be opened [s]
"""
minimum_advance_to_open_doors = pd.Timedelta(seconds=self.sailing_time_before_opening_lock_doors)
# minimum_advance_to_open_doors += pd.Timedelta(seconds=vessel.L/self.vessel_sailing_in_speed(vessel,direction))
# TODO: take into account the vessels' bows and sterns to determine the time before and after which the door should be respectively opened and closed
return minimum_advance_to_open_doors
[docs]
def determine_if_door_can_be_closed(self, vessel, direction, operation_index, between_arrivals=False):
"""
Determines if the doors can be closed in between operations or vessel arrivals
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
operation_index : int
index of the lock operation
direction : int
the direction of the lock operation: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
between_arrivals : bool
if the function is run to determine if the doors can be closed in between vessel arrivals (True) or not (False)
Returns
-------
doors_can_be_closed : bool
doors can be closed (True) or not (False)
"""
operation_planning = self.lock_master.operation_planning
this_operation = operation_planning.loc[operation_index]
vessels_in_operation = this_operation.vessels
last_vessel_to_enter_lock = vessels_in_operation[-1] == vessel
doors_can_be_closed = False
if not between_arrivals and not self.lock_master.closing_doors_in_between_operations:
return doors_can_be_closed
if between_arrivals and (not self.closing_doors_in_between_arrivals or not last_vessel_to_enter_lock):
return doors_can_be_closed
doors_can_be_closed = True
operation_planning = self.lock_master.operation_planning
vessel_planning = self.lock_master.vessel_planning
if not between_arrivals:
last_time_doors_closed = operation_planning.loc[operation_index, "time_potential_lock_door_closure_start"]
else:
last_time_doors_closed = pd.Timestamp(datetime.datetime.fromtimestamp(self.env.now))
last_time_doors_closed += pd.Timedelta(seconds=self.doors_closing_time)
next_operations = operation_planning[operation_planning.index > operation_index]
vessel_index = operation_planning.loc[operation_index, "vessels"].index(vessel)
vessels_in_operation = operation_planning.loc[operation_index, "vessels"]
operation_step = 1
if between_arrivals and vessel_index != len(vessels_in_operation) - 1:
next_vessel = vessels_in_operation[vessel_index + 1]
next_vessel_planning_index = vessel_planning[vessel_planning.id == next_vessel.id].iloc[-1].name
doors_required_to_be_open = vessel_planning.loc[next_vessel_planning_index, "time_potential_lock_door_opening_stop"]
same_direction = True
elif not next_operations.empty:
next_operation = next_operations.iloc[0]
if not len(next_operation.vessels):
next_operation = next_operations.iloc[1]
operation_step += 1
doors_required_to_be_open = next_operation.time_potential_lock_door_opening_stop
same_direction = direction != next_operation.direction
else:
return doors_can_be_closed
if same_direction:
direction = 1 - direction
door_opening_time = self.determine_time_to_open_door(operation_index + operation_step, direction, doors_required_to_be_open)
if (
doors_required_to_be_open - door_opening_time < last_time_doors_closed
or doors_required_to_be_open - last_time_doors_closed
< self.minimum_time_between_operations_for_intermediate_door_closure
):
doors_can_be_closed = False
return doors_can_be_closed
[docs]
def determine_if_door_is_closed(self, vessel, operation_index, direction, first_in_lock=False, between_arrivals=False):
"""
Determines if the doors are closed
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
operation_index : int
index of the lock operation
direction : int
the direction of the lock operation: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
first_in_lock : bool
if the function is run for the first vessel assigned to the lock operation (True) or not (False)
between_arrivals : bool
if the function is run to determine if the doors can be closed in between vessel arrivals (True) or not (False)
Returns
-------
doors_are_closed : bool
doors are closed (True) or not (False)
doors_required_to_be_open : pd.Timestamp
moment in time when the doors need to be opened
operation_time : pd.Timedelta
the time duration required to perform the lock operation
"""
operation_planning = self.lock_master.operation_planning
vessel_planning = self.lock_master.vessel_planning
vessels = operation_planning.loc[operation_index, "vessels"]
vessel_index = vessels.index(vessel)
if between_arrivals and not self.closing_doors_in_between_arrivals:
return False, None, None
if not between_arrivals and not self.lock_master.closing_doors_in_between_operations:
return False, None, None
last_lockage_was_empty = False
if operation_index - 2 in operation_planning.index:
last_lockage_was_empty = len(operation_planning.loc[operation_index - 1, "vessels"]) == 0
if last_lockage_was_empty:
return False, None, None
if not first_in_lock and vessel_index:
previous_vessel_planning_index = (
vessel_planning[vessel_planning.id == operation_planning.loc[operation_index, "vessels"][vessel_index - 1].id]
.iloc[-1]
.name
)
last_time_doors_closed = vessel_planning.loc[
previous_vessel_planning_index, "time_potential_lock_door_closure_start"
] + pd.Timedelta(seconds=self.doors_closing_time)
elif operation_index == 0:
last_time_doors_closed = datetime.datetime.fromtimestamp(self.env.now)
else:
last_time_doors_closed = operation_planning.loc[
operation_index - 1
].time_potential_lock_door_closure_start + pd.Timedelta(seconds=self.doors_closing_time)
if first_in_lock:
doors_required_to_be_open = operation_planning.loc[operation_index, "time_potential_lock_door_opening_stop"]
else:
vessel_planning_index = vessel_planning[vessel_planning.id == vessel.id].iloc[-1].name
doors_required_to_be_open = vessel_planning.loc[vessel_planning_index, "time_potential_lock_door_opening_stop"]
operation_time = self.determine_time_to_open_door(operation_index, direction, doors_required_to_be_open)
doors_are_closed = False
if (
doors_required_to_be_open - operation_time > last_time_doors_closed
and doors_required_to_be_open - last_time_doors_closed
> self.minimum_time_between_operations_for_intermediate_door_closure
):
doors_are_closed = True
return doors_are_closed, doors_required_to_be_open, operation_time
[docs]
def determine_time_to_open_door(self, operation_index, direction, doors_required_to_be_open):
"""
Determines the time to finish the levelling process and the door opening process
Parameters
----------
operation_index : int
index of the lock operation
direction : int
the direction of the lock operation: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
doors_required_to_be_open : pd.Timestamp
the moment in time that the doors are required to be opened
Returns
-------
operation_time : pd.Timedelta
the time to finish the levelling process and the door opening process
"""
last_entering_time = doors_required_to_be_open - pd.Timedelta(seconds=self.doors_opening_time)
operation_start_time = doors_required_to_be_open - pd.Timedelta(seconds=self.doors_opening_time)
levelling_information = self.calculate_lock_operation_times(
operation_index=operation_index,
last_entering_time=last_entering_time,
start_time=operation_start_time,
direction=direction,
)
levelling_time = levelling_information["time_levelling_stop"] - levelling_information["time_levelling_start"]
wlev_before, wlev_after = levelling_information["wlev_A"], levelling_information["wlev_B"]
levelling_required = True
if abs(wlev_after - wlev_before) < 0.1:
levelling_required = False
if not levelling_required:
levelling_time = pd.Timedelta(seconds=0.0)
operation_time = levelling_time + pd.Timedelta(seconds=self.doors_opening_time)
return operation_time
[docs]
def determine_water_levels_before_and_after_levelling(self, levelling_start, levelling_stop, direction):
"""
Determines the water level at both sides of the lock
Parameters
----------
levelling_start : pd.Timestamp
the start time of the levelling process
levelling_stop : pd.Timestamp
the stop time of the levelling process
direction : int
the direction of the lock operation: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
Returns
------
wlev_A :
the water level at side A [m] before or after the levelling process (depending on the direction of the operation)
wlev_B :
the actual water level at side B [m] before or after the levelling process (depending on the direction of the operation)
"""
hydromanager = HydrodynamicDataManager()
t_start = np.datetime64(levelling_start)
t_stop = np.datetime64(levelling_stop)
if not direction:
wlev_A = hydromanager._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, t_start, self.start_node, "Water level"
)
wlev_B = hydromanager._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, t_stop, self.end_node, "Water level"
)
else:
wlev_A = hydromanager._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, t_stop, self.start_node, "Water level"
)
wlev_B = hydromanager._get_hydrodynamic_data_value(
self.env.vessel_traffic_service.hydrodynamic_information_path, t_start, self.end_node, "Water level"
)
return wlev_A, wlev_B
[docs]
def determine_levelling_time(self, t_start, direction, wlev_init=None, operation_index=0, prediction=False):
"""
Calculates the levelling time of a lock operation
Parameters
----------
t_start :
the start time of the levelling process
direction : int
the direction of the vessel: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
wlev_init : float
initial water level in the lock chamber
same_direction : bool
states if the levelling process is predicted in the same direction as the last lock operation (True) or not (False)
prediction : bool
states if the levelling process is only predicted (True) or executed (False)
Returns
-------
levelling_time : float
the time duration of the levelling process
t : list of float
the time series of the levelling process
z : list of float
the water level difference series over the time of the levelling process
"""
# TODO: functie maken om tstart om te zetten (met _to_array)
# TODO: Bij andere klasses altijd checken of iets een datetime.datetime is. En als dit niet zo is een error inbouwen of hem gelijk omzetten.
dt = self.time_step
t_final = 3600 # maximum levelling time has been set to an hour
t = np.arange(0, t_final + float(dt), float(dt))
t_start = time_to_numpy(t_start)
# if there is no hydrodynamic data included in the run, use the constant levelling time included in the lock object
# if there is no hydrodynamic data included in the run, use the constant levelling time included in the lock object
if self.env.vessel_traffic_service.hydrodynamic_information_path is None:
levelling_time = self.levelling_time
z = np.zeros(len(t))
return levelling_time, t, z
z, H_A, H_B = calculate_z(
t=t,
t_start=t_start,
direction=direction,
wlev_init=wlev_init,
operation_index=operation_index,
operation_planning=self.lock_master.operation_planning,
hydrodynamic_information_path=self.env.vessel_traffic_service.hydrodynamic_information_path,
start_node=self.start_node,
end_node=self.end_node,
node_open=self.node_open,
epoch=self.env.epoch,
)
# if a function has been included to predict the levelling time based on the water level difference: calculate the levelling time based on the initial water level difference
if callable(self.levelling_time):
levelling_time = self.levelling_time(z[0])
return levelling_time, t, z
# if no function has been included: compute the levelling time based on Eq. 4.64 of Ports and Waterways Open Textbook (https://books.open.tudelft.nl/home/catalog/book/204)
levelling_time, t, z = levelling_time_equation(
t=t,
z=z,
lock_length=self.lock_length,
lock_width=self.lock_width,
disch_coeff=self.disch_coeff,
gate_opening_time=self.gate_opening_time,
opening_area=self.opening_area,
t_start=t_start,
dt=dt,
direction=direction,
water_level_difference_limit_to_open_doors=self.lock_master.water_level_difference_limit_to_open_doors,
prediction=prediction,
H_A=H_A,
H_B=H_B,
)
# if this function was not ran as a prediction, but rather as the actual levelling event: update the water level time series of the lock chamber
if not prediction:
# TODO: de self.water_level wordt niet gebruikt, maar is wel leuk om als logging terug te zien na een berekening. Nadenken of we dat zo willen laten, of anders willen bijhouden.
t_final = t_start + np.timedelta64(int(levelling_time))
t_index_final = HydrodynamicDataManager()._get_time_index_of_hydrodynamic_data(
self.env.vessel_traffic_service.hydrodynamic_information_path, t_final
)
if not direction:
self.water_level[t_index_final:] = H_B[t_index_final:].copy()
else:
self.water_level[t_index_final:] = H_A[t_index_final:].copy()
return levelling_time, t, z
[docs]
class IsLockChamber(IsLockChamberOperator, HasResource, HasLength, Identifiable, Log, HasOutput, HasMultiDiGraph, ExtraMetadata):
"""Mixin class: lock complex has a lock chamber:
creates a lock chamber with a resource which is requested when a vessels wants to enter the area with limited capacity
Attributes
----------
vessel_sailing_speed_in_lock :
calculates the average speed in the lock when entering
vessel_sailing_speed_out_lock :
calculates the average speed in the lock when leaving
vessel_sailing_in_speed :
Calculates the average speed when sailing towards the lock chamber
vessel_sailing_out_speed :
Calculates the average speed when sailing away from the lock chamber
determine_levelling_time :
calculates the levelling time of a lock operation
"""
def __init__(
self,
start_node, # a string which indicates the location of the first pair of lock doors
end_node, # a string which indicates the location of the second pair of lock doors
lock_length, # a float which contains the length of the lock chamber
lock_width, # a float which contains the width of the lock chamber
lock_depth, # a float which contains the depth of the lock chamber
lock_master=None,
k=0, # a int which is the identifier of the edge between two nodes in a multidigraph network
distance_from_start_node_to_lock_doors_A=0.0, # a float that is the distance between the start_node of the edge and the lock doors A [m]
distance_from_end_node_to_lock_doors_B=0.0, # a float that is the distance between the end_node of the edge and the lock doors B [m]
registration_nodes=[], # a list of str with the node names at which the vessels request registration to the lock complex master
doors_opening_time=300.0, # a float which contains the time it takes to open the doors [s]
doors_closing_time=300.0, # a float which contains the time it takes to close the doors [s]
disch_coeff=0.4, # a float which contains the discharge coefficient of filling system
opening_area=12.0, # a float which contains the cross-sectional area of filling system [m^2]
opening_depth=None, # a float which contains the depth at which filling system is located [m^2]
speed_reduction_factor_lock_chamber=0.3, # a float that is the reduction factor for the vessel speed from its original speed when entering the lock
start_sailing_out_time_after_doors_have_been_opened=0.0, # a float that is the time that the vessel wait to start sailing out of the lock after the doors have been opened after levelling [s]
sailing_time_before_opening_lock_doors=600.0, # a float that is the time that the doors are opened before a vessel arrives at the doors [s]
sailing_time_before_closing_lock_doors=120.0, # a float that is the time that the doors are closed after a vessel has sailed through the doors [s]
minimum_time_between_operations_for_intermediate_door_closure=0.0, # a float that is the minimum required time between lock operations that the lock doors can be both closed (to reduce salt intrusion) [s]
sailing_distance_to_crossing_point=500.0, # a float that is the distance at which vessels can safely pass each other in front of the lock (last vessel that sails out and first vessel that sails in) [m]
passage_time_door=300.0, # a float [s] ?
sailing_in_time_gap_through_doors=180.0, # a float that is the time gap after which the next vessel can sail into the lock through the lock doors (after another vessel has sailed through to enter the lock) [s]
sailing_out_time_gap_through_doors=180.0, # a float that is the time gap after which the next vessel can sail out of the lock through the lock doors (after another vessel has sailed through to leave the lock)[s]
sailing_in_time_gap_after_berthing_previous_vessel=0.0, # a float that is the time gap after which the next vessel can sail into the lock (after another vessel has berthed) [s]
sailing_out_time_gap_after_berthing_previous_vessel=0.0, # a float that is the time gap after which the next vessel can sail out of the lock (after another vessel has deberthed) [s]
sailing_in_speed_A=2 * knots, # a float that is the speed at which the vessel sails into the lock to the sea side [m/s]
sailing_out_speed_A=2 * knots, # a float that is the speed at which the vessel sails out of the lock to the sea side [m/s]
sailing_in_speed_B=2 * knots, # a float that is the speed at which the vessel sails into the lock to the canal side [m/s]
sailing_out_speed_B=2
* knots, # a float that is the speed at which the vessel sails out of the lock to the canal side [m/s]
minimum_manoeuvrability_speed=2
* knots, # a float that is the minimum speed at which the vessel is still safely manoeuvrable [m/s]
levelling_time=600.0, # a float that fixates the levelling time [s]
time_step=10.0, # a float that is the integration time step to determine the levelling time [s]
gate_opening_time=60.0, # a float that is the time it takes for the levelling gate to open [s]
node_open=None, # a string that is the node name to which the lock was last levelled to at the initial time of simulation (either start_node or end_node)
conditions=None, # maybe obsolete ???
priority_rules=None, # maybe obsolete ???
used_as_one_way_traffic_regulation=False, # maybe obsolete ???
seed_nr=None, # a int for the seed to fix the determination of the node_open when node_open is None
*args,
**kwargs,
):
"""Initialization"""
# TODO: checken of alle inputs nodig zijn
# TODO: checken of alle parents nodig zijn
# TODO: parentklasse Lockmaster toevoegen
# set input parameters as properties
self.lock_length = lock_length
self.lock_width = lock_width
# TODO: @Floor lock_depth wordt niet gebruikt... Willen we die houden?
self.lock_depth = lock_depth
# TODO @Floor, is deze coefficient afhankelijk van de lock, of is dit een standaard coefficient die we ergens anders kunnen opslaan?
self.disch_coeff = disch_coeff #0.4
self.opening_area = opening_area
if opening_depth is None:
opening_depth = lock_depth/2
self.opening_depth = opening_depth
self.levelling_time = levelling_time
self.start_sailing_out_time_after_doors_have_been_opened = start_sailing_out_time_after_doors_have_been_opened
self.sailing_time_before_opening_lock_doors = sailing_time_before_opening_lock_doors
self.sailing_time_before_closing_lock_doors = sailing_time_before_closing_lock_doors
self.minimum_time_between_operations_for_intermediate_door_closure = minimum_time_between_operations_for_intermediate_door_closure
self.sailing_in_time_gap_after_berthing_previous_vessel = sailing_in_time_gap_after_berthing_previous_vessel
self.sailing_out_time_gap_after_berthing_previous_vessel = sailing_out_time_gap_after_berthing_previous_vessel
self.sailing_in_speed_A = sailing_in_speed_A
self.sailing_out_speed_A = sailing_out_speed_A
self.sailing_in_speed_B = sailing_in_speed_B
self.sailing_out_speed_B = sailing_out_speed_B
self.sailing_distance_to_crossing_point = sailing_distance_to_crossing_point
self.sailing_in_time_gap_through_doors = sailing_in_time_gap_through_doors
self.sailing_out_time_gap_through_doors = sailing_out_time_gap_through_doors
self.speed_reduction_factor = speed_reduction_factor_lock_chamber
self.passage_time_door = passage_time_door
self.start_node = start_node
self.end_node = end_node
self.k = k
self.minimum_manoeuvrability_speed = minimum_manoeuvrability_speed
self.node_open = node_open
self.conditions = conditions
self.time_step = time_step
self.priority_rules = priority_rules
self.registration_nodes = registration_nodes
self.gate_opening_time = gate_opening_time
self.door_A_open = True
self.door_B_open = True
if not registration_nodes:
self.registration_nodes = [start_node,end_node]
self.distance_from_start_node_to_lock_doors_A = distance_from_start_node_to_lock_doors_A
self.distance_from_end_node_to_lock_doors_B = distance_from_end_node_to_lock_doors_B
self.used_as_one_way_traffic_regulation = used_as_one_way_traffic_regulation
self.converting_chamber = False
# TODO: checken of de seed_nr en de random functie worden gebruikt.
if seed_nr is not None:
np.random.seed(seed_nr)
# TODO: als lockmaster een parent klasse is, zou lock_complex=self weg moeten kunnen.
# TODO: capaciteit = 100. checken of deze info overbodig is doordat er al een lock_length is. En anders kijken of de capaciteit op oneindig kan.
super().__init__(
lock_master=lock_master,
capacity=100,
length=lock_length,
remaining_length=lock_length,
*args,
**kwargs,
)
self._verify_node_AB()
if self.env.vessel_traffic_service.hydrodynamic_information_path is not None:
hydro_manager = HydrodynamicDataManager()
if isinstance(self.env.vessel_traffic_service.hydrodynamic_information_path,str):
hydro_manager.hydrodynamic_data = Dataset(self.env.vessel_traffic_service.hydrodynamic_information_path)
else:
hydro_manager.hydrodynamic_data = self.env.vessel_traffic_service.hydrodynamic_information
if isinstance(self.env.vessel_traffic_service.hydrodynamic_information_path, str):
hydro_manager.hydrodynamic_times = (
hydro_manager.hydrodynamic_data["TIME"][:].data.astype("timedelta64[m]")
+ self.env.vessel_traffic_service.hydrodynamic_start_time
)
else:
hydro_manager.hydrodynamic_times = hydro_manager.hydrodynamic_data["TIME"][:]
if self.node_open is None:
self.node_open = np.random.choice([start_node, end_node])
if self.closing_doors_in_between_operations:
self.door_A_open = False
self.door_B_open = False
elif self.node_open == self.start_node:
self.door_B_open = False
else:
self.door_A_open = False
# Geometry on edge
edge = (start_node, end_node, 0)
edge_info = self.multidigraph.edges[edge]
# TODO Checken of de distance bepalen werkt, en misschien automatiseren op basis van geometrie
# TODO: nodes verwijderen uit graaf als die precies op de sluis liggen. (wellicht als voorbewerking van de graaf)
# TODO: losse klasse maken van de lock-doors die locatable, hasresource (capacity=1) en identifiable is en eigenschap open/dicht heeft.
edge_aligned_with_edge_geometry = self.env.vessel_traffic_service.check_if_geometry_is_aligned_with_edge(edge)
start_node_geometry = start_node
end_node_geometry = end_node
distance_from_start_node_geometry_to_lock_doors_A = self.distance_from_start_node_to_lock_doors_A
distance_from_start_node_geometry_to_lock_doors_B = self.distance_from_start_node_to_lock_doors_A + lock_length
if not edge_aligned_with_edge_geometry:
start_node_geometry = end_node
end_node_geometry = start_node
distance_from_start_node_geometry_to_lock_doors_B = self.distance_from_end_node_to_lock_doors_B
distance_from_start_node_geometry_to_lock_doors_A = self.distance_from_end_node_to_lock_doors_B + lock_length
self.location_lock_doors_A = self.env.vessel_traffic_service.provide_location_over_edges(start_node_geometry, end_node_geometry, distance_from_start_node_geometry_to_lock_doors_A)
self.location_lock_doors_B = self.env.vessel_traffic_service.provide_location_over_edges(start_node_geometry, end_node_geometry, distance_from_start_node_geometry_to_lock_doors_B)
self.lock_pos_length = simpy.Container(self.env, capacity=lock_length, init=lock_length)
self.door_A= simpy.PriorityResource(self.env, capacity = 1)
self.levelling = simpy.Resource(self.env, capacity=1)
self.door_B = simpy.PriorityResource(self.env, capacity = 1)
# TODO: kijken of onderstaande eigenschappen nodig zijn. en capacity op infinity zetten als mogelijk.
self.wait_for_other_vessel_to_arrive = simpy.FilterStore(self.env,capacity=100000000)
self.wait_for_levelling = simpy.FilterStore(self.env,capacity=100000000)
self.wait_for_other_vessels = simpy.FilterStore(self.env,capacity=100000000)
# Operating
self.doors_opening_time = doors_opening_time
self.doors_closing_time = doors_closing_time
# TODO: maak ene functie _test_input() die bijvoorbeeld checkt of de nodes na elkaar liggen.
# Water level
assert start_node != end_node
time = np.datetime64(datetime.datetime.fromtimestamp(self.env.now))
wlev_series = HydrodynamicDataManager()._get_hydrodynamic_data_series(
self.env.vessel_traffic_service.hydrodynamic_information_path, time, self.node_open, "Water level"
)
self.water_level = wlev_series
# TODO: in functie zetten.
# TODO: In de documentatie zetten dat detecotr nodes op volgorde moeten komen. En ook een assert maken.
for registration_node, lock_edge in zip(self.registration_nodes,[(self.start_node,self.end_node,self.k),(self.end_node,self.start_node,self.k)]):
if 'Lock_registration_node' not in self.multidigraph.nodes[registration_node]:
self.multidigraph.nodes[registration_node]['Lock_registration_node'] = lock_edge
# Add to the graph:
# TODO: In losse functie (add_lock_to_graph)
if "graph" in dir(self.env):
k = sorted(self.multidigraph[self.start_node][self.end_node], key=lambda x: get_length_of_edge(self.multidigraph,(self.start_node, self.end_node, x)))[0]
# Add the lock to the edge or append it to the existing list
if "Lock" not in self.multidigraph.edges[self.start_node, self.end_node, k].keys():
self.multidigraph.edges[self.start_node, self.end_node, k]["Lock"] = [self]
self.multidigraph.edges[self.end_node, self.start_node, k]["Lock"] = [self]
else:
self.multidigraph.edges[self.start_node, self.end_node, k]["Lock"].append(self)
self.multidigraph.edges[self.end_node, self.start_node, k]["Lock"].append(self)
def _verify_node_AB(self):
"""Function to verify if nodes A and B are part of the graph, and have an edge between them."""
if self.start_node not in self.env.graph.nodes or self.end_node not in self.env.graph.nodes:
raise ValueError(
f"Lock chamber {self.name} has invalid node_A {self.start_node} or node_B {self.end_node} which are not part of the graph."
)
if not self.env.graph.has_edge(self.start_node, self.end_node):
raise ValueError(
f"Lock chamber {self.name} does not have an edge between node A {self.start_node} and node B {self.end_node}."
)
[docs]
def vessel_sailing_speed_in_lock(self, vessel):
"""
Calculates the average speed in the lock when entering
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
x_location_lock : float
logintudinal coordinate in the lock to which the vessel is assigned [m]
P_used : float
the breaking power used by the vessel to gradually decelerate [kW]
Returns
-------
speed : float
the average speed in the lock from the lock doors to the location of berthing
"""
# TODO: sailing_in_speed_B zou A of B moeten zijn. Checken of deze eigenschap vaker voorkomt.
speed = self.sailing_in_speed_B
if vessel.bound == 'inbound':
speed = self.sailing_in_speed_A
return speed
[docs]
def vessel_sailing_speed_out_lock(self, vessel):
"""
Calculates the average speed to in the lock when leaving
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
x_location_lock : float
logintudinal coordinate in the lock to which the vessel is assigned [m]
P_used : float
the breaking power used by the vessel to gradually decelerate [kW]
Returns
-------
speed : float
the average speed in the lock from the lock doors to the location of berthing
"""
speed = self.sailing_out_speed_A
if vessel.bound == 'inbound':
speed = self.sailing_out_speed_B
return speed
[docs]
def vessel_sailing_in_speed(self, vessel, direction):
"""
Calculates the average speed when sailing towards the lock chamber
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
x_location_lock : float
logintudinal coordinate in the lock to which the vessel is assigned [m]
P_used : float
the breaking power used by the vessel to gradually decelerate [kW]
Returns
-------
speed : float
the average speed in the lock from the lock doors to the location of berthing
"""
# determine the edge on which the vessel is sailing and the distance to the lock doors
edge = self._directional_edge(direction)
# determine the speed of the vessel over the edge
speed = vessel._compute_velocity_on_edge(edge[0], edge[1])
# if there is an overruled speed on the edge, use this speed
if "overruled_speed" in dir(vessel) and edge in vessel.overruled_speed.index:
speed = vessel.overruled_speed.loc[edge, "Speed"]
return speed
[docs]
def vessel_sailing_out_speed(self, vessel, direction, P_used=None, h0=17, until_crossing_point=False):
"""
Calculates the average speed when sailing away from the lock chamber
Parameters
----------
vessel : type
a type including the following parent-classes: PassesLockComplex, Identifiable, Movable, VesselProperties, ExtraMetadata, HasMultiDiGraph, HasOutput
direction : int
the direction of the vessel: 0 (direction from node_A to node_B) or 1 (direction from node_B to node_A)
P_used : float
the breaking power used by the vessel to gradually decelerate [kW]
until_crossing_point : bool
Returns
-------
speed : float
the average speed in the lock from the lock doors to the location of berthing
"""
# determine the edge on which the vessel is sailing and the distance to the lock doors
edge = self._directional_edge(direction)
# determine the speed of the vessel over the edge
speed = vessel._compute_velocity_on_edge(edge[0], edge[1])
# if there is an overruled speed on the edge, use this speed
if 'overruled_speed' in dir(vessel) and edge in vessel.overruled_speed.index:
speed = vessel.overruled_speed.loc[edge, 'Speed']
return speed
def _directional_edge(self, direction):
"""get the edge of the lock chamber in the correct direction"""
if not direction:
return (self.start_node, self.end_node)
else:
return (self.end_node, self.start_node)