Skip to content
Snippets Groups Projects
Commit a8edfcd9 authored by Oscar Gustafsson's avatar Oscar Gustafsson :bicyclist:
Browse files

Change return type for coordinates to tuples and split port methods

parent e06e5026
No related branches found
No related tags found
1 merge request!172Change return type for coordinates to tuples and split port methods
Pipeline #89109 passed
...@@ -407,7 +407,9 @@ class Operation(GraphComponent, SignalSourceProvider): ...@@ -407,7 +407,9 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod @abstractmethod
def get_plot_coordinates( def get_plot_coordinates(
self, self,
) -> Tuple[List[List[float]], List[List[float]]]: ) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
""" """
Return a tuple containing coordinates for the two polygons outlining Return a tuple containing coordinates for the two polygons outlining
the latency and execution time of the operation. the latency and execution time of the operation.
...@@ -418,11 +420,51 @@ class Operation(GraphComponent, SignalSourceProvider): ...@@ -418,11 +420,51 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod @abstractmethod
def get_io_coordinates( def get_io_coordinates(
self, self,
) -> Tuple[List[List[float]], List[List[float]]]: ) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
""" """
Return a tuple containing coordinates for inputs and outputs, respectively. Return a tuple containing coordinates for inputs and outputs, respectively.
These maps to the polygons and are corresponding to a start time of 0 These maps to the polygons and are corresponding to a start time of 0
and height 1. and height 1.
See also
========
get_input_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_input_coordinates(
self,
) -> Tuple[Tuple[float, float], ...]:
"""
Return coordinates for inputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_io_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_output_coordinates(
self,
) -> Tuple[Tuple[float, float], ...]:
"""
Return coordinates for outputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_input_coordinates
get_io_coordinates
""" """
raise NotImplementedError raise NotImplementedError
...@@ -915,7 +957,7 @@ class AbstractOperation(Operation, AbstractGraphComponent): ...@@ -915,7 +957,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
return self.output(0) return self.output(0)
@property @property
def destination(self) -> OutputPort: def destination(self) -> InputPort:
if self.input_count != 1: if self.input_count != 1:
diff = "more" if self.input_count > 1 else "less" diff = "more" if self.input_count > 1 else "less"
raise TypeError( raise TypeError(
...@@ -1044,25 +1086,29 @@ class AbstractOperation(Operation, AbstractGraphComponent): ...@@ -1044,25 +1086,29 @@ class AbstractOperation(Operation, AbstractGraphComponent):
def get_plot_coordinates( def get_plot_coordinates(
self, self,
) -> Tuple[List[List[float]], List[List[float]]]: ) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited # Doc-string inherited
return ( return (
self._get_plot_coordinates_for_latency(), self._get_plot_coordinates_for_latency(),
self._get_plot_coordinates_for_execution_time(), self._get_plot_coordinates_for_execution_time(),
) )
def _get_plot_coordinates_for_execution_time(self) -> List[List[float]]: def _get_plot_coordinates_for_execution_time(
self,
) -> Tuple[Tuple[float, float], ...]:
# Always a rectangle, but easier if coordinates are returned # Always a rectangle, but easier if coordinates are returned
execution_time = self._execution_time # Copy for type checking execution_time = self._execution_time # Copy for type checking
if execution_time is None: if execution_time is None:
return [] return tuple()
return [ return (
[0, 0], (0, 0),
[0, 1], (0, 1),
[execution_time, 1], (execution_time, 1),
[execution_time, 0], (execution_time, 0),
[0, 0], (0, 0),
] )
def _check_all_latencies_set(self): def _check_all_latencies_set(self):
if any(val is None for val in self.latency_offsets.values()): if any(val is None for val in self.latency_offsets.values()):
...@@ -1070,47 +1116,58 @@ class AbstractOperation(Operation, AbstractGraphComponent): ...@@ -1070,47 +1116,58 @@ class AbstractOperation(Operation, AbstractGraphComponent):
f"All latencies must be set: {self.latency_offsets}" f"All latencies must be set: {self.latency_offsets}"
) )
def _get_plot_coordinates_for_latency(self) -> List[List[float]]: def _get_plot_coordinates_for_latency(
self,
) -> Tuple[Tuple[float, float], ...]:
self._check_all_latencies_set() self._check_all_latencies_set()
# Points for latency polygon # Points for latency polygon
latency = [] latency = []
# Remember starting point # Remember starting point
start_point = [self.inputs[0].latency_offset, 0.0] start_point = (self.inputs[0].latency_offset, 0.0)
num_in = self.input_count num_in = self.input_count
latency.append(start_point) latency.append(start_point)
for k in range(1, num_in): for k in range(1, num_in):
latency.append([self.inputs[k - 1].latency_offset, k / num_in]) latency.append((self.inputs[k - 1].latency_offset, k / num_in))
latency.append([self.inputs[k].latency_offset, k / num_in]) latency.append((self.inputs[k].latency_offset, k / num_in))
latency.append([self.inputs[num_in - 1].latency_offset, 1]) latency.append((self.inputs[num_in - 1].latency_offset, 1))
num_out = self.output_count num_out = self.output_count
latency.append([self.outputs[num_out - 1].latency_offset, 1]) latency.append((self.outputs[num_out - 1].latency_offset, 1))
for k in reversed(range(1, num_out)): for k in reversed(range(1, num_out)):
latency.append([self.outputs[k].latency_offset, k / num_out]) latency.append((self.outputs[k].latency_offset, k / num_out))
latency.append([self.outputs[k - 1].latency_offset, k / num_out]) latency.append((self.outputs[k - 1].latency_offset, k / num_out))
latency.append([self.outputs[0].latency_offset, 0.0]) latency.append((self.outputs[0].latency_offset, 0.0))
# Close the polygon # Close the polygon
latency.append(start_point) latency.append(start_point)
return latency return tuple(latency)
def get_io_coordinates( def get_input_coordinates(self) -> Tuple[Tuple[float, float], ...]:
self, # doc-string inherited
) -> Tuple[List[List[float]], List[List[float]]]:
# Doc-string inherited
self._check_all_latencies_set() self._check_all_latencies_set()
input_coordinates = [ return tuple(
[ (
self.inputs[k].latency_offset, self.inputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.inputs)), (1 + 2 * k) / (2 * len(self.inputs)),
] )
for k in range(len(self.inputs)) for k in range(len(self.inputs))
] )
output_coordinates = [
[ def get_output_coordinates(self) -> Tuple[Tuple[float, float], ...]:
# doc-string inherited
self._check_all_latencies_set()
return tuple(
(
self.outputs[k].latency_offset, self.outputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.outputs)), (1 + 2 * k) / (2 * len(self.outputs)),
] )
for k in range(len(self.outputs)) for k in range(len(self.outputs))
] )
return input_coordinates, output_coordinates
def get_io_coordinates(
self,
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited
return self.get_input_coordinates(), self.get_output_coordinates()
...@@ -745,7 +745,7 @@ class Schedule: ...@@ -745,7 +745,7 @@ class Schedule:
for graph_id, op_start_time in self._start_times.items(): for graph_id, op_start_time in self._start_times.items():
op = self._sfg.find_by_id(graph_id) op = self._sfg.find_by_id(graph_id)
_, out_coordinates = op.get_io_coordinates() out_coordinates = op.get_output_coordinates()
source_y_pos = self._get_y_position( source_y_pos = self._get_y_position(
graph_id, operation_gap=operation_gap graph_id, operation_gap=operation_gap
) )
...@@ -759,11 +759,8 @@ class Schedule: ...@@ -759,11 +759,8 @@ class Schedule:
destination_y_pos = self._get_y_position( destination_y_pos = self._get_y_position(
destination_op.graph_id, operation_gap=operation_gap destination_op.graph_id, operation_gap=operation_gap
) )
( destination_in_coordinates = (
destination_in_coordinates, output_signal.destination.operation.get_input_coordinates()
_,
) = (
output_signal.destination.operation.get_io_coordinates()
) )
_draw_offset_arrow( _draw_offset_arrow(
out_coordinates[output_port.index], out_coordinates[output_port.index],
......
...@@ -47,10 +47,8 @@ class Signal(AbstractGraphComponent): ...@@ -47,10 +47,8 @@ class Signal(AbstractGraphComponent):
def __init__( def __init__(
self, self,
source: Optional[Union["OutputPort", "Signal", "Operation"]] = None, source: Optional["OutputPort"] = None,
destination: Optional[ destination: Optional["InputPort"] = None,
Union["InputPort", "Signal", "Operation"]
] = None,
bits: Optional[int] = None, bits: Optional[int] = None,
name: Name = Name(""), name: Name = Name(""),
): ):
...@@ -104,7 +102,10 @@ class Signal(AbstractGraphComponent): ...@@ -104,7 +102,10 @@ class Signal(AbstractGraphComponent):
If Operation, it must have a single output, otherwise a TypeError is If Operation, it must have a single output, otherwise a TypeError is
raised. That output is used to extract the OutputPort. raised. That output is used to extract the OutputPort.
""" """
if hasattr(source, "source"): # import here to avoid cyclic imports
from b_asic.operation import Operation
if isinstance(source, (Signal, Operation)):
# Signal or Operation # Signal or Operation
source = source.source source = source.source
......
...@@ -57,32 +57,36 @@ class Input(AbstractOperation): ...@@ -57,32 +57,36 @@ class Input(AbstractOperation):
def get_plot_coordinates( def get_plot_coordinates(
self, self,
) -> Tuple[List[List[float]], List[List[float]]]: ) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited # Doc-string inherited
return ( return (
[ (
[-0.5, 0], (-0.5, 0),
[-0.5, 1], (-0.5, 1),
[-0.25, 1], (-0.25, 1),
[0, 0.5], (0, 0.5),
[-0.25, 0], (-0.25, 0),
[-0.5, 0], (-0.5, 0),
], ),
[ (
[-0.5, 0], (-0.5, 0),
[-0.5, 1], (-0.5, 1),
[-0.25, 1], (-0.25, 1),
[0, 0.5], (0, 0.5),
[-0.25, 0], (-0.25, 0),
[-0.5, 0], (-0.5, 0),
], ),
) )
def get_io_coordinates( def get_input_coordinates(self) -> Tuple[Tuple[float, float], ...]:
self, # doc-string inherited
) -> Tuple[List[List[float]], List[List[float]]]: return tuple()
# Doc-string inherited
return ([], [[0, 0.5]]) def get_output_coordinates(self) -> Tuple[Tuple[float, float], ...]:
# doc-string inherited
return ((0, 0.5),)
class Output(AbstractOperation): class Output(AbstractOperation):
...@@ -119,18 +123,22 @@ class Output(AbstractOperation): ...@@ -119,18 +123,22 @@ class Output(AbstractOperation):
def get_plot_coordinates( def get_plot_coordinates(
self, self,
) -> Tuple[List[List[float]], List[List[float]]]: ) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited # Doc-string inherited
return ( return (
[[0, 0], [0, 1], [0.25, 1], [0.5, 0.5], [0.25, 0], [0, 0]], ((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
[[0, 0], [0, 1], [0.25, 1], [0.5, 0.5], [0.25, 0], [0, 0]], ((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
) )
def get_io_coordinates( def get_input_coordinates(self) -> Tuple[Tuple[float, float], ...]:
self, # doc-string inherited
) -> Tuple[List[List[float]], List[List[float]]]: return ((0, 0.5),)
# Doc-string inherited
return ([[0, 0.5]], []) def get_output_coordinates(self) -> Tuple[Tuple[float, float], ...]:
# doc-string inherited
return tuple()
class Delay(AbstractOperation): class Delay(AbstractOperation):
......
...@@ -270,8 +270,8 @@ class TestPlotCoordinates: ...@@ -270,8 +270,8 @@ class TestPlotCoordinates:
cmult.set_latency(3) cmult.set_latency(3)
lat, exe = cmult.get_plot_coordinates() lat, exe = cmult.get_plot_coordinates()
assert lat == [[0, 0], [0, 1], [3, 1], [3, 0], [0, 0]] assert lat == ((0, 0), (0, 1), (3, 1), (3, 0), (0, 0))
assert exe == [[0, 0], [0, 1], [1, 1], [1, 0], [0, 0]] assert exe == ((0, 0), (0, 1), (1, 1), (1, 0), (0, 0))
def test_complicated_case(self): def test_complicated_case(self):
bfly = Butterfly( bfly = Butterfly(
...@@ -280,18 +280,18 @@ class TestPlotCoordinates: ...@@ -280,18 +280,18 @@ class TestPlotCoordinates:
bfly.execution_time = 7 bfly.execution_time = 7
lat, exe = bfly.get_plot_coordinates() lat, exe = bfly.get_plot_coordinates()
assert lat == [ assert lat == (
[2, 0], (2, 0),
[2, 0.5], (2, 0.5),
[3, 0.5], (3, 0.5),
[3, 1], (3, 1),
[10, 1], (10, 1),
[10, 0.5], (10, 0.5),
[5, 0.5], (5, 0.5),
[5, 0], (5, 0),
[2, 0], (2, 0),
] )
assert exe == [[0, 0], [0, 1], [7, 1], [7, 0], [0, 0]] assert exe == ((0, 0), (0, 1), (7, 1), (7, 0), (0, 0))
class TestIOCoordinates: class TestIOCoordinates:
...@@ -301,8 +301,8 @@ class TestIOCoordinates: ...@@ -301,8 +301,8 @@ class TestIOCoordinates:
cmult.set_latency(3) cmult.set_latency(3)
i_c, o_c = cmult.get_io_coordinates() i_c, o_c = cmult.get_io_coordinates()
assert i_c == [[0, 0.5]] assert i_c == ((0, 0.5),)
assert o_c == [[3, 0.5]] assert o_c == ((3, 0.5),)
def test_complicated_case(self): def test_complicated_case(self):
bfly = Butterfly( bfly = Butterfly(
...@@ -311,8 +311,15 @@ class TestIOCoordinates: ...@@ -311,8 +311,15 @@ class TestIOCoordinates:
bfly.execution_time = 7 bfly.execution_time = 7
i_c, o_c = bfly.get_io_coordinates() i_c, o_c = bfly.get_io_coordinates()
assert i_c == [[2, 0.25], [3, 0.75]] assert i_c == ((2, 0.25), (3, 0.75))
assert o_c == [[5, 0.25], [10, 0.75]] assert o_c == ((5, 0.25), (10, 0.75))
def test_io_coordinates_error(self):
bfly = Butterfly()
bfly.set_latency_offsets({"in0": 3, "out1": 5})
with pytest.raises(ValueError, match="All latencies must be set:"):
bfly.get_io_coordinates()
class TestSplit: class TestSplit:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment