Skip to content
Snippets Groups Projects
Commit 620d6743 authored by angloth's avatar angloth
Browse files

Refactor constructor so that Input signals and Output signals are connected to...

Refactor constructor so that Input signals and Output signals are connected to ports before traversal is started, that way edge cases of empty SFG's are easily handled
parent f4ded614
Branches
No related tags found
5 merge requests!31Resolve "Specify internal input/output dependencies of an Operation",!25Resolve "System tests iteration 1",!24Resolve "System tests iteration 1",!23Resolve "Simulate SFG",!21Resolve "Print SFG"
...@@ -6,7 +6,6 @@ TODO: More info. ...@@ -6,7 +6,6 @@ TODO: More info.
from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
from numbers import Number from numbers import Number
from collections import defaultdict, deque from collections import defaultdict, deque
from pprint import pprint
from b_asic.port import SignalSourceProvider, OutputPort from b_asic.port import SignalSourceProvider, OutputPort
from b_asic.operation import Operation, AbstractOperation from b_asic.operation import Operation, AbstractOperation
...@@ -68,55 +67,87 @@ class SFG(AbstractOperation): ...@@ -68,55 +67,87 @@ class SFG(AbstractOperation):
self._original_output_signals_indexes = {} self._original_output_signals_indexes = {}
# Setup input signals. # Setup input signals.
for sig_ind, sig in enumerate(input_signals): for input_index, sig in enumerate(input_signals):
self._input_operations.append( assert sig not in self._added_components_mapping, "Duplicate input signals sent to SFG construcctor."
self._add_component_copy_unconnected(Input()))
self._original_input_signals_indexes[sig] = sig_ind new_input_op = self._add_component_copy_unconnected(Input())
new_sig = self._add_component_copy_unconnected(sig)
new_sig.set_source(new_input_op.output(0))
self._input_operations.append(new_input_op)
self._original_input_signals_indexes[sig] = input_index
# Setup input operations, starting from indexes ater input signals. # Setup input operations, starting from indexes ater input signals.
for sig_ind, op in enumerate(inputs, len(input_signals)): for input_index, input_op in enumerate(inputs, len(input_signals)):
self._input_operations.append( assert input_op not in self._added_components_mapping, "Duplicate input operations sent to SFG constructor."
self._add_component_copy_unconnected(op)) new_input_op = self._add_component_copy_unconnected(input_op)
for sig in op.output(0).signals:
self._original_input_signals_indexes[sig] = sig_ind for sig in input_op.output(0).signals:
assert sig not in self._added_components_mapping, "Duplicate input signals connected to input ports sent to SFG construcctor."
new_sig = self._add_component_copy_unconnected(sig)
new_sig.set_source(new_input_op.output(0))
self._original_input_signals_indexes[sig] = input_index
self._input_operations.append(new_input_op)
# Setup output signals. # Setup output signals.
for sig_ind, sig in enumerate(output_signals): for output_ind, sig in enumerate(output_signals):
self._output_operations.append( new_out = self._add_component_copy_unconnected(Output())
self._add_component_copy_unconnected(Output())) if sig in self._added_components_mapping:
self._original_output_signals_indexes[sig] = sig_ind # Signal already added when setting up inputs
new_sig = self._added_components_mapping[sig]
new_sig.set_destination(new_out.input(0))
else:
# New signal has to be created
new_sig = self._add_component_copy_unconnected(sig)
new_sig.set_destination(new_out.input(0))
self._output_operations.append(new_out)
self._original_output_signals_indexes[sig] = output_ind
# Setup output operations, starting from indexes after output signals. # Setup output operations, starting from indexes after output signals.
for sig_ind, op in enumerate(outputs, len(output_signals)): for output_ind, output_op in enumerate(outputs, len(output_signals)):
self._output_operations.append( assert output_op not in self._added_components_mapping, "Duplicate output operations sent to SFG constructor."
self._add_component_copy_unconnected(op))
for sig in op.input(0).signals: new_out = self._add_component_copy_unconnected(output_op)
self._original_output_signals_indexes[sig] = sig_ind for sig in output_op.input(0).signals:
if sig in self._added_components_mapping:
# Signal already added when setting up inputs
new_sig = self._added_components_mapping[sig]
new_sig.set_destination(new_out.input(0))
else:
# New signal has to be created
new_sig = self._add_component_copy_unconnected(sig)
new_sig.set_destination(new_out.input(0))
self._original_output_signals_indexes[sig] = output_ind
self._output_operations.append(new_out)
output_operations_set = set(outputs) output_operations_set = set(outputs)
# Search the graph inwards from each input signal. # Search the graph inwards from each input signal.
for sig, sig_ind in self._original_input_signals_indexes.items(): for sig, input_index in self._original_input_signals_indexes.items():
if sig.destination is None: # Check if already added destination.
raise ValueError( if self._added_components_mapping[sig].destination is None:
f"Input signal #{sig_ind} is missing destination in SFG") if sig.destination is None:
if sig.destination.operation not in self._added_components_mapping: raise ValueError(
self._copy_structure_from_operation_bfs( f"Input signal #{input_index} is missing destination in SFG")
sig.destination.operation) if sig.destination.operation not in self._added_components_mapping:
elif sig.destination.operation in output_operations_set: self._copy_structure_from_operation_bfs(
# Add direct signal between the input and output. sig.destination.operation)
new_signal = self._add_component_copy_unconnected(sig)
new_signal.set_source(
self._input_operations[sig_ind].output(0))
new_signal.set_destination(
self._output_operations[
self._original_output_signals_indexes[sig]].input(0))
# Search the graph inwards from each output signal. # Search the graph inwards from each output signal.
for sig, sig_ind in self._original_output_signals_indexes.items(): for sig, output_index in self._original_output_signals_indexes.items():
if sig.source is None: # Check if already added source.
raise ValueError( if self._added_components_mapping[sig].source is None:
f"Output signal #{sig_ind} is missing source in SFG") if sig.source is None:
if sig.source.operation not in self._added_components_mapping: raise ValueError(
self._copy_structure_from_operation_bfs(sig.source.operation) f"Output signal #{output_index} is missing source in SFG")
if sig.source.operation not in self._added_components_mapping:
self._copy_structure_from_operation_bfs(
sig.source.operation)
@property @property
def type_name(self) -> TypeName: def type_name(self) -> TypeName:
...@@ -205,13 +236,11 @@ class SFG(AbstractOperation): ...@@ -205,13 +236,11 @@ class SFG(AbstractOperation):
# Check if the signal is one of the SFG's input signals # Check if the signal is one of the SFG's input signals
if original_signal in self._original_input_signals_indexes: if original_signal in self._original_input_signals_indexes:
new_signal = self._add_component_copy_unconnected( # New signal already created during first step of constructor
original_signal) new_signal = self._added_components_mapping[
original_signal]
new_signal.set_destination( new_signal.set_destination(
new_op.input(original_input_port.index)) new_op.input(original_input_port.index))
new_signal.set_source(
self._input_operations[self._original_input_signals_indexes[
original_signal]].output(0))
# Check if the signal has not been added before # Check if the signal has not been added before
elif original_signal not in self._added_components_mapping: elif original_signal not in self._added_components_mapping:
...@@ -248,13 +277,11 @@ class SFG(AbstractOperation): ...@@ -248,13 +277,11 @@ class SFG(AbstractOperation):
# Check if the signal is one of the SFG's output signals. # Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_indexes: if original_signal in self._original_output_signals_indexes:
new_signal = self._add_component_copy_unconnected( # New signal already created during first step of constructor.
original_signal) new_signal = self._added_components_mapping[
original_signal]
new_signal.set_source( new_signal.set_source(
new_op.output(original_output_port.index)) new_op.output(original_output_port.index))
new_signal.set_destination(
self._output_operations[self._original_output_signals_indexes[
original_signal]].input(0))
# Check if signal has not been added before. # Check if signal has not been added before.
elif original_signal not in self._added_components_mapping: elif original_signal not in self._added_components_mapping:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment