Skip to content
Snippets Groups Projects
Commit 3bfa5e12 authored by angloth's avatar angloth
Browse files

Remove old recusrive dfs traversing and replace with working iterative bfs traversing

parent abb484d7
No related branches found
No related tags found
5 merge requests!31Resolve "Specify internal input/output dependencies of an Operation",!25Resolve "System tests iteration 1",!24Resolve "System tests iteration 1",!23Resolve "Simulate SFG",!21Resolve "Print SFG"
......@@ -5,7 +5,8 @@ TODO: More info.
from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
from numbers import Number
from collections import defaultdict
from collections import defaultdict, deque
from pprint import pprint
from b_asic.port import SignalSourceProvider, OutputPort
from b_asic.operation import Operation, AbstractOperation
......@@ -47,7 +48,7 @@ class SFG(AbstractOperation):
_original_output_signals: Dict[Signal, int]
def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [],
inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], operations: Sequence[Operation] = [],
inputs: Sequence[Input] = [], outputs: Sequence[Output] = [],
id_number_offset: GraphIDNumber = 0, name: Name = "",
input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
super().__init__(
......@@ -61,52 +62,49 @@ class SFG(AbstractOperation):
self._graph_id_generator = GraphIDGenerator(id_number_offset)
self._input_operations = []
self._output_operations = []
self._original_components_added = set()
self._original_input_signals = {}
self._original_output_signals = {}
# Maps original components to new copied components
self._added_components_mapping = {}
self._original_input_signals_indexes = {}
self._original_output_signals_indexes = {}
# Setup input operations and signals.
for i, s in enumerate(input_signals):
self._input_operations.append(
self._add_component_copy_unconnected(Input()))
self._original_input_signals[s] = i
self._original_input_signals_indexes[s] = i
for i, op in enumerate(inputs, len(input_signals)):
self._input_operations.append(
self._add_component_copy_unconnected(op))
for s in op.output(0).signals:
self._original_input_signals[s] = i
self._original_input_signals_indexes[s] = i
# Setup output operations and signals.
for i, s in enumerate(output_signals):
self._output_operations.append(
self._add_component_copy_unconnected(Output()))
self._original_output_signals[s] = i
self._original_output_signals_indexes[s] = i
for i, op in enumerate(outputs, len(output_signals)):
self._output_operations.append(
self._add_component_copy_unconnected(op))
for s in op.input(0).signals:
self._original_output_signals[s] = i
self._original_output_signals_indexes[s] = i
# Search the graph inwards from each input signal.
for s, i in self._original_input_signals.items():
for s, i in self._original_input_signals_indexes.items():
if s.destination is None:
raise ValueError(
f"Input signal #{i} is missing destination in SFG")
if s.destination.operation not in self._original_components_added:
self._add_operation_copy_recursively(s.destination.operation)
if s.destination.operation not in self._added_components_mapping:
self._copy_structure_from_operation_bfs(
s.destination.operation)
# Search the graph inwards from each output signal.
for s, i in self._original_output_signals.items():
for s, i in self._original_output_signals_indexes.items():
if s.source is None:
raise ValueError(
f"Output signal #{i} is missing source in SFG")
if s.source.operation not in self._original_components_added:
self._add_operation_copy_recursively(s.source.operation)
# Search the graph outwards from each operation.
for op in operations:
if op not in self._original_components_added:
self._add_operation_copy_recursively(op)
if s.source.operation not in self._added_components_mapping:
self._copy_structure_from_operation_bfs(s.source.operation)
@property
def type_name(self) -> TypeName:
......@@ -161,75 +159,125 @@ class SFG(AbstractOperation):
return self._components_by_name.get(name, [])
def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent:
assert original_comp not in self._original_components_added, "Tried to add duplicate SFG component"
self._original_components_added.add(original_comp)
assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component"
new_comp = original_comp.copy_unconnected()
self._added_components_mapping[original_comp] = new_comp
self._components_by_id[self._graph_id_generator.next_id(
new_comp.type_name)] = new_comp
self._components_by_name[new_comp.name].append(new_comp)
return new_comp
def _add_operation_copy_recursively(self, original_op: Operation) -> Operation:
# Add a copy of the operation without any connections.
new_op = self._add_component_copy_unconnected(original_op)
# Connect input ports.
for original_input_port, new_input_port in zip(original_op.inputs, new_op.inputs):
if original_input_port.signal_count < 1:
raise ValueError("Unconnected input port in SFG")
for original_signal in original_input_port.signals:
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_destination(new_input_port)
# Check if the signal is one of the SFG's input signals.
if original_signal in self._original_input_signals:
new_signal.set_source(
self._input_operations[self._original_input_signals[original_signal]].output(0))
# Only add the signal if it wasn't already added.
elif original_signal not in self._original_components_added:
if original_signal.source is None:
raise ValueError(
"Dangling signal without source in SFG")
# Recursively add the connected operation.
new_connected_op = self._add_operation_copy_recursively(
original_signal.source.operation)
new_signal.set_source(new_connected_op.output(
original_signal.source.index))
# Connect output ports.
for original_output_port, new_output_port in zip(original_op.outputs, new_op.outputs):
for original_signal in original_output_port.signals:
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_source(new_output_port)
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals:
new_signal.set_destination(
self._output_operations[self._original_output_signals[original_signal]].input(0))
# Only add the signal if it wasn't already added.
elif original_signal not in self._original_components_added:
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_source(new_output_port)
if original_signal.destination is None:
raise ValueError(
"Dangling signal without destination in SFG")
# Recursively add the connected operation.
new_connected_op = self._add_operation_copy_recursively(
original_signal.destination.operation)
new_signal.set_destination(new_connected_op.input(
original_signal.destination.index))
return new_op
def _copy_structure_from_operation_bfs(self, start_op: Operation):
op_queue = deque([start_op])
while op_queue:
original_op = op_queue.popleft()
print("CURRENT:", original_op.name, "-------------------")
# Add a copy of the operation without any connections.
new_op = None
if original_op not in self._added_components_mapping:
new_op = self._add_component_copy_unconnected(original_op)
else:
new_op = self._added_components_mapping[original_op]
# Connect input ports to new signals
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
raise ValueError("Unconnected input port in SFG")
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals
if original_signal in self._original_input_signals_indexes:
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_destination(
new_op.input(original_input_port.index))
new_signal.set_source(
self._input_operations[self._original_input_signals_indexes[
original_signal]].output(0))
# Check if the signal has not been added before
elif original_signal not in self._added_components_mapping:
if original_signal.source is None:
raise ValueError(
"Dangling signal without source in SFG")
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_destination(
new_op.input(original_input_port.index))
original_connected_op = original_signal.source.operation
# Check if connected Operation has been added before
if original_connected_op in self._added_components_mapping:
# Set source to the already added operations port
new_signal.set_source(
self._added_components_mapping[original_connected_op].output(
original_signal.source.index))
else:
# Create new operation, set signal source to it
new_connected_op = self._add_component_copy_unconnected(
original_connected_op)
new_signal.set_source(new_connected_op.output(
original_signal.source.index))
# Add connected operation to queue of operations to visit
op_queue.append(original_connected_op)
# Connect output ports
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_indexes:
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_source(
new_op.output(original_output_port.index))
new_signal.set_destination(
self._output_operations[self._original_output_signals_indexes[
original_signal]].input(0))
# Check if signal has not been added before.
elif original_signal not in self._added_components_mapping:
if original_signal.source is None:
raise ValueError(
"Dangling signal without source in SFG")
new_signal = self._add_component_copy_unconnected(
original_signal)
new_signal.set_source(
new_op.output(original_output_port.index))
original_connected_op = original_signal.destination.operation
# Check if connected operation has been added.
if original_connected_op in self._added_components_mapping:
# Set destination to the already connected operations port
new_signal.set_destination(
self._added_components_mapping[original_connected_op].input(
original_signal.destination.index))
else:
# Create new operation, set destination to it.
new_connected_op = self._add_component_copy_unconnected(
original_connected_op)
new_signal.set_destination(new_connected_op.input(
original_signal.destination.index))
print("Adding signal:", new_signal.name,
"to op:", new_connected_op.name)
print(
[inport.signals for inport in new_connected_op.inputs])
# Add connected operation to the queue of operations to visist
op_queue.append(original_connected_op)
def _evaluate_source(self, src: OutputPort) -> Number:
input_values = []
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment