diff --git a/b_asic/operation.py b/b_asic/operation.py index a18b383a1a20be2213f6e18fb8ac66de1a404394..f43abd242e05b4bcb335a955a22ef2673d7836c7 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -157,8 +157,7 @@ class AbstractOperation(Operation, AbstractGraphComponent): if input_sources is not None: source_count = len(input_sources) if source_count != input_count: - raise ValueError( - f"Operation expected {input_count} input sources but only got {source_count}") + raise ValueError(f"Operation expected {input_count} input sources but only got {source_count}") for i, src in enumerate(input_sources): if src is not None: self._input_ports[i].connect(src.source) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index 3d0ba6cbdae32fe21ec8f0d0012eecfa64b264e6..a30ca0c3bcebe733351c4ee997881fabee7003e4 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -40,13 +40,14 @@ class SFG(AbstractOperation): _components_by_id: Dict[GraphID, GraphComponent] _components_by_name: DefaultDict[Name, List[GraphComponent]] - _components_in_dfs_order: List[GraphComponent] + _components_ordered: List[GraphComponent] + _operations_ordered: List[Operation] _graph_id_generator: GraphIDGenerator _input_operations: List[Input] _output_operations: List[Output] _original_components_to_new: Set[GraphComponent] - _original_input_signals_to_indexes: Dict[Signal, int] - _original_output_signals_to_indexes: Dict[Signal, int] + _original_input_signals_to_indices: Dict[Signal, int] + _original_output_signals_to_indices: Dict[Signal, int] def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [], \ inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], \ @@ -60,104 +61,101 @@ class SFG(AbstractOperation): self._components_by_id = dict() self._components_by_name = defaultdict(list) - self._components_in_dfs_order = [] + self._components_ordered = [] + self._operations_ordered = [] self._graph_id_generator = GraphIDGenerator(id_number_offset) self._input_operations = [] self._output_operations = [] self._original_components_to_new = {} - self._original_input_signals_to_indexes = {} - self._original_output_signals_to_indexes = {} + self._original_input_signals_to_indices = {} + self._original_output_signals_to_indices = {} # Setup input signals. - for input_index, sig in enumerate(input_signals): - assert sig not in self._original_components_to_new, "Duplicate input signals sent to SFG construcctor." + for input_index, signal in enumerate(input_signals): + assert signal not in self._original_components_to_new, "Duplicate input signals supplied to SFG construcctor." new_input_op = self._add_component_copy_unconnected(Input()) - new_sig = self._add_component_copy_unconnected(sig) - new_sig.set_source(new_input_op.output(0)) + new_signal = self._add_component_copy_unconnected(signal) + new_signal.set_source(new_input_op.output(0)) self._input_operations.append(new_input_op) - self._original_input_signals_to_indexes[sig] = input_index + self._original_input_signals_to_indices[signal] = input_index - # Setup input operations, starting from indexes ater input signals. + # Setup input operations, starting from indices ater input signals. for input_index, input_op in enumerate(inputs, len(input_signals)): - assert input_op not in self._original_components_to_new, "Duplicate input operations sent to SFG constructor." + assert input_op not in self._original_components_to_new, "Duplicate input operations supplied to SFG constructor." new_input_op = self._add_component_copy_unconnected(input_op) - for sig in input_op.output(0).signals: - assert sig not in self._original_components_to_new, "Duplicate input signals connected to input ports sent to SFG construcctor." - new_sig = self._add_component_copy_unconnected(sig) - new_sig.set_source(new_input_op.output(0)) - self._original_input_signals_to_indexes[sig] = input_index + for signal in input_op.output(0).signals: + assert signal not in self._original_components_to_new, "Duplicate input signals connected to input ports supplied to SFG construcctor." + new_signal = self._add_component_copy_unconnected(signal) + new_signal.set_source(new_input_op.output(0)) + self._original_input_signals_to_indices[signal] = input_index self._input_operations.append(new_input_op) # Setup output signals. - for output_ind, sig in enumerate(output_signals): - new_out = self._add_component_copy_unconnected(Output()) - if sig in self._original_components_to_new: - # Signal already added when setting up inputs - new_sig = self._original_components_to_new[sig] - new_sig.set_destination(new_out.input(0)) + for output_index, signal in enumerate(output_signals): + new_output_op = self._add_component_copy_unconnected(Output()) + if signal in self._original_components_to_new: + # Signal was already added when setting up inputs. + new_signal = self._original_components_to_new[signal] + new_signal.set_destination(new_output_op.input(0)) else: - # New signal has to be created - new_sig = self._add_component_copy_unconnected(sig) - new_sig.set_destination(new_out.input(0)) - - self._output_operations.append(new_out) - self._original_output_signals_to_indexes[sig] = output_ind - - # Setup output operations, starting from indexes after output signals. - for output_ind, output_op in enumerate(outputs, len(output_signals)): - assert output_op not in self._original_components_to_new, "Duplicate output operations sent to SFG constructor." - new_out = self._add_component_copy_unconnected(output_op) - for sig in output_op.input(0).signals: - if sig in self._original_components_to_new: - # Signal already added when setting up inputs - new_sig = self._original_components_to_new[sig] - new_sig.set_destination(new_out.input(0)) + # New signal has to be created. + new_signal = self._add_component_copy_unconnected(signal) + new_signal.set_destination(new_output_op.input(0)) + + self._output_operations.append(new_output_op) + self._original_output_signals_to_indices[signal] = output_index + + # Setup output operations, starting from indices after output signals. + for output_index, output_op in enumerate(outputs, len(output_signals)): + assert output_op not in self._original_components_to_new, "Duplicate output operations supplied to SFG constructor." + new_output_op = self._add_component_copy_unconnected(output_op) + for signal in output_op.input(0).signals: + new_signal = None + if signal in self._original_components_to_new: + # Signal was already added when setting up inputs. + new_signal = self._original_components_to_new[signal] else: - # New signal has to be created - new_sig = self._add_component_copy_unconnected(sig) - new_sig.set_destination(new_out.input(0)) + # New signal has to be created. + new_signal = self._add_component_copy_unconnected(signal) - self._original_output_signals_to_indexes[sig] = output_ind + new_signal.set_destination(new_output_op.input(0)) + self._original_output_signals_to_indices[signal] = output_index - self._output_operations.append(new_out) + self._output_operations.append(new_output_op) output_operations_set = set(self._output_operations) # Search the graph inwards from each input signal. - for sig, input_index in self._original_input_signals_to_indexes.items(): + for signal, input_index in self._original_input_signals_to_indices.items(): # Check if already added destination. - new_sig = self._original_components_to_new[sig] - if new_sig.destination is None: - if sig.destination is None: - raise ValueError( - f"Input signal #{input_index} is missing destination in SFG") - elif sig.destination.operation not in self._original_components_to_new: - self._copy_structure_from_operation_dfs( - sig.destination.operation) + new_signal = self._original_components_to_new[signal] + if new_signal.destination is None: + if signal.destination is None: + raise ValueError(f"Input signal #{input_index} is missing destination in SFG") + elif signal.destination.operation not in self._original_components_to_new: + self._add_operation_copy_connected_tree(signal.destination.operation) else: - if new_sig.destination.operation in output_operations_set: - # Add directly connected input to output to dfs order list - self._components_in_dfs_order.extend([new_sig.source.operation, new_sig, new_sig.destination.operation]) + if new_signal.destination.operation in output_operations_set: + # Add directly connected input to output to ordered list. + self._components_ordered.extend([new_signal.source.operation, new_signal, new_signal.destination.operation]) + self._operations_ordered.extend([new_signal.source.operation, new_signal.destination.operation]) # Search the graph inwards from each output signal. - for sig, output_index in self._original_output_signals_to_indexes.items(): + for signal, output_index in self._original_output_signals_to_indices.items(): # Check if already added source. - new_sig = self._original_components_to_new[sig] - if new_sig.source is None: - if sig.source is None: + new_signal = self._original_components_to_new[signal] + if new_signal.source is None: + if signal.source is None: raise ValueError(f"Output signal #{output_index} is missing source in SFG") - if sig.source.operation not in self._original_components_to_new: - self._copy_structure_from_operation_dfs(sig.source.operation) + if signal.source.operation not in self._original_components_to_new: + self._add_operation_copy_connected_tree(signal.source.operation) def __call__(self, *src: Optional[SignalSourceProvider], name: Name = "") -> "SFG": """Get a new independent SFG instance that is identical to this SFG except without any of its external connections.""" - input_sources = src - if not input_sources: - input_sources = None return SFG(inputs = self._input_operations, outputs = self._output_operations, id_number_offset = self._graph_id_generator.id_number_offset, - name = name, input_sources = input_sources) + name = name, input_sources = src if src else None) @property def type_name(self) -> TypeName: @@ -191,7 +189,7 @@ class SFG(AbstractOperation): return value def split(self) -> Iterable[Operation]: - return filter(lambda comp: isinstance(comp, Operation), self._components_by_id.values()) + return self.operations @property def id_number_offset(self) -> GraphIDNumber: @@ -200,95 +198,103 @@ class SFG(AbstractOperation): @property def components(self) -> Iterable[GraphComponent]: - """Get all components of this graph in the dfs-traversal order.""" - return self._components_in_dfs_order + """Get all components of this graph in depth-first order.""" + return self._components_ordered + + @property + def operations(self) -> Iterable[Operation]: + """Get all operations of this graph in depth-first order.""" + return self._operations_ordered def find_by_id(self, graph_id: GraphID) -> Optional[GraphComponent]: - """Find a graph object based on the entered Graph ID and return it. If no graph - object with the entered ID was found then return None. + """Find the graph component with the specified ID. + Returns None if the component was not found. Keyword arguments: - graph_id: Graph ID of the wanted object. + graph_id: Graph ID of the desired component(s) """ return self._components_by_id.get(graph_id, None) def find_by_name(self, name: Name) -> List[GraphComponent]: - """Find all graph objects that have the entered name and return them - in a list. If no graph object with the entered name was found then return an - empty list. + """Find all graph components with the specified name. + Returns an empty list if no components were found. Keyword arguments: - name: Name of the wanted object. + name: Name of the desired component(s) """ return self._components_by_name.get(name, []) - def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent: - assert original_comp not in self._original_components_to_new, "Tried to add duplicate SFG component" - new_comp = original_comp.copy_component() - self._original_components_to_new[original_comp] = new_comp - new_id = self._graph_id_generator.next_id(new_comp.type_name) - new_comp.graph_id = new_id - self._components_by_id[new_id] = new_comp - self._components_by_name[new_comp.name].append(new_comp) - return new_comp - - def _copy_structure_from_operation_dfs(self, start_op: Operation): + def _add_component_copy_unconnected(self, original_component: GraphComponent) -> GraphComponent: + assert original_component not in self._original_components_to_new, "Tried to add duplicate SFG component" + new_component = original_component.copy_component() + self._original_components_to_new[original_component] = new_component + new_id = self._graph_id_generator.next_id(new_component.type_name) + new_component.graph_id = new_id + self._components_by_id[new_id] = new_component + self._components_by_name[new_component.name].append(new_component) + return new_component + + def _add_operation_copy_connected_tree(self, start_op: Operation): op_stack = deque([start_op]) while op_stack: original_op = op_stack.pop() - # Add or get the new copy of the operation.. + # Add or get the new copy of the operation. new_op = None if original_op not in self._original_components_to_new: new_op = self._add_component_copy_unconnected(original_op) - self._components_in_dfs_order.append(new_op) + self._components_ordered.append(new_op) + self._operations_ordered.append(new_op) else: new_op = self._original_components_to_new[original_op] - # Connect input ports to new signals + # Connect input ports to new signals. for original_input_port in original_op.inputs: if original_input_port.signal_count < 1: raise ValueError("Unconnected input port in SFG") for original_signal in original_input_port.signals: - # Check if the signal is one of the SFG's input signals - if original_signal in self._original_input_signals_to_indexes: - # New signal already created during first step of constructor + # Check if the signal is one of the SFG's input signals. + if original_signal in self._original_input_signals_to_indices: + # New signal already created during first step of constructor. new_signal = self._original_components_to_new[original_signal] new_signal.set_destination(new_op.input(original_input_port.index)) - self._components_in_dfs_order.extend([new_signal, new_signal.source.operation]) + self._components_ordered.extend([new_signal, new_signal.source.operation]) + self._operations_ordered.append(new_signal.source.operation) - # Check if the signal has not been added before + # Check if the signal has not been added before. elif original_signal not in self._original_components_to_new: if original_signal.source is None: raise ValueError("Dangling signal without source in SFG") new_signal = self._add_component_copy_unconnected(original_signal) new_signal.set_destination(new_op.input(original_input_port.index)) - self._components_in_dfs_order.append(new_signal) + self._components_ordered.append(new_signal) original_connected_op = original_signal.source.operation - # Check if connected Operation has been added before + # Check if connected Operation has been added before. if original_connected_op in self._original_components_to_new: - # Set source to the already added operations port + # Set source to the already added operations port. new_signal.set_source(self._original_components_to_new[original_connected_op].output(original_signal.source.index)) else: - # Create new operation, set signal source to it + # Create new operation, set signal source to it. new_connected_op = self._add_component_copy_unconnected(original_connected_op) new_signal.set_source(new_connected_op.output(original_signal.source.index)) - self._components_in_dfs_order.append(new_connected_op) + self._components_ordered.append(new_connected_op) + self._operations_ordered.append(new_connected_op) - # Add connected operation to queue of operations to visit + # Add connected operation to queue of operations to visit. op_stack.append(original_connected_op) - # Connect output ports + # Connect output ports. for original_output_port in original_op.outputs: for original_signal in original_output_port.signals: # Check if the signal is one of the SFG's output signals. - if original_signal in self._original_output_signals_to_indexes: + if original_signal in self._original_output_signals_to_indices: # New signal already created during first step of constructor. new_signal = self._original_components_to_new[original_signal] new_signal.set_source(new_op.output(original_output_port.index)) - self._components_in_dfs_order.extend([new_signal, new_signal.destination.operation]) + self._components_ordered.extend([new_signal, new_signal.destination.operation]) + self._operations_ordered.append(new_signal.destination.operation) # Check if signal has not been added before. elif original_signal not in self._original_components_to_new: @@ -297,20 +303,21 @@ class SFG(AbstractOperation): new_signal = self._add_component_copy_unconnected(original_signal) new_signal.set_source(new_op.output(original_output_port.index)) - self._components_in_dfs_order.append(new_signal) + self._components_ordered.append(new_signal) original_connected_op = original_signal.destination.operation # Check if connected operation has been added. if original_connected_op in self._original_components_to_new: - # Set destination to the already connected operations port + # Set destination to the already connected operations port. new_signal.set_destination(self._original_components_to_new[original_connected_op].input(original_signal.destination.index)) else: # Create new operation, set destination to it. new_connected_op = self._add_component_copy_unconnected(original_connected_op) new_signal.set_destination(new_connected_op.input(original_signal.destination.index)) - self._components_in_dfs_order.append(new_connected_op) + self._components_ordered.append(new_connected_op) + self._operations_ordered.append(new_connected_op) - # Add connected operation to the queue of operations to visist + # Add connected operation to the queue of operations to visit. op_stack.append(original_connected_op) def _evaluate_source(self, src: OutputPort, results: MutableMapping[str, Number], registers: MutableMapping[str, Number], prefix: str) -> Number: