Skip to content
Snippets Groups Projects
Commit 359a0243 authored by angloth's avatar angloth
Browse files

Change input_indexes_required_for_output_port to be iterative and use bfs

parent ec3da7e8
No related branches found
No related tags found
1 merge request!29Resolve "Generate PG from SFG"
...@@ -254,7 +254,30 @@ class SFG(AbstractOperation): ...@@ -254,7 +254,30 @@ class SFG(AbstractOperation):
if output_index < 0 or output_index >= self.output_count: if output_index < 0 or output_index >= self.output_count:
raise IndexError( raise IndexError(
f"Output index out of range (expected 0-{self.output_count - 1}, got {output_index})") f"Output index out of range (expected 0-{self.output_count - 1}, got {output_index})")
return self._inputs_required_for_source(self._output_operations[output_index].input(0).signals[0].source, set())
input_indexes_required = []
sfg_input_operations_to_indexes = {
input_op: index for index, input_op in enumerate(self._input_operations)}
output_op = self._output_operations[output_index]
queue = deque([output_op])
visited = set([output_op])
while queue:
op = queue.popleft()
if isinstance(op, Input):
if op in sfg_input_operations_to_indexes:
input_indexes_required.append(
sfg_input_operations_to_indexes[op])
del sfg_input_operations_to_indexes[op]
for input_port in op.inputs:
for signal in input_port.signals:
if signal.source is not None:
new_op = signal.source.operation
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
return input_indexes_required
def copy_component(self, *args, **kwargs) -> GraphComponent: def copy_component(self, *args, **kwargs) -> GraphComponent:
return super().copy_component(*args, **kwargs, inputs=self._input_operations, outputs=self._output_operations, return super().copy_component(*args, **kwargs, inputs=self._input_operations, outputs=self._output_operations,
...@@ -526,19 +549,3 @@ class SFG(AbstractOperation): ...@@ -526,19 +549,3 @@ class SFG(AbstractOperation):
print(self._precedence_list) print(self._precedence_list)
return self._precedence_list return self._precedence_list
def _inputs_required_for_source(self, src: OutputPort, visited: MutableSet[Operation]) -> Sequence[bool]:
if src.operation in visited:
return []
visited.add(src.operation)
if isinstance(src.operation, Input):
for i, input_operation in enumerate(self._input_operations):
if input_operation is src.operation:
return [i]
input_indices = []
for i in src.operation.inputs_required_for_output(src.index):
input_indices.extend(self._inputs_required_for_source(
src.operation.input(i).signals[0].source, visited))
return input_indices
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment