diff --git a/b_asic/operation.py b/b_asic/operation.py index ff03277e995ceb32455234fd87921406144cdd67..7a01d8d8977a818011c0d7b11bbeeb440ea379e7 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -187,7 +187,7 @@ class Operation(GraphComponent, SignalSourceProvider): @property @abstractmethod - def input_signals(self) -> Iterable[Signal]: + def input_signals(self) -> Sequence[Signal]: """ Get all the signals that are connected to this operation's input ports, in no particular order. @@ -196,7 +196,7 @@ class Operation(GraphComponent, SignalSourceProvider): @property @abstractmethod - def output_signals(self) -> Iterable[Signal]: + def output_signals(self) -> Sequence[Signal]: """ Get all the signals that are connected to this operation's output ports, in no particular order. @@ -390,6 +390,15 @@ class Operation(GraphComponent, SignalSourceProvider): raise NotImplementedError + @abstractmethod + def _increase_time_resolution(self, factor: int) -> None: + raise NotImplementedError + + @abstractmethod + def _decrease_time_resolution(self, factor: int) -> None: + raise NotImplementedError + + class AbstractOperation(Operation, AbstractGraphComponent): """ Generic abstract operation base class. @@ -440,18 +449,20 @@ class AbstractOperation(Operation, AbstractGraphComponent): if src is not None: self._input_ports[i].connect(src.source) + # Set specific latency_offsets + if latency_offsets is not None: + self.set_latency_offsets(latency_offsets) + if latency is not None: # Set the latency for all ports initially. if latency < 0: raise ValueError("Latency cannot be negative") for inp in self.inputs: - inp.latency_offset = 0 + if inp.latency_offset is None: + inp.latency_offset = 0 for outp in self.outputs: - outp.latency_offset = latency - - # Set specific latency_offsets - if latency_offsets is not None: - self.set_latency_offsets(latency_offsets) + if outp.latency_offset is None: + outp.latency_offset = latency self._execution_time = execution_time @@ -626,7 +637,7 @@ class AbstractOperation(Operation, AbstractGraphComponent): return self._output_ports @property - def input_signals(self) -> Iterable[Signal]: + def input_signals(self) -> Sequence[Signal]: result = [] for p in self.inputs: for s in p.signals: @@ -634,7 +645,7 @@ class AbstractOperation(Operation, AbstractGraphComponent): return result @property - def output_signals(self) -> Iterable[Signal]: + def output_signals(self) -> Sequence[Signal]: result = [] for p in self.outputs: for s in p.signals: diff --git a/b_asic/port.py b/b_asic/port.py index 39dd4d980e7953c709387b9cad176dc34e73c717..33443860dca0f4219dad7854fe06e4c41d5a1af9 100644 --- a/b_asic/port.py +++ b/b_asic/port.py @@ -6,7 +6,7 @@ Contains classes for managing the ports of operations. from abc import ABC, abstractmethod from copy import copy -from typing import TYPE_CHECKING, Iterable, List, Optional +from typing import TYPE_CHECKING, List, Optional, Sequence from b_asic.graph_component import Name from b_asic.signal import Signal @@ -61,7 +61,7 @@ class Port(ABC): @property @abstractmethod - def signals(self) -> Iterable[Signal]: + def signals(self) -> Sequence[Signal]: """Return all connected signals.""" raise NotImplementedError @@ -168,7 +168,7 @@ class InputPort(AbstractPort): return 0 if self._source_signal is None else 1 @property - def signals(self) -> Iterable[Signal]: + def signals(self) -> Sequence[Signal]: return [] if self._source_signal is None else [self._source_signal] def add_signal(self, signal: Signal) -> None: @@ -238,7 +238,7 @@ class OutputPort(AbstractPort, SignalSourceProvider): return len(self._destination_signals) @property - def signals(self) -> Iterable[Signal]: + def signals(self) -> Sequence[Signal]: return self._destination_signals def add_signal(self, signal: Signal) -> None: diff --git a/b_asic/process.py b/b_asic/process.py index 53ccf882109cfde104a4814130557a1139fef1bf..e1d6db568f1d26cc6507f85b57dbde12c153292d 100644 --- a/b_asic/process.py +++ b/b_asic/process.py @@ -52,7 +52,13 @@ class OperatorProcess(Process): ------- object """ - super().__init__(start_time, operation.execution_time) + execution_time = operation.execution_time + if execution_time is None: + raise ValueError( + "Operation {operation!r} does not have an execution time" + " specified!" + ) + super().__init__(start_time, execution_time) self._operation = operation @@ -75,11 +81,11 @@ class MemoryVariable(Process): ) @property - def life_times(self) -> Tuple[int]: + def life_times(self) -> Tuple[int, ...]: return self._life_times @property - def read_ports(self) -> Tuple[InputPort]: + def read_ports(self) -> Tuple[InputPort, ...]: return self._read_ports @property @@ -108,11 +114,11 @@ class PlainMemoryVariable(Process): ) @property - def life_times(self) -> Tuple[int]: + def life_times(self) -> Tuple[int, ...]: return self._life_times @property - def read_ports(self) -> Tuple[int]: + def read_ports(self) -> Tuple[int, ...]: return self._read_ports @property diff --git a/b_asic/schedule.py b/b_asic/schedule.py index acaa2db70201eccdc86693c1083ebef8e16c30c9..a0ac495e63e2cba8b696f8f79e64a536b4ef0a30 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -7,7 +7,7 @@ Contains the schedule class for scheduling operations in an SFG. import io import sys from collections import defaultdict -from typing import Dict, List, Optional, Tuple +from typing import cast, Dict, List, Optional, Tuple import matplotlib.pyplot as plt from matplotlib.lines import Line2D @@ -16,7 +16,7 @@ from matplotlib.path import Path from matplotlib.ticker import MaxNLocator import numpy as np -from b_asic import OutputPort, Signal +from b_asic import Signal from b_asic._preferences import ( EXECUTION_TIME_COLOR, LATENCY_COLOR, @@ -24,6 +24,8 @@ from b_asic._preferences import ( SIGNAL_LINEWIDTH, ) from b_asic.graph_component import GraphID +from b_asic.operation import Operation +from b_asic.port import InputPort, OutputPort from b_asic.process import MemoryVariable, Process from b_asic.signal_flow_graph import SFG from b_asic.special_operations import Delay, Output @@ -39,7 +41,7 @@ class Schedule: _sfg: SFG _start_times: Dict[GraphID, int] - _laps: Dict[GraphID, List[int]] + _laps: Dict[GraphID, int] _schedule_time: int _cyclic: bool @@ -85,10 +87,10 @@ class Schedule: """Returns the current maximum end time among all operations.""" max_end_time = 0 for op_id, op_start_time in self._start_times.items(): - op = self._sfg.find_by_id(op_id) + op = cast(Operation, self._sfg.find_by_id(op_id)) for outport in op.outputs: max_end_time = max( - max_end_time, op_start_time + outport.latency_offset + max_end_time, op_start_time + cast(int, outport.latency_offset) ) return max_end_time @@ -108,15 +110,16 @@ class Schedule: ) -> Dict["OutputPort", Dict["Signal", int]]: ret = {} start_time = self._start_times[op_id] - op = self._sfg.find_by_id(op_id) + op = cast(Operation, self._sfg.find_by_id(op_id)) for output_port in op.outputs: output_slacks = {} - available_time = start_time + output_port.latency_offset + available_time = start_time + cast(int, output_port.latency_offset) for signal in output_port.signals: + destination = cast(InputPort, signal.destination) usage_time = ( - signal.destination.latency_offset - + self._start_times[signal.destination.operation.graph_id] + cast(int, destination.latency_offset) + + self._start_times[destination.operation.graph_id] + self._schedule_time * self._laps[signal.graph_id] ) output_slacks[signal] = usage_time - available_time @@ -136,18 +139,19 @@ class Schedule: def _backward_slacks( self, op_id: GraphID - ) -> Dict[OutputPort, Dict[Signal, int]]: + ) -> Dict[InputPort, Dict[Signal, int]]: ret = {} start_time = self._start_times[op_id] - op = self._sfg.find_by_id(op_id) + op = cast(Operation, self._sfg.find_by_id(op_id)) for input_port in op.inputs: input_slacks = {} - usage_time = start_time + input_port.latency_offset + usage_time = start_time + cast(int, input_port.latency_offset) for signal in input_port.signals: + source = cast(OutputPort, signal.source) available_time = ( - signal.source.latency_offset - + self._start_times[signal.source.operation.graph_id] + cast(int, source.latency_offset) + + self._start_times[source.operation.graph_id] - self._schedule_time * self._laps[signal.graph_id] ) input_slacks[signal] = usage_time - available_time @@ -180,7 +184,7 @@ class Schedule: return self._start_times @property - def laps(self) -> Dict[GraphID, List[int]]: + def laps(self) -> Dict[GraphID, int]: return self._laps @property @@ -205,7 +209,7 @@ class Schedule: k: factor * v for k, v in self._start_times.items() } for op_id in self._start_times: - self._sfg.find_by_id(op_id)._increase_time_resolution(factor) + cast(Operation, self._sfg.find_by_id(op_id))._increase_time_resolution(factor) self._schedule_time *= factor return self @@ -218,8 +222,8 @@ class Schedule: ret = [self._schedule_time, *self._start_times.values()] # Loop over operations for op_id in self._start_times: - op = self._sfg.find_by_id(op_id) - ret += [op.execution_time, *op.latency_offsets.values()] + op = cast(Operation, self._sfg.find_by_id(op_id)) + ret += [cast(int, op.execution_time), *op.latency_offsets.values()] # Remove not set values (None) ret = [v for v in ret if v is not None] return ret @@ -256,7 +260,7 @@ class Schedule: k: v // factor for k, v in self._start_times.items() } for op_id in self._start_times: - self._sfg.find_by_id(op_id)._decrease_time_resolution(factor) + cast(Operation, self._sfg.find_by_id(op_id))._decrease_time_resolution(factor) self._schedule_time = self._schedule_time // factor return self @@ -274,7 +278,7 @@ class Schedule: # Update input laps input_slacks = self._backward_slacks(op_id) for in_port, signal_slacks in input_slacks.items(): - tmp_usage = tmp_start + in_port.latency_offset + tmp_usage = tmp_start + cast(int, in_port.latency_offset) new_usage = tmp_usage % self._schedule_time for signal, signal_slack in signal_slacks.items(): new_slack = signal_slack + time diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index 3adf217e0476417e1a17c448740512e850e58da9..0c98df62aeb2fd2a9157988eeaa7c0a63d1c7910 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -11,13 +11,16 @@ from io import StringIO from numbers import Number from queue import PriorityQueue from typing import ( + cast, DefaultDict, + Deque, Dict, Iterable, List, MutableSet, Optional, Sequence, + Set, Tuple, Union, ) @@ -38,7 +41,7 @@ from b_asic.operation import ( Operation, ResultKey, ) -from b_asic.port import OutputPort, SignalSourceProvider +from b_asic.port import InputPort, OutputPort, SignalSourceProvider from b_asic.signal import Signal from b_asic.special_operations import Delay, Input, Output @@ -50,7 +53,7 @@ class GraphIDGenerator: _next_id_number: DefaultDict[TypeName, GraphIDNumber] - def __init__(self, id_number_offset: GraphIDNumber = 0): + def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)): """Construct a GraphIDGenerator.""" self._next_id_number = defaultdict(lambda: id_number_offset) @@ -89,10 +92,11 @@ class SFG(AbstractOperation): _graph_id_generator: GraphIDGenerator _input_operations: List[Input] _output_operations: List[Output] - _original_components_to_new: MutableSet[GraphComponent] + _original_components_to_new: Dict[GraphComponent, GraphComponent] _original_input_signals_to_indices: Dict[Signal, int] _original_output_signals_to_indices: Dict[Signal, int] _precedence_list: Optional[List[List[OutputPort]]] + _used_ids: Set[GraphID] = set() def __init__( self, @@ -100,8 +104,8 @@ class SFG(AbstractOperation): outputs: Optional[Sequence[Output]] = None, input_signals: Optional[Sequence[Signal]] = None, output_signals: Optional[Sequence[Signal]] = None, - id_number_offset: GraphIDNumber = 0, - name: Name = "", + id_number_offset: GraphIDNumber = GraphIDNumber(0), + name: Name = Name(""), input_sources: Optional[ Sequence[Optional[SignalSourceProvider]] ] = None, @@ -139,7 +143,9 @@ class SFG(AbstractOperation): self._components_dfs_order = [] self._operations_dfs_order = [] self._operations_topological_order = [] - self._graph_id_generator = GraphIDGenerator(id_number_offset) + self._graph_id_generator = GraphIDGenerator( + GraphIDNumber(id_number_offset) + ) self._input_operations = [] self._output_operations = [] self._original_components_to_new = {} @@ -154,8 +160,8 @@ class SFG(AbstractOperation): raise ValueError( f"Duplicate input signal {signal!r} in SFG" ) - new_input_op = self._add_component_unconnected_copy(Input()) - new_signal = self._add_component_unconnected_copy(signal) + new_input_op = cast(Input, self._add_component_unconnected_copy(Input())) + new_signal = cast(Signal, self._add_component_unconnected_copy(signal)) new_signal.set_source(new_input_op.output(0)) self._input_operations.append(new_input_op) self._original_input_signals_to_indices[signal] = input_index @@ -184,14 +190,14 @@ class SFG(AbstractOperation): # Setup output signals. if output_signals is not None: for output_index, signal in enumerate(output_signals): - new_output_op = self._add_component_unconnected_copy(Output()) + new_output_op = cast(Output, self._add_component_unconnected_copy(Output())) if signal in self._original_components_to_new: # Signal was already added when setting up inputs. - new_signal = self._original_components_to_new[signal] + new_signal = cast(Signal, self._original_components_to_new[signal]) new_signal.set_destination(new_output_op.input(0)) else: # New signal has to be created. - new_signal = self._add_component_unconnected_copy(signal) + new_signal = cast(Signal, self._add_component_unconnected_copy(signal)) new_signal.set_destination(new_output_op.input(0)) self._output_operations.append(new_output_op) @@ -207,7 +213,7 @@ class SFG(AbstractOperation): f"Duplicate output operation {output_op!r} in SFG" ) - new_output_op = self._add_component_unconnected_copy(output_op) + new_output_op = cast(Output, self._add_component_unconnected_copy(output_op)) for signal in output_op.input(0).signals: if signal in self._original_components_to_new: # Signal was already added when setting up inputs. @@ -233,7 +239,7 @@ class SFG(AbstractOperation): input_index, ) in self._original_input_signals_to_indices.items(): # Check if already added destination. - new_signal = self._original_components_to_new[signal] + new_signal = cast(Signal, self._original_components_to_new[signal]) if new_signal.destination is None: if signal.destination is None: raise ValueError( @@ -412,6 +418,8 @@ class SFG(AbstractOperation): # For each input_signal, connect it to the corresponding operation for port, input_operation in zip(self.inputs, self.input_operations): dest = input_operation.output(0).signals[0].destination + if dest is None: + raise ValueError("Missing destination in signal.") dest.clear() port.signals[0].set_destination(dest) # For each output_signal, connect it to the corresponding operation @@ -419,19 +427,21 @@ class SFG(AbstractOperation): self.outputs, self.output_operations ): src = output_operation.input(0).signals[0].source + if src is None: + raise ValueError("Missing soruce in signal.") src.clear() port.signals[0].set_source(src) return True @property - def input_operations(self) -> Iterable[Operation]: + def input_operations(self) -> Sequence[Operation]: """ Get the internal input operations in the same order as their respective input ports. """ return self._input_operations @property - def output_operations(self) -> Iterable[Operation]: + def output_operations(self) -> Sequence[Operation]: """ Get the internal output operations in the same order as their respective output ports. """ @@ -456,8 +466,8 @@ class SFG(AbstractOperation): for index, input_op in enumerate(self._input_operations) } output_op = self._output_operations[output_index] - queue = deque([output_op]) - visited = {output_op} + queue: Deque[Operation] = deque([output_op]) + visited: Set[Operation] = {output_op} while queue: op = queue.popleft() if isinstance(op, Input): @@ -494,12 +504,12 @@ class SFG(AbstractOperation): return self._graph_id_generator.id_number_offset @property - def components(self) -> Iterable[GraphComponent]: + def components(self) -> List[GraphComponent]: """Get all components of this graph in depth-first order.""" return self._components_dfs_order @property - def operations(self) -> Iterable[Operation]: + def operations(self) -> List[Operation]: """Get all operations of this graph in depth-first order.""" return self._operations_dfs_order @@ -624,7 +634,7 @@ class SFG(AbstractOperation): # Preserve the original SFG by creating a copy. sfg_copy = self() - output_comp = sfg_copy.find_by_id(output_comp_id) + output_comp = cast(Operation, sfg_copy.find_by_id(output_comp_id)) if output_comp is None: return None @@ -642,7 +652,7 @@ class SFG(AbstractOperation): ) for index, signal_in in enumerate(output_comp.output_signals): - destination = signal_in.destination + destination = cast(InputPort, signal_in.destination) signal_in.set_destination(component.input(index)) destination.connect(component.output(index)) @@ -655,7 +665,7 @@ class SFG(AbstractOperation): be raised. If no operation with the entered operation_id is found then returns None and does nothing. """ sfg_copy = self() - operation = sfg_copy.find_by_id(operation_id) + operation = cast(Operation, sfg_copy.find_by_id(operation_id)) if operation is None: return None @@ -672,7 +682,7 @@ class SFG(AbstractOperation): and operation.input(i).signals[0].source is not None ): in_sig = operation.input(i).signals[0] - source_port = in_sig.source + source_port = cast(OutputPort, in_sig.source) source_port.remove_signal(in_sig) operation.input(i).remove_signal(in_sig) for out_sig in outport.signals.copy(): @@ -809,7 +819,7 @@ class SFG(AbstractOperation): } # Maps number of input counts to a queue of seen objects with such a size. - seen_with_inputs_dict = defaultdict(deque) + seen_with_inputs_dict: Dict[int, Deque] = defaultdict(deque) seen = set() top_order = [] @@ -904,7 +914,7 @@ class SFG(AbstractOperation): def set_latency_of_type(self, type_name: TypeName, latency: int) -> None: """Set the latency of all components with the given type name.""" for op in self.find_by_type_name(type_name): - op.set_latency(latency) + cast(Operation, op).set_latency(latency) def set_execution_time_of_type( self, type_name: TypeName, execution_time: int @@ -912,7 +922,7 @@ class SFG(AbstractOperation): """Set the execution time of all components with the given type name. """ for op in self.find_by_type_name(type_name): - op.execution_time = execution_time + cast(Operation, op).execution_time = execution_time def set_latency_offsets_of_type( self, type_name: TypeName, latency_offsets: Dict[str, int] @@ -920,7 +930,7 @@ class SFG(AbstractOperation): """Set the latency offset of all components with the given type name. """ for op in self.find_by_type_name(type_name): - op.set_latency_offsets(latency_offsets) + cast(Operation, op).set_latency_offsets(latency_offsets) def _traverse_for_precedence_list( self, first_iter_ports: List[OutputPort] @@ -983,11 +993,11 @@ class SFG(AbstractOperation): original_op = op_stack.pop() # Add or get the new copy of the operation. if original_op not in self._original_components_to_new: - new_op = self._add_component_unconnected_copy(original_op) + new_op = cast(Operation, self._add_component_unconnected_copy(original_op)) self._components_dfs_order.append(new_op) self._operations_dfs_order.append(new_op) else: - new_op = self._original_components_to_new[original_op] + new_op = cast(Operation, self._original_components_to_new[original_op]) # Connect input ports to new signals. for original_input_port in original_op.inputs: @@ -1001,18 +1011,20 @@ class SFG(AbstractOperation): in self._original_input_signals_to_indices ): # New signal already created during first step of constructor. - new_signal = self._original_components_to_new[ + new_signal = cast(Signal, self._original_components_to_new[ original_signal - ] + ]) + new_signal.set_destination( new_op.input(original_input_port.index) ) + source = cast(OutputPort, new_signal.source) self._components_dfs_order.extend( - [new_signal, new_signal.source.operation] + [new_signal, source.operation] ) self._operations_dfs_order.append( - new_signal.source.operation + source.operation ) # Check if the signal has not been added before. @@ -1024,9 +1036,10 @@ class SFG(AbstractOperation): "Dangling signal without source in SFG" ) - new_signal = self._add_component_unconnected_copy( + new_signal = cast(Signal, self._add_component_unconnected_copy( original_signal - ) + )) + new_signal.set_destination( new_op.input(original_input_port.index) ) @@ -1041,15 +1054,16 @@ class SFG(AbstractOperation): original_connected_op in self._original_components_to_new ): + component = cast(Operation, self._original_components_to_new[ + original_connected_op + ]) # Set source to the already added operations port. new_signal.set_source( - self._original_components_to_new[ - original_connected_op - ].output(original_signal.source.index) + component.output(original_signal.source.index) ) else: # Create new operation, set signal source to it. - new_connected_op = ( + new_connected_op = cast(Operation, self._add_component_unconnected_copy( original_connected_op ) @@ -1075,18 +1089,20 @@ class SFG(AbstractOperation): in self._original_output_signals_to_indices ): # New signal already created during first step of constructor. - new_signal = self._original_components_to_new[ + new_signal = cast(Signal, self._original_components_to_new[ original_signal - ] + ]) + new_signal.set_source( new_op.output(original_output_port.index) ) + destination = cast(InputPort, new_signal.destination) self._components_dfs_order.extend( - [new_signal, new_signal.destination.operation] + [new_signal, destination.operation] ) self._operations_dfs_order.append( - new_signal.destination.operation + destination.operation ) # Check if signal has not been added before. @@ -1110,6 +1126,10 @@ class SFG(AbstractOperation): original_connected_op = ( original_signal.destination.operation ) + if original_connected_op is None: + raise ValueError( + "Signal without destination in SFG" + ) # Check if connected operation has been added. if ( original_connected_op diff --git a/b_asic/simulation.py b/b_asic/simulation.py index 4017328c6516cdbe1fd8daa4702df9704548e7ff..2eaa3a6a188290ff2c5f443c21db9fb8571eb736 100644 --- a/b_asic/simulation.py +++ b/b_asic/simulation.py @@ -130,7 +130,7 @@ class Simulation: Run the simulation until its iteration is greater than or equal to the given iteration and return the output values of the last iteration. """ - result = [] + result: Sequence[Number] = [] while self._iteration < iteration: input_values = [ self._input_functions[i](self._iteration) diff --git a/b_asic/special_operations.py b/b_asic/special_operations.py index b63e112ddc1f3ff6760ad2846e1c9faf0413a69b..ba587580d0643efa580cb0a3241a0f2d6fa59a2d 100644 --- a/b_asic/special_operations.py +++ b/b_asic/special_operations.py @@ -158,7 +158,7 @@ class Delay(AbstractOperation): @classmethod def type_name(cls) -> TypeName: - return "t" + return TypeName("t") def evaluate(self, a): return self.param("initial_value")