Skip to content
Snippets Groups Projects
Commit 81491e13 authored by Oscar Gustafsson's avatar Oscar Gustafsson :bicyclist:
Browse files

Change return type for coordinates to tuples and split port methods

parent e06e5026
No related branches found
No related tags found
1 merge request!172Change return type for coordinates to tuples and split port methods
Pipeline #89091 passed
......@@ -407,7 +407,7 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
"""
Return a tuple containing coordinates for the two polygons outlining
the latency and execution time of the operation.
......@@ -418,11 +418,49 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
"""
Return a tuple containing coordinates for inputs and outputs, respectively.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_input_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_input_coordinates(
self,
) -> Tuple[Tuple[float]]:
"""
Return coordinates for inputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_io_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_output_coordinates(
self,
) -> Tuple[Tuple[float]]:
"""
Return coordinates for outputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_input_coordinates
get_io_coordinates
"""
raise NotImplementedError
......@@ -1044,25 +1082,25 @@ class AbstractOperation(Operation, AbstractGraphComponent):
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
# Doc-string inherited
return (
self._get_plot_coordinates_for_latency(),
self._get_plot_coordinates_for_execution_time(),
)
def _get_plot_coordinates_for_execution_time(self) -> List[List[float]]:
def _get_plot_coordinates_for_execution_time(self) -> Tuple[Tuple[float]]:
# Always a rectangle, but easier if coordinates are returned
execution_time = self._execution_time # Copy for type checking
if execution_time is None:
return []
return [
[0, 0],
[0, 1],
[execution_time, 1],
[execution_time, 0],
[0, 0],
]
return tuple()
return (
(0, 0),
(0, 1),
(execution_time, 1),
(execution_time, 0),
(0, 0),
)
def _check_all_latencies_set(self):
if any(val is None for val in self.latency_offsets.values()):
......@@ -1070,47 +1108,54 @@ class AbstractOperation(Operation, AbstractGraphComponent):
f"All latencies must be set: {self.latency_offsets}"
)
def _get_plot_coordinates_for_latency(self) -> List[List[float]]:
def _get_plot_coordinates_for_latency(self) -> Tuple[Tuple[float]]:
self._check_all_latencies_set()
# Points for latency polygon
latency = []
# Remember starting point
start_point = [self.inputs[0].latency_offset, 0.0]
start_point = (self.inputs[0].latency_offset, 0.0)
num_in = self.input_count
latency.append(start_point)
for k in range(1, num_in):
latency.append([self.inputs[k - 1].latency_offset, k / num_in])
latency.append([self.inputs[k].latency_offset, k / num_in])
latency.append([self.inputs[num_in - 1].latency_offset, 1])
latency.append((self.inputs[k - 1].latency_offset, k / num_in))
latency.append((self.inputs[k].latency_offset, k / num_in))
latency.append((self.inputs[num_in - 1].latency_offset, 1))
num_out = self.output_count
latency.append([self.outputs[num_out - 1].latency_offset, 1])
latency.append((self.outputs[num_out - 1].latency_offset, 1))
for k in reversed(range(1, num_out)):
latency.append([self.outputs[k].latency_offset, k / num_out])
latency.append([self.outputs[k - 1].latency_offset, k / num_out])
latency.append([self.outputs[0].latency_offset, 0.0])
latency.append((self.outputs[k].latency_offset, k / num_out))
latency.append((self.outputs[k - 1].latency_offset, k / num_out))
latency.append((self.outputs[0].latency_offset, 0.0))
# Close the polygon
latency.append(start_point)
return latency
return tuple(latency)
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
# Doc-string inherited
def get_input_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
self._check_all_latencies_set()
input_coordinates = [
[
return tuple(
(
self.inputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.inputs)),
]
)
for k in range(len(self.inputs))
]
output_coordinates = [
[
)
def get_output_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
self._check_all_latencies_set()
return tuple(
(
self.outputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.outputs)),
]
)
for k in range(len(self.outputs))
]
return input_coordinates, output_coordinates
)
def get_io_coordinates(
self,
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
# Doc-string inherited
return self.get_input_coordinates(), self.get_output_coordinates()
......@@ -745,7 +745,7 @@ class Schedule:
for graph_id, op_start_time in self._start_times.items():
op = self._sfg.find_by_id(graph_id)
_, out_coordinates = op.get_io_coordinates()
out_coordinates = op.get_output_coordinates()
source_y_pos = self._get_y_position(
graph_id, operation_gap=operation_gap
)
......@@ -759,11 +759,8 @@ class Schedule:
destination_y_pos = self._get_y_position(
destination_op.graph_id, operation_gap=operation_gap
)
(
destination_in_coordinates,
_,
) = (
output_signal.destination.operation.get_io_coordinates()
destination_in_coordinates = (
output_signal.destination.operation.get_input_coordinates()
)
_draw_offset_arrow(
out_coordinates[output_port.index],
......
......@@ -57,32 +57,34 @@ class Input(AbstractOperation):
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
# Doc-string inherited
return (
[
[-0.5, 0],
[-0.5, 1],
[-0.25, 1],
[0, 0.5],
[-0.25, 0],
[-0.5, 0],
],
[
[-0.5, 0],
[-0.5, 1],
[-0.25, 1],
[0, 0.5],
[-0.25, 0],
[-0.5, 0],
],
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
)
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
# Doc-string inherited
return ([], [[0, 0.5]])
def get_input_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
return tuple()
def get_output_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
return ((0, 0.5),)
class Output(AbstractOperation):
......@@ -119,18 +121,20 @@ class Output(AbstractOperation):
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[Tuple[Tuple[float]], Tuple[Tuple[float]]]:
# Doc-string inherited
return (
[[0, 0], [0, 1], [0.25, 1], [0.5, 0.5], [0.25, 0], [0, 0]],
[[0, 0], [0, 1], [0.25, 1], [0.5, 0.5], [0.25, 0], [0, 0]],
((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
)
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
# Doc-string inherited
return ([[0, 0.5]], [])
def get_input_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
return ((0, 0.5),)
def get_output_coordinates(self) -> Tuple[Tuple[float]]:
# doc-string inherited
return tuple()
class Delay(AbstractOperation):
......
......@@ -270,8 +270,8 @@ class TestPlotCoordinates:
cmult.set_latency(3)
lat, exe = cmult.get_plot_coordinates()
assert lat == [[0, 0], [0, 1], [3, 1], [3, 0], [0, 0]]
assert exe == [[0, 0], [0, 1], [1, 1], [1, 0], [0, 0]]
assert lat == ((0, 0), (0, 1), (3, 1), (3, 0), (0, 0))
assert exe == ((0, 0), (0, 1), (1, 1), (1, 0), (0, 0))
def test_complicated_case(self):
bfly = Butterfly(
......@@ -280,18 +280,18 @@ class TestPlotCoordinates:
bfly.execution_time = 7
lat, exe = bfly.get_plot_coordinates()
assert lat == [
[2, 0],
[2, 0.5],
[3, 0.5],
[3, 1],
[10, 1],
[10, 0.5],
[5, 0.5],
[5, 0],
[2, 0],
]
assert exe == [[0, 0], [0, 1], [7, 1], [7, 0], [0, 0]]
assert lat == (
(2, 0),
(2, 0.5),
(3, 0.5),
(3, 1),
(10, 1),
(10, 0.5),
(5, 0.5),
(5, 0),
(2, 0),
)
assert exe == ((0, 0), (0, 1), (7, 1), (7, 0), (0, 0))
class TestIOCoordinates:
......@@ -301,8 +301,8 @@ class TestIOCoordinates:
cmult.set_latency(3)
i_c, o_c = cmult.get_io_coordinates()
assert i_c == [[0, 0.5]]
assert o_c == [[3, 0.5]]
assert i_c == ((0, 0.5),)
assert o_c == ((3, 0.5),)
def test_complicated_case(self):
bfly = Butterfly(
......@@ -311,8 +311,15 @@ class TestIOCoordinates:
bfly.execution_time = 7
i_c, o_c = bfly.get_io_coordinates()
assert i_c == [[2, 0.25], [3, 0.75]]
assert o_c == [[5, 0.25], [10, 0.75]]
assert i_c == ((2, 0.25), (3, 0.75))
assert o_c == ((5, 0.25), (10, 0.75))
def test_io_coordinates_error(self):
bfly = Butterfly()
bfly.set_latency_offsets({"in0": 3, "out1": 5})
with pytest.raises(ValueError, match="All latencies must be set:"):
bfly.get_io_coordinates()
class TestSplit:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment