Source code for opentnsim.core.movable

"""
Mixin classes for movable objects.

The following classes are provided:
- Movable
- ContainerDependentMovable
"""
# package(s) for documentation, debugging, saving and loading
import logging
import warnings
import deprecated
from typing import Union

# math packages
import numpy as np

# spatial libraries
import pyproj
import shapely
import shapely.geometry
from shapely import Geometry
import networkx as nx
import simpy

# time packages
import datetime

# use OpenCLSim objects for core objects (identifiable is imported for later use)
import opentnsim.strategy
from openclsim.core import SimpyObject, Locatable, Log
from opentnsim.core.container import HasContainer
from opentnsim.energy.mixins import ConsumesEnergy
from opentnsim.graph.mixins import get_length_of_edge

# get logger
logger = logging.getLogger(__name__)


[docs] class Routable(SimpyObject): """Mixin class: Something with a route (networkx format) Parameters ---------- route: list list of node-IDs complete_path: list, optional ??? args, kwargs: passed to SimpyObject. Must at least contain parameter env: simpy.Environment. Attributes ----------- route: list list of node-IDs complete_path: list, optional ??? position_on_route: int index of position on the route """ def __init__(self, route, complete_path=None, *args, **kwargs): """Initialization""" # check env input env = kwargs.get("env") # if env is given and env is not None if env is not None: has_fg = hasattr(env, "FG") has_graph = hasattr(env, "graph") if has_fg and not has_graph: warnings.warn(".FG attribute has been renamed to .graph, please update your code", DeprecationWarning) env.graph = env.FG assert ( has_fg or has_graph ), "Routable expects `.graph` (a networkx graph) to be present as an attribute on the environment" # initialization super().__init__(*args, **kwargs) self.route = route # start at start of route self.position_on_route = 0 self.complete_path = complete_path self.check_attributes()
[docs] def check_attributes(self): """Check if all required attributes are set.""" # check if route is set if self.route is None or not isinstance(self.route, list): raise ValueError("Routable requires a route (list of node IDs) to be set") # check if env is set if not hasattr(self, "env") or not isinstance(self.env, simpy.Environment): raise ValueError("Routable requires an environment (simpy.Environment) to be set") # check if route is on graph if not all(node in self.graph.nodes for node in self.route): raise ValueError("Routable route must be on the graph")
@property def graph(self): """ Return the graph of the underlying environment. If it's multigraph cast to corresponding type If you want the multidigraph use the HasMultiGraph mixin """ graph = None if hasattr(self.env, "graph"): graph = self.env.graph else: raise ValueError("Routable expects .graph to be present on env") if isinstance(graph, nx.MultiDiGraph): return nx.DiGraph(graph) elif isinstance(graph, nx.MultiGraph): return nx.Graph(graph) return graph
[docs] @deprecated.deprecated(reason="Use Routable instead of Routeable") class Routeable(Routable): """Old name for Mixin class: renamed to Routable."""
[docs] class Movable(Locatable, Routable, Log): """Mixin class: Something can move. Used for object that can move with a fixed speed Parameters ---------- v: float speed of the object (in m/s) geometry: shapely.geometry.Point passed to Locatable. point used to track its current location node: str, optional passed to Locatable, route: list, optional passed to Routable, complete_path: list, optional passed to Routable, Attributes ---------- v: float speed of the object (in m/s) on_pass_edge_functions: list list of functions to call when passing an edge on_pass_node_functions: list list of functions to call when passing a node wsg84: pyproj.Geod used for distance computation req: simpy.Resource request resource request for passing nodes and edges. saved for using resources over various nodes and edges. resource: simpy.Resource resource used for passing nodes and edges. saved for using resources over various nodes and edges. Notes ----- This class can handle a basic graph in env.graph. This will result in a simple simulation. The class can also handle more complex simulations. For this, extra information is needed in the graph, such as: - Resources on nodes and edges, which can be requested and released. - Resources on nodes are saved in env.graph.nodes[node]["Resources"] - Resources on edges are saved in env.graph.edges[origin, destination]["Resources"]. - Several edges and nodes can have the same resource, which is usefull when a segment can only be used by one vessel at a time. - When using a digraph, make sure to assign the same resource to both directions of the edge. - Current on edges, which can be used to compute the speed of the vessel. - Current on edges is saved in env.graph.edges[origin, destination]["Info"]["Current"]. - Current can only be used in a directed graph (DiGraph). - Current is positive in the direction of the edge, and negative in the opposite direction. - Make sure to assign current to both directions of the edge in a digraph. (the negative and positive current) - Power information, which can be used to compute the speed of the vessel. - self must be a mixin of ConsumesEnergy. - self must have the attribute P_tot_given and must not be None. - general depth of fairway is saved in env.graph.edges[origin, destination]["Info"]["GeneralDepth"]. - width of fairway is saved in env.graph.edges[origin, destination]["Info"]["Width"]. If not given, we use 150 m. """ def __init__(self, v: float, *args, **kwargs): """Initialization""" super().__init__(*args, **kwargs) self.v = v self.distance = 0 self.on_pass_node_functions = [] self.on_pass_edge_functions = [] self.on_complete_pass_edge_functions = [] self.on_look_ahead_to_node_functions = [] self.wgs84 = pyproj.Geod(ellps="WGS84") self._check_attributes() # resource memory for passing nodes and edges self.req = None self.resource = None # keep track of distance travelled on edge self.distance_left_on_edge = np.nan def _check_attributes(self): """Check if all required attributes are set.""" # each node on route should have a geometry geoms = nx.get_node_attributes(self.graph, "geometry") if not all(node in geoms for node in self.route): raise ValueError( "Nodes on route must have a geometry attribute. Missing geometries for nodes: {}".format( [node for node in self.route if node not in geoms] ) ) @property def current_node(self) -> Union[str, None]: """Return the current node on the route based on self.position_on_route.""" if 0 <= self.position_on_route < len(self.route): return self.route[self.position_on_route] else: return None @property def next_node(self) -> Union[str, None]: """Return the next node on the route based on self.position_on_route.""" if 0 <= self.position_on_route < len(self.route) - 1: return self.route[self.position_on_route + 1] else: return None @property def route_ahead(self): """Return the remaining route ahead of the current position.""" if 0 <= self.position_on_route < len(self.route): return self.route[self.position_on_route :] else: return []
[docs] def determine_route_to_target_node(self, target_node: str): """Determine the route to the target node. Parameters ---------- target_node: str The target node to determine the route to. Returns ------- list The route to the target node. """ if target_node not in self.route_ahead: raise ValueError("Target node must be in the remaining route ahead.") # get index of first occurrence of target_node in route_ahead try: idx = self.route_ahead.index(target_node) except ValueError: warnings.warn(f"No route found to waiting area") return [] # get route to target node route = self.route_ahead[: idx + 1] return route
[docs] def update_position(self, position_on_route: int): """Update the position on the route. Parameters ---------- position_on_route: int index of position on the route """ self.position_on_route = position_on_route self.geometry = nx.get_node_attributes(self.graph, "geometry")[self.current_node]
# TODO: Move was eerst een functie met 'destination' als argument, maar dat is nu niet meer het geval. Willen we dat dit weg is?
[docs] def move(self): """Moves vessel over the path defined by self.route. Assumption is that self.path is in the right order - vessel moves from route[0] to route[-1]. Yields ------ time it takes to travel the distance to the destination. """ # Check if vessel has arrival time and let vessel wait to start moving if hasattr(self, "metadata") and "arrival_time" in self.metadata: arrival_time = self.metadata['arrival_time'] current_time = datetime.datetime.fromtimestamp(self.env.now) delay = (arrival_time - current_time).total_seconds() yield self.env.timeout(delay) # Check if vessel is at correct location - if not, move to location yield from self._move_to_start() # look ahead to first node self.position_on_route = 0 yield from self.look_ahead_to_node(self.route[0]) # Move over the path and log every step for index, edge in enumerate(zip(self.route[:-1], self.route[1:])): # update current position self.update_position(index) yield from self.pass_node(self.current_node) # are we already at destination? if self.next_node == self.current_node: warnings.warn( "Route passes node {} twice consecutively..".format(self.current_node), UserWarning, ) continue yield from self.pass_edge(self.current_node, self.next_node) yield from self.complete_pass_edge(self.next_node) # we arrived at destination # update to new position self.update_position(index + 1) # look ahead to next node yield from self.look_ahead_to_node(self.current_node) # arrived at end of route. release resource if needed if self.req is not None: self._release_resource() logger.debug(" distance: " + "%4.2f" % self.distance + " m") if self.current_speed is not None: logger.debug(" sailing: " + "%4.2f" % self.current_speed + " m/s") logger.debug(" duration: " + "%4.2f" % ((self.distance / self.current_speed) / 3600) + " hrs") else: logger.debug(" current_speed: not set")
def _move_to_start(self): """Move vessel to the start of the route. Yields ------ The time it takes to move to the start of the route. """ # Check if vessel is at correct location - if not, move to location vessel_origin_location = nx.get_node_attributes(self.env.graph, "geometry")[self.route[0]] if self.geometry != vessel_origin_location: self.log_entry_v0("Sailing to start start", self.env.now, self.distance, self.geometry) start_location = self.geometry logger.debug("Origin: {orig}") logger.debug("Destination: {dest}") self.distance += self.wgs84.inv(start_location.x, start_location.y, vessel_origin_location.x, vessel_origin_location.y)[ 2 ] yield self.env.timeout(self.distance / self.current_speed) self.geometry = vessel_origin_location self.log_entry_v0("Sailing to start stop", self.env.now, self.distance, self.geometry)
[docs] def pass_node(self, node): """pass a node and call all on_pass_node_functions Parameters ---------- node: str the node to pass Yields ------ The time it takes to pass the node. """ # request resource if needed if "Resources" in self.graph.nodes[node].keys() and self.req is None: arrival = self.env.now # remember when we arrived at the node # request yield from self._request_resource(self.graph.nodes[node]["Resources"]) # we had to wait, log it if arrival != self.env.now: self.log_entry_v0( "Waiting to pass node {} start".format(node), arrival, self.distance, self.graph.nodes[node]["geometry"], ) self.log_entry_v0( "Waiting to pass node {} stop".format(node), self.env.now, self.distance, self.graph.nodes[node]["geometry"], ) # call all on_pass_node_functions for on_pass_node_function in self.on_pass_node_functions: yield from on_pass_node_function(node) # release resource if needed if self.req is not None: # only release if resource is not needed in the next edge if self.next_edge is None: self._release_resource() if "Resources" not in self.graph.edges[self.next_edge]: self._release_resource() elif self.graph.edges[self.next_edge]["Resources"] != self.resource: self._release_resource() else: pass
def _release_resource(self): """Release the current resource.""" self.resource.release(self.req) self.req = None self.resource = None def _request_resource(self, resource): """Request a resource for passing nodes and edges. Parameters ---------- resource: simpy.Resource the resource to request Yields ------ simpy.Resource request """ self.resource = resource self.req = self.resource.request() yield self.req @property def next_edge(self): """Return the next edge on the route. based on self.position_on_route. Returns ------- tuple(str, str) or None (origin, destination) of the next edge on the route. """ if self.position_on_route < len(self.route) - 1: return self.route[self.position_on_route], self.route[self.position_on_route + 1] else: return None
[docs] def pass_edge(self, origin, destination): """pass an edge and call all on_pass_edge_functions. Parameters ---------- origin: str the origin node of the edge destination: str the destination node of the edge Yields ------ The time it takes to pass the edge. """ edge = (origin, destination) edge_info = self.graph.edges[edge] orig = nx.get_node_attributes(self.graph, "geometry")[origin] dest = nx.get_node_attributes(self.graph, "geometry")[destination] distance = get_length_of_edge(self.graph, edge) self.distance_left_on_edge = distance # calculate velocity based on depth and power, if possible. self.v = self._compute_velocity_on_edge(origin, destination) # Check if the edge has current info # NB: positive current is directed from origin to destination current = self._get_current(origin, destination) # Wait for edge resources to become available # TODO: Misschien moeten we Resources ook onder Info hangen? if "Resources" in edge_info.keys() and self.req is None: arrival = self.env.now # remember when we arrived at the edge yield from self._request_resource(self.graph.edges[origin, destination]["Resources"]) # we had to wait, log it if arrival != self.env.now: self.log_entry_v0( "Waiting to pass edge {} - {} start".format(origin, destination), arrival, self.distance, orig, ) self.log_entry_v0( "Waiting to pass edge {} - {} stop".format(origin, destination), self.env.now, self.distance, orig, ) self.log_entry_v0( "Sailing from node {} to node {} start".format(self.current_node, self.next_node), self.env.now, self.distance, orig, ) # on pass edge functions for on_pass_edge_function in self.on_pass_edge_functions: yield from on_pass_edge_function(origin, destination) # default velocity based on current speed. timeout = self.distance_left_on_edge / (self.current_speed + current) yield self.env.timeout(timeout) self.distance += self.distance_left_on_edge self.log_entry_v0( "Sailing from node {} to node {} stop".format(self.current_node, self.next_node), self.env.now, self.distance, # TODO distance klopt nu niet na een sluismodule dest, ) self.geometry = dest # release resource if needed if "Resources" in edge_info.keys(): # only release if resource is not needed in the next node if "Resources" not in self.graph.nodes[destination].keys(): self._release_resource() elif self.resource != self.graph.nodes[destination]["Resources"]: self._release_resource() self.resource = None else: pass
[docs] def complete_pass_edge(self, destination): for gen in self.on_complete_pass_edge_functions: yield from gen(destination)
[docs] def look_ahead_to_node(self, destination): for gen in self.on_look_ahead_to_node_functions: yield from gen(destination)
def _get_current(self, origin, destination): """Get the current on the edge Parameters ---------- origin: str the origin node of the edge destination: str the destination node of the edge Returns ------- float the current on the edge (in m/s) """ if "Info" not in self.graph.edges[origin, destination].keys(): # no info on the current, return 0 return 0.0 elif "Current" not in self.graph.edges[origin, destination]["Info"].keys(): # no info on current, return 0 return 0.0 elif not isinstance(self.graph, nx.DiGraph): raise TypeError( "Current is only available on a DiGraph. Use a Digraph to use current in your calculations.", UserWarning, ) return 0.0 current = self.graph.edges[origin, destination]["Info"]["Current"] if (self.current_speed + current) <= 0: raise ValueError( f"Current {current} m/s is larger than current speed {self.current_speed} m/s. " "This will result in a negative speed, which is not allowed.", UserWarning, ) return current @property def current_speed(self): """return the current speed of the vessel""" return self.v def _compute_velocity_on_edge(self, origin, destination): """compute the velocity on an edge, based on the energy module and the depth. parameters ---------- origin: str the origin node of the edge destination: str the destination node of the edge """ edge = (origin, destination) if hasattr(self,'overruled_speed') and edge in self.overruled_speed.index: overruled_speed = self.overruled_speed.loc[edge] return overruled_speed # check if we have the energy mixin and ptot_given if not isinstance(self, ConsumesEnergy): return self.v elif self.P_tot_given is None: return self.v # determine the depth of the edge edge = self.graph.edges[origin, destination] try: depth = edge["Info"]["GeneralDepth"] except KeyError: raise ValueError( f"Edge {origin} - {destination} has no GeneralDepth in Info. " f"\n Add info or remove ConsumesEnergy mixin" ) # You can input more power than is realistic # There are two mechanisms that reduce the power given: # 1. The grounding speed: # TODO: Als we dit laten staan, moeten we get_upperbound_for_power2v ook checken en testen. # TODO get_upperbound_for_power2v heeft een width standaard 150. Is dat handig? edge_width = self._get_general_width(origin, destination) edge_width = edge_width if edge_width is not None else 150 # default width if not set upperbound = opentnsim.strategy.get_upperbound_for_power2v_optim(self, width=edge_width, depth=depth, margin=0) # Here the upperbound is used to estimate the actual velocity power_used = min(self.P_tot_given, upperbound) return self.power2v(self, edge, power_used) def _get_general_width(self, origin, destination): """Get the general width of the edge. Parameters ---------- origin: str the origin node of the edge destination: str the destination node of the edge Returns ------- float the general width of the edge (in m) """ if "Info" not in self.graph.edges[origin, destination].keys(): return None elif "GeneralWidth" not in self.graph.edges[origin, destination]["Info"].keys(): return None else: return self.graph.edges[origin, destination]["Info"]["GeneralWidth"]
[docs] class ContainerDependentMovable(Movable, HasContainer): """Mixin class: ContainerDependentMovable class Used for objects that move with a speed dependent on the container level Parameters ---------- compute_v: function a function, given the fraction the container is filled (in [0,1]), returns the current speed v: float passed to Movable, speed of the object (in m/s) geometry: shapely.geometry.Point passed to Movable. point used to track its current location node: str, optional passed to Movable, route: list, optional passed to Movable, complete_path: list, optional passed to Movable, Capacity: float passed to HasContainer, the capacity of the container, which may either be continuous (like water) or discrete (like apples) level: int, default=0 passed to HasContainer, level of the container at the beginning of the simulation total_requested: int, default=0 passed to HasContainer, total amount that has been requested at the beginning of the simulation Attributes ---------- compute_v: function a function, given the fraction the container is filled (in [0,1]), returns the current speed current_speed: float the current speed of the vessel (in m/s), based on the filling degree of the container """ def __init__(self, compute_v, *args, **kwargs): super().__init__(*args, **kwargs) """Initialization""" self.compute_v = compute_v self.wgs84 = pyproj.Geod(ellps="WGS84") @property def current_speed(self): """return the current speed of the vessel, based on the filling degree of the container""" return self.compute_v(self.filling_degree)