diff --git a/b_asic/GUI/drag_button.py b/b_asic/GUI/drag_button.py index 18308ef7aa1059c9827d5205742757fe21d946a3..7561808b597736e9a5eee9a227d06e23486c91ff 100644 --- a/b_asic/GUI/drag_button.py +++ b/b_asic/GUI/drag_button.py @@ -222,15 +222,10 @@ class DragButton(QPushButton): _signals = [] for signal, ports in self._window._arrow_ports.items(): - if any( - map( - lambda port: set(port).intersection(set(self._ports)), - ports, - ) - ): + if any(set(port).intersection(set(self._ports)) for port in ports): self._window._logger.info( - "Removed signal with name: %s to/from operation: %s." - % (signal.signal.name, self.operation.name) + f"Removed signal with name: {signal.signal.name}" + f" to/from operation: {self.operation.name}." ) _signals.append(signal) diff --git a/b_asic/GUI/main_window.py b/b_asic/GUI/main_window.py index 1a4419c1c7b0ecd52a9aa820ef5778e44488f212..40dd53101a3133b0ad774956db722645179dabeb 100644 --- a/b_asic/GUI/main_window.py +++ b/b_asic/GUI/main_window.py @@ -12,7 +12,7 @@ import webbrowser from collections import deque from collections.abc import Sequence from types import ModuleType -from typing import TYPE_CHECKING, Deque, cast +from typing import TYPE_CHECKING, cast from qtpy.QtCore import QCoreApplication, QFileInfo, QSettings, QSize, Qt, QThread, Slot from qtpy.QtGui import QCursor, QIcon, QKeySequence, QPainter @@ -81,7 +81,7 @@ class SFGMainWindow(QMainWindow): self._scene = QGraphicsScene(self._ui.splitter) self._operations_from_name: dict[str, Operation] = {} self._zoom = 1 - self._drag_operation_scenes: dict[DragButton, "QGraphicsProxyWidget"] = {} + self._drag_operation_scenes: dict[DragButton, QGraphicsProxyWidget] = {} self._drag_buttons: dict[Operation, DragButton] = {} self._mouse_pressed = False self._mouse_dragging = False @@ -121,7 +121,7 @@ class SFGMainWindow(QMainWindow): # Add operations self._max_recent_files = 4 self._recent_files_actions: list[QAction] = [] - self._recent_files_paths: Deque[str] = deque(maxlen=self._max_recent_files) + self._recent_files_paths: deque[str] = deque(maxlen=self._max_recent_files) self.add_operations_from_namespace( b_asic.core_operations, self._ui.core_operations_list @@ -488,10 +488,10 @@ class SFGMainWindow(QMainWindow): self._logger.warning("Failed to initialize SFG with empty name.") return - self._logger.info("Creating SFG with name: %s from selected operations." % name) + self._logger.info(f"Creating SFG with name: {name} from selected operations.") sfg = SFG(inputs=inputs, outputs=outputs, name=name) - self._logger.info("Created SFG with name: %s from selected operations." % name) + self._logger.info(f"Created SFG with name: {name} from selected operations.") self.update_statusbar(f"Created SFG: {name}") def check_equality(signal: Signal, signal_2: Signal) -> bool: @@ -756,7 +756,7 @@ class SFGMainWindow(QMainWindow): ) def _create_operation_item(self, item) -> None: - self._logger.info("Creating operation of type: %s" % str(item.text())) + self._logger.info(f"Creating operation of type: {str(item.text())}") try: attr_operation = self._operations_from_name[item.text()]() self.add_operation(attr_operation) @@ -840,11 +840,8 @@ class SFGMainWindow(QMainWindow): if signal.destination is destination.port ) self._logger.info( - "Connecting: %s -> %s." - % ( - source.operation.type_name(), - destination.operation.type_name(), - ) + f"Connecting: {source.operation.type_name()}" + f" -> {destination.operation.type_name()}." ) try: arrow = Arrow(source, destination, self, signal=next(signal_exists)) @@ -903,10 +900,10 @@ class SFGMainWindow(QMainWindow): def _simulate_sfg(self) -> None: """Callback for simulating SFGs in separate threads.""" - self._thread = dict() - self._sim_worker = dict() + self._thread = {} + self._sim_worker = {} for sfg, properties in self._simulation_dialog._properties.items(): - self._logger.info("Simulating SFG with name: %s" % str(sfg.name)) + self._logger.info(f"Simulating SFG with name: {str(sfg.name)}") self._sim_worker[sfg] = SimulationWorker(sfg, properties) self._thread[sfg] = QThread() self._sim_worker[sfg].moveToThread(self._thread[sfg]) diff --git a/b_asic/architecture.py b/b_asic/architecture.py index 99625a644d8551cbd637baf805846835d6b0b360..81dbed5e4a2747a70e98cc31d7f9c14231704112 100644 --- a/b_asic/architecture.py +++ b/b_asic/architecture.py @@ -8,7 +8,6 @@ from collections.abc import Iterable, Iterator from io import TextIOWrapper from itertools import chain from typing import ( - DefaultDict, Literal, cast, ) @@ -623,10 +622,10 @@ of :class:`~b_asic.architecture.ProcessingElement` ) self._memories = [memories] if isinstance(memories, Memory) else list(memories) self._direct_interconnects = direct_interconnects - self._variable_input_port_to_resource: DefaultDict[ + self._variable_input_port_to_resource: defaultdict[ InputPort, set[tuple[Resource, int]] ] = defaultdict(set) - self._variable_outport_to_resource: DefaultDict[ + self._variable_outport_to_resource: defaultdict[ OutputPort, set[tuple[Resource, int]] ] = defaultdict(set) self._operation_input_port_to_resource: dict[InputPort, Resource] = {} @@ -766,8 +765,8 @@ of :class:`~b_asic.architecture.ProcessingElement` if isinstance(mem, str): mem = cast(Memory, self.resource_from_name(mem)) - d_in: DefaultDict[Resource, int] = defaultdict(_interconnect_dict) - d_out: DefaultDict[Resource, int] = defaultdict(_interconnect_dict) + d_in: defaultdict[Resource, int] = defaultdict(_interconnect_dict) + d_out: defaultdict[Resource, int] = defaultdict(_interconnect_dict) for var in mem.collection: var = cast(MemoryVariable, var) d_in[self._operation_outport_to_resource[var.write_port]] += 1 @@ -802,10 +801,10 @@ of :class:`~b_asic.architecture.ProcessingElement` if isinstance(pe, str): pe = cast(ProcessingElement, self.resource_from_name(pe)) - d_in: list[DefaultDict[tuple[Resource, int], int]] = [ + d_in: list[defaultdict[tuple[Resource, int], int]] = [ defaultdict(_interconnect_dict) for _ in range(pe.input_count) ] - d_out: list[DefaultDict[tuple[Resource, int], int]] = [ + d_out: list[defaultdict[tuple[Resource, int], int]] = [ defaultdict(_interconnect_dict) for _ in range(pe.output_count) ] for var in pe.collection: @@ -1052,8 +1051,8 @@ of :class:`~b_asic.architecture.ProcessingElement` ) # Create list of interconnects - edges: DefaultDict[str, set[tuple[str, str]]] = defaultdict(set) - destination_edges: DefaultDict[str, set[str]] = defaultdict(set) + edges: defaultdict[str, set[tuple[str, str]]] = defaultdict(set) + destination_edges: defaultdict[str, set[str]] = defaultdict(set) for pe in self._processing_elements: inputs, outputs = self.get_interconnects_for_pe(pe) for i, inp in enumerate(inputs): diff --git a/b_asic/codegen/vhdl/__init__.py b/b_asic/codegen/vhdl/__init__.py index 203450754601e8323ff6f397247e2ae9d2cae172..f34b60d09859dee42b83b41c1b3acecce84182ff 100644 --- a/b_asic/codegen/vhdl/__init__.py +++ b/b_asic/codegen/vhdl/__init__.py @@ -2,7 +2,7 @@ Module for basic VHDL code generation. """ -from typing import List, Optional, TextIO, Tuple, Union +from typing import TextIO # VHDL code generation tab length VHDL_TAB = r" " @@ -14,7 +14,7 @@ def write( text: str, *, end: str = "\n", - start: Optional[str] = None, + start: str | None = None, ): """ Base VHDL code generation utility. @@ -42,7 +42,7 @@ def write( f.write(f"{VHDL_TAB * indent_level}{text}{end}") -def write_lines(f: TextIO, lines: List[Union[Tuple[int, str], Tuple[int, str, str]]]): +def write_lines(f: TextIO, lines: list[tuple[int, str] | tuple[int, str, str]]): """ Multiline VHDL code generation utility. diff --git a/b_asic/codegen/vhdl/architecture.py b/b_asic/codegen/vhdl/architecture.py index 524f8c9b72915a8a51292da69d1b39689b1e0957..fb548d41e1fd301aca3032faa7e9cb1497b9cc3c 100644 --- a/b_asic/codegen/vhdl/architecture.py +++ b/b_asic/codegen/vhdl/architecture.py @@ -3,7 +3,7 @@ Module for code generation of VHDL architectures. """ from math import ceil, log2 -from typing import TYPE_CHECKING, Dict, List, Optional, Set, TextIO, Tuple, cast +from typing import TYPE_CHECKING, TextIO, cast from b_asic.codegen.vhdl import common, write, write_lines from b_asic.process import MemoryVariable @@ -14,7 +14,7 @@ if TYPE_CHECKING: def memory_based_storage( f: TextIO, - assignment: List["ProcessCollection"], + assignment: list["ProcessCollection"], entity_name: str, word_length: int, read_ports: int, @@ -314,7 +314,7 @@ def memory_based_storage( write(f, 1, "--", end="\n") # Extract all the write addresses - write_list: List[Optional[Tuple[int, MemoryVariable]]] = [ + write_list: list[tuple[int, MemoryVariable] | None] = [ None for _ in range(schedule_time) ] for i, collection in enumerate(assignment): @@ -441,7 +441,7 @@ def memory_based_storage( write(f, 1, "--", end="\n") # Extract all the read addresses - read_list: List[Optional[Tuple[int, MemoryVariable]]] = [ + read_list: list[tuple[int, MemoryVariable] | None] = [ None for _ in range(schedule_time) ] for i, collection in enumerate(assignment): @@ -578,15 +578,15 @@ def register_based_storage( } # Table with mapping: register to output multiplexer index - output_mux_table: Dict[int, int] = {reg: i for i, reg in enumerate(output_regs)} + output_mux_table: dict[int, int] = {reg: i for i, reg in enumerate(output_regs)} # Back-edge register indices - back_edges: Set[Tuple[int, int]] = { + back_edges: set[tuple[int, int]] = { (frm, to) for entry in forward_backward_table for frm, to in entry.back_edge_to.items() } - back_edge_table: Dict[Tuple[int, int], int] = { + back_edge_table: dict[tuple[int, int], int] = { edge: i + 1 for i, edge in enumerate(back_edges) } diff --git a/b_asic/codegen/vhdl/common.py b/b_asic/codegen/vhdl/common.py index 198eb2d6f86093eed05c7e2a1d1ab1d00eaa9f8d..d0e7eff02d49ee7652a144657d02a8f6346f2bfd 100644 --- a/b_asic/codegen/vhdl/common.py +++ b/b_asic/codegen/vhdl/common.py @@ -5,7 +5,7 @@ Generation of common VHDL constructs import re from datetime import datetime from subprocess import PIPE, Popen -from typing import Any, Optional, Set, TextIO, Tuple +from typing import Any, TextIO from b_asic.codegen.vhdl import write, write_lines @@ -76,10 +76,10 @@ def signal_declaration( f: TextIO, name: str, signal_type: str, - default_value: Optional[str] = None, - name_pad: Optional[int] = None, - vivado_ram_style: Optional[str] = None, - quartus_ram_style: Optional[str] = None, + default_value: str | None = None, + name_pad: int | None = None, + vivado_ram_style: str | None = None, + quartus_ram_style: str | None = None, ): """ Create a VHDL signal declaration. @@ -137,8 +137,8 @@ def alias_declaration( f: TextIO, name: str, signal_type: str, - value: Optional[str] = None, - name_pad: Optional[int] = None, + value: str | None = None, + name_pad: int | None = None, ): name_pad = name_pad or 0 write(f, 1, f"alias {name:<{name_pad}} : {signal_type} is {value};") @@ -149,7 +149,7 @@ def constant_declaration( name: str, signal_type: str, value: Any, - name_pad: Optional[int] = None, + name_pad: int | None = None, ): """ Write a VHDL constant declaration with a name, a type and a value. @@ -195,7 +195,7 @@ def process_prologue( f: TextIO, sensitivity_list: str, indent: int = 1, - name: Optional[str] = None, + name: str | None = None, ): """ Write the prologue of a regular VHDL process with a user provided sensitivity list. @@ -222,9 +222,9 @@ def process_prologue( def process_epilogue( f: TextIO, - sensitivity_list: Optional[str] = None, + sensitivity_list: str | None = None, indent: int = 1, - name: Optional[str] = None, + name: str | None = None, ): """ Write the epilogue of a regular VHDL process. @@ -253,7 +253,7 @@ def synchronous_process_prologue( f: TextIO, clk: str, indent: int = 1, - name: Optional[str] = None, + name: str | None = None, ): """ Write the prologue of a regular VHDL synchronous process with a single clock object. @@ -280,9 +280,9 @@ def synchronous_process_prologue( def synchronous_process_epilogue( f: TextIO, - clk: Optional[str] = None, + clk: str | None = None, indent: int = 1, - name: Optional[str] = None, + name: str | None = None, ): """ Write the epilogue of a regular VHDL synchronous process with a single clock. @@ -311,7 +311,7 @@ def synchronous_process( clk: str, body: str, indent: int = 1, - name: Optional[str] = None, + name: str | None = None, ): """ Write a regular VHDL synchronous process with a single clock. @@ -342,9 +342,9 @@ def synchronous_process( def synchronous_memory( f: TextIO, clk: str, - read_ports: Set[Tuple[str, str, str]], - write_ports: Set[Tuple[str, str, str]], - name: Optional[str] = None, + read_ports: set[tuple[str, str, str]], + write_ports: set[tuple[str, str, str]], + name: str | None = None, ): """ Infer a VHDL synchronous reads and writes. @@ -389,9 +389,9 @@ def synchronous_memory( def asynchronous_read_memory( f: TextIO, clk: str, - read_ports: Set[Tuple[str, str, str]], - write_ports: Set[Tuple[str, str, str]], - name: Optional[str] = None, + read_ports: set[tuple[str, str, str]], + write_ports: set[tuple[str, str, str]], + name: str | None = None, ): """ Infer a VHDL memory with synchronous writes and asynchronous reads. diff --git a/b_asic/codegen/vhdl/entity.py b/b_asic/codegen/vhdl/entity.py index 9a53e4a44f4389767bdab2e5eae12650219bacdd..28a2e6cc47f3720fc91fae8ca6df50f4c37eb8d9 100644 --- a/b_asic/codegen/vhdl/entity.py +++ b/b_asic/codegen/vhdl/entity.py @@ -2,7 +2,7 @@ Module for code generation of VHDL entity declarations """ -from typing import Set, TextIO +from typing import TextIO from b_asic.codegen.vhdl import VHDL_TAB, write_lines from b_asic.port import Port @@ -63,7 +63,7 @@ def memory_based_storage( f.write(f"{2 * VHDL_TAB}{port_name} : in std_logic_vector(WL-1 downto 0);\n") # Write the output port specification - write_ports: Set[Port] = {mv.write_port for mv in collection} # type: ignore + write_ports: set[Port] = {mv.write_port for mv in collection} # type: ignore for idx, write_port in enumerate(write_ports): port_name = write_port if isinstance(write_port, int) else write_port.name port_name = "p_" + str(port_name) + "_out" diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index a879edec3c3da1f6cc732beed1dce31c3768e666..30375a2ff24a308bd20a20d77b21b308561a0c6f 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -100,7 +100,7 @@ class Constant(AbstractOperation): def get_input_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited - return tuple() + return () def get_output_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited @@ -1724,7 +1724,7 @@ class DontCare(AbstractOperation): def get_input_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited - return tuple() + return () def get_output_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited @@ -1791,4 +1791,4 @@ class Sink(AbstractOperation): def get_output_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited - return tuple() + return () diff --git a/b_asic/operation.py b/b_asic/operation.py index 2a1be12ac73d21125db07d9451f424b4c91a5681..d22caccab4d9383a88f86e70fcaff22db395e8b7 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -13,7 +13,6 @@ from numbers import Number from typing import ( TYPE_CHECKING, NewType, - Optional, cast, overload, ) @@ -28,8 +27,8 @@ if TYPE_CHECKING: ResultKey = NewType("ResultKey", str) -ResultMap = Mapping[ResultKey, Optional[Num]] -MutableResultMap = MutableMapping[ResultKey, Optional[Num]] +ResultMap = Mapping[ResultKey, Num | None] +MutableResultMap = MutableMapping[ResultKey, Num | None] DelayMap = Mapping[ResultKey, Num] MutableDelayMap = MutableMapping[ResultKey, Num] @@ -985,7 +984,7 @@ class AbstractOperation(Operation, AbstractGraphComponent): # Always a rectangle, but easier if coordinates are returned execution_time = self._execution_time # Copy for type checking if execution_time is None: - return tuple() + return () return ( (0, 0), (0, 1), diff --git a/b_asic/process.py b/b_asic/process.py index d165a13ee80c84e979baf4d46a9500637e7a7a92..be4ccbe299295564655158b42d3d72be4c553ef9 100644 --- a/b_asic/process.py +++ b/b_asic/process.py @@ -198,8 +198,8 @@ class MemoryProcess(Process): reads exists), and vice-versa for the other tuple element. """ reads = self.reads - short_reads = {k: v for k, v in filter(lambda t: t[1] <= length, reads.items())} - long_reads = {k: v for k, v in filter(lambda t: t[1] > length, reads.items())} + short_reads = dict(filter(lambda t: t[1] <= length, reads.items())) + long_reads = dict(filter(lambda t: t[1] > length, reads.items())) short_process = None long_process = None if short_reads: @@ -319,7 +319,7 @@ class MemoryVariable(MemoryProcess): return self._write_port def __repr__(self) -> str: - reads = {k: v for k, v in zip(self._read_ports, self._life_times, strict=True)} + reads = dict(zip(self._read_ports, self._life_times, strict=True)) return ( f"MemoryVariable({self.start_time}, {self.write_port}," f" {reads!r}, {self.name!r})" @@ -413,7 +413,7 @@ class PlainMemoryVariable(MemoryProcess): return self._write_port def __repr__(self) -> str: - reads = {k: v for k, v in zip(self._read_ports, self._life_times, strict=True)} + reads = dict(zip(self._read_ports, self._life_times, strict=True)) return ( f"PlainMemoryVariable({self.start_time}, {self.write_port}," f" {reads!r}, {self.name!r})" diff --git a/b_asic/resources.py b/b_asic/resources.py index 36e2cec9f0eb1142b22e5b20553ec8e385f55c88..9f4015ae670b4a8cf5faa36b1ae429b546e9b33f 100644 --- a/b_asic/resources.py +++ b/b_asic/resources.py @@ -1172,7 +1172,7 @@ class ProcessCollection: f"{process} has execution time greater than the schedule time" ) - cell_assignment: dict[int, ProcessCollection] = dict() + cell_assignment: dict[int, ProcessCollection] = {} exclusion_graph = self.create_exclusion_graph_from_execution_time() if coloring is None: coloring = nx.coloring.greedy_color( @@ -1599,7 +1599,7 @@ class ProcessCollection: def total_port_accesses(self) -> dict[int, int]: accesses = sum( ( - list(read_time % self.schedule_time for read_time in process.read_times) + [read_time % self.schedule_time for read_time in process.read_times] for process in self._collection ), [process.start_time % self.schedule_time for process in self._collection], diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 874d64946e93c7e656fc803beb1352957be9c6f8..b81879dfb271b8eae1a174318819305ddb111374 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -15,6 +15,48 @@ if TYPE_CHECKING: class Scheduler(ABC): + def __init__( + self, + input_times: dict["GraphID", int] | None = None, + output_delta_times: dict["GraphID", int] | None = None, + ): + self._logger = logger.getLogger("scheduler") + self._op_laps = {} + + if input_times is not None: + if not isinstance(input_times, dict): + raise ValueError("Provided input_times must be a dictionary.") + for key, value in input_times.items(): + if not isinstance(key, str): + raise ValueError("Provided input_times keys must be strings.") + if not isinstance(value, int): + raise ValueError("Provided input_times values must be integers.") + if any(time < 0 for time in input_times.values()): + raise ValueError("Provided input_times values must be non-negative.") + self._input_times = input_times + else: + self._input_times = {} + + if output_delta_times is not None: + if not isinstance(output_delta_times, dict): + raise ValueError("Provided output_delta_times must be a dictionary.") + for key, value in output_delta_times.items(): + if not isinstance(key, str): + raise ValueError( + "Provided output_delta_times keys must be strings." + ) + if not isinstance(value, int): + raise ValueError( + "Provided output_delta_times values must be integers." + ) + if any(time < 0 for time in output_delta_times.values()): + raise ValueError( + "Provided output_delta_times values must be non-negative." + ) + self._output_delta_times = output_delta_times + else: + self._output_delta_times = {} + @abstractmethod def apply_scheduling(self, schedule: "Schedule") -> None: """Applies the scheduling algorithm on the given Schedule. @@ -26,7 +68,17 @@ class Scheduler(ABC): """ raise NotImplementedError - def _handle_outputs( + def _place_inputs_on_given_times(self) -> None: + self._logger.debug("--- Input placement starting ---") + for input_id in self._input_times: + self._schedule.start_times[input_id] = self._input_times[input_id] + self._op_laps[input_id] = 0 + self._logger.debug( + f" {input_id} time: {self._schedule.start_times[input_id]}" + ) + self._logger.debug("--- Input placement completed ---") + + def _place_outputs_asap( self, schedule: "Schedule", non_schedulable_ops: list["GraphID"] | None = [] ) -> None: for output in schedule.sfg.find_by_type(Output): @@ -45,6 +97,72 @@ class Scheduler(ABC): source_port.operation.graph_id ] + cast(int, source_port.latency_offset) + def _place_outputs_on_given_times(self) -> None: + self._logger.debug("--- Output placement starting ---") + if self._schedule._cyclic and isinstance(self, ListScheduler): + end = self._schedule._schedule_time + else: + end = self._schedule.get_max_end_time() + for output in self._sfg.find_by_type(Output): + output = cast(Output, output) + if output.graph_id in self._output_delta_times: + delta_time = self._output_delta_times[output.graph_id] + + new_time = end + delta_time + + if ( + self._schedule._cyclic + and self._schedule._schedule_time is not None + and isinstance(self, ListScheduler) + ): + self._schedule.place_operation(output, new_time, self._op_laps) + else: + self._schedule.start_times[output.graph_id] = new_time + + count = -1 + for op_id, time in self._schedule.start_times.items(): + if time == new_time and isinstance( + self._sfg.find_by_id(op_id), Output + ): + count += 1 + + modulo_time = ( + new_time % self._schedule._schedule_time + if self._schedule._schedule_time + else new_time + ) + self._logger.debug(f" {output.graph_id} time: {modulo_time}") + self._logger.debug("--- Output placement completed ---") + + self._logger.debug("--- Output placement optimization starting ---") + min_slack = min( + self._schedule.backward_slack(op.graph_id) + for op in self._sfg.find_by_type(Output) + ) + if min_slack != 0: + for output in self._sfg.find_by_type(Output): + if self._schedule._cyclic and self._schedule._schedule_time is not None: + self._schedule.move_operation(output.graph_id, -min_slack) + else: + self._schedule.start_times[output.graph_id] = ( + self._schedule.start_times[output.graph_id] - min_slack + ) + new_time = self._schedule.start_times[output.graph_id] + if ( + not self._schedule._cyclic + and self._schedule._schedule_time is not None + and new_time > self._schedule._schedule_time + ): + raise ValueError( + f"Cannot place output {output.graph_id} at time {new_time} " + f"for scheduling time {self._schedule._schedule_time}. " + "Try to relax the scheduling time, change the output delta times or enable cyclic." + ) + self._logger.debug( + f" {output.graph_id} moved {min_slack} time steps backwards to new time {new_time}" + ) + self._logger.debug("--- Output placement optimization completed ---") + class ASAPScheduler(Scheduler): """Scheduler that implements the as-soon-as-possible (ASAP) algorithm.""" @@ -57,27 +175,26 @@ class ASAPScheduler(Scheduler): schedule : Schedule Schedule to apply the scheduling algorithm on. """ - + self._schedule = schedule + self._sfg = schedule._sfg prec_list = schedule.sfg.get_precedence_list() if len(prec_list) < 2: raise ValueError("Empty signal flow graph cannot be scheduled.") + if self._input_times: + self._place_inputs_on_given_times() + # handle the first set in precedence graph (input and delays) non_schedulable_ops = [] for outport in prec_list[0]: operation = outport.operation if operation.type_name() == Delay.type_name(): non_schedulable_ops.append(operation.graph_id) - else: + elif operation.graph_id not in self._input_times: schedule.start_times[operation.graph_id] = 0 - # handle second set in precedence graph (first operations) - for outport in prec_list[1]: - operation = outport.operation - schedule.start_times[operation.graph_id] = 0 - # handle the remaining sets - for outports in prec_list[2:]: + for outports in prec_list[1:]: for outport in outports: operation = outport.operation if operation.graph_id not in schedule.start_times: @@ -117,7 +234,9 @@ class ASAPScheduler(Scheduler): schedule.start_times[operation.graph_id] = op_start_time - self._handle_outputs(schedule, non_schedulable_ops) + self._place_outputs_asap(schedule, non_schedulable_ops) + if self._input_times: + self._place_outputs_on_given_times() schedule.remove_delays() max_end_time = schedule.get_max_end_time() @@ -141,26 +260,41 @@ class ALAPScheduler(Scheduler): schedule : Schedule Schedule to apply the scheduling algorithm on. """ - ASAPScheduler().apply_scheduling(schedule) - self.op_laps = {} + self._schedule = schedule + self._sfg = schedule._sfg + ASAPScheduler( + self._input_times, + self._output_delta_times, + ).apply_scheduling(schedule) + self._op_laps = {} + + if self._output_delta_times: + self._place_outputs_on_given_times() + + for output in schedule.sfg.find_by_type(Input): + output = cast(Output, output) + self._op_laps[output.graph_id] = 0 # move all outputs ALAP before operations for output in schedule.sfg.find_by_type(Output): output = cast(Output, output) - self.op_laps[output.graph_id] = 0 + self._op_laps[output.graph_id] = 0 + if output.graph_id in self._output_delta_times: + continue schedule.move_operation_alap(output.graph_id) # move all operations ALAP for step in reversed(schedule.sfg.get_precedence_list()): for outport in step: - if not isinstance(outport.operation, Delay): + op = outport.operation + if not isinstance(op, Delay) and op.graph_id not in self._input_times: new_unwrapped_start_time = schedule.start_times[ - outport.operation.graph_id - ] + schedule.forward_slack(outport.operation.graph_id) - self.op_laps[outport.operation.graph_id] = ( + op.graph_id + ] + schedule.forward_slack(op.graph_id) + self._op_laps[op.graph_id] = ( new_unwrapped_start_time // schedule._schedule_time ) - schedule.move_operation_alap(outport.operation.graph_id) + schedule.move_operation_alap(op.graph_id) # adjust the scheduling time if empty time slots have appeared in the start slack = min(schedule.start_times.values()) @@ -202,8 +336,8 @@ class ListScheduler(Scheduler): input_times: dict["GraphID", int] | None = None, output_delta_times: dict["GraphID", int] | None = None, ) -> None: - super() - self._logger = logger.getLogger("list_scheduler") + super().__init__(input_times, output_delta_times) + self._sort_order = sort_order if max_resources is not None: if not isinstance(max_resources, dict): @@ -233,42 +367,6 @@ class ListScheduler(Scheduler): ) self._max_concurrent_writes = max_concurrent_writes or 0 - if input_times is not None: - if not isinstance(input_times, dict): - raise ValueError("Provided input_times must be a dictionary.") - for key, value in input_times.items(): - if not isinstance(key, str): - raise ValueError("Provided input_times keys must be strings.") - if not isinstance(value, int): - raise ValueError("Provided input_times values must be integers.") - if any(time < 0 for time in input_times.values()): - raise ValueError("Provided input_times values must be non-negative.") - self._input_times = input_times - else: - self._input_times = {} - - if output_delta_times is not None: - if not isinstance(output_delta_times, dict): - raise ValueError("Provided output_delta_times must be a dictionary.") - for key, value in output_delta_times.items(): - if not isinstance(key, str): - raise ValueError( - "Provided output_delta_times keys must be strings." - ) - if not isinstance(value, int): - raise ValueError( - "Provided output_delta_times values must be integers." - ) - if any(time < 0 for time in output_delta_times.values()): - raise ValueError( - "Provided output_delta_times values must be non-negative." - ) - self._output_delta_times = output_delta_times - else: - self._output_delta_times = {} - - self._sort_order = sort_order - def apply_scheduling(self, schedule: "Schedule") -> None: """Applies the scheduling algorithm on the given Schedule. @@ -280,13 +378,22 @@ class ListScheduler(Scheduler): self._logger.debug("--- Scheduler initializing ---") self._initialize_scheduler(schedule) + if self._sfg.loops and self._schedule.cyclic: + raise ValueError( + "ListScheduler does not support cyclic scheduling of " + "recursive algorithms. Use RecursiveListScheduler instead." + ) + if self._input_times: self._place_inputs_on_given_times() + self._remaining_ops = [ + op_id for op_id in self._remaining_ops if op_id not in self._input_times + ] self._schedule_nonrecursive_ops() if self._output_delta_times: - self._handle_outputs() + self._place_outputs_on_given_times() if self._schedule._schedule_time is None: self._schedule.set_schedule_time(self._schedule.get_max_end_time()) @@ -568,10 +675,10 @@ class ListScheduler(Scheduler): alap_schedule = copy.copy(self._schedule) alap_schedule._schedule_time = None - alap_scheduler = ALAPScheduler() + alap_scheduler = ALAPScheduler(self._input_times, self._output_delta_times) alap_scheduler.apply_scheduling(alap_schedule) self._alap_start_times = alap_schedule.start_times - self._alap_op_laps = alap_scheduler.op_laps + self._alap_op_laps = alap_scheduler._op_laps self._alap_schedule_time = alap_schedule._schedule_time self._schedule.start_times = {} for key in self._schedule._laps: @@ -633,7 +740,6 @@ class ListScheduler(Scheduler): self._used_reads = {0: 0} self._current_time = 0 - self._op_laps = {} def _schedule_nonrecursive_ops(self) -> None: self._logger.debug("--- Non-Recursive Operation scheduling starting ---") @@ -691,83 +797,6 @@ class ListScheduler(Scheduler): else: self._used_reads[time] = 1 - def _place_inputs_on_given_times(self) -> None: - self._logger.debug("--- Input placement starting ---") - for input_id in self._input_times: - self._schedule.start_times[input_id] = self._input_times[input_id] - self._op_laps[input_id] = 0 - self._logger.debug( - f" {input_id} time: {self._schedule.start_times[input_id]}" - ) - self._remaining_ops = [ - op_id - for op_id in self._remaining_ops - if not isinstance(self._sfg.find_by_id(op_id), Input) - ] - self._logger.debug("--- Input placement completed ---") - - def _handle_outputs(self) -> None: - self._logger.debug("--- Output placement starting ---") - if self._schedule._cyclic: - end = self._schedule._schedule_time - else: - end = self._schedule.get_max_end_time() - for output in self._sfg.find_by_type(Output): - output = cast(Output, output) - if output.graph_id in self._output_delta_times: - delta_time = self._output_delta_times[output.graph_id] - - new_time = end + delta_time - - if self._schedule._cyclic and self._schedule._schedule_time is not None: - self._schedule.place_operation(output, new_time, self._op_laps) - else: - self._schedule.start_times[output.graph_id] = new_time - - count = -1 - for op_id, time in self._schedule.start_times.items(): - if time == new_time and isinstance( - self._sfg.find_by_id(op_id), Output - ): - count += 1 - - modulo_time = ( - new_time % self._schedule._schedule_time - if self._schedule._schedule_time - else new_time - ) - self._logger.debug(f" {output.graph_id} time: {modulo_time}") - self._logger.debug("--- Output placement completed ---") - - self._logger.debug("--- Output placement optimization starting ---") - min_slack = min( - self._schedule.backward_slack(op.graph_id) - for op in self._sfg.find_by_type(Output) - ) - if min_slack != 0: - for output in self._sfg.find_by_type(Output): - if self._schedule._cyclic and self._schedule._schedule_time is not None: - self._schedule.move_operation(output.graph_id, -min_slack) - else: - self._schedule.start_times[output.graph_id] = ( - self._schedule.start_times[output.graph_id] - min_slack - ) - new_time = self._schedule.start_times[output.graph_id] - if ( - not self._schedule._cyclic - and self._schedule._schedule_time is not None - and new_time > self._schedule._schedule_time - ): - raise ValueError( - f"Cannot place output {output.graph_id} at time {new_time} " - f"for scheduling time {self._schedule._schedule_time}. " - "Try to relax the scheduling time, change the output delta times or enable cyclic." - ) - self._logger.debug( - f" {output.graph_id} moved {min_slack} time steps backwards to new time {new_time}" - ) - self._logger.debug("--- Output placement optimization completed ---") - def _handle_dont_cares(self) -> None: # schedule all dont cares ALAP for dc_op in self._sfg.find_by_type(DontCare): @@ -802,6 +831,9 @@ class RecursiveListScheduler(ListScheduler): if self._input_times: self._place_inputs_on_given_times() + self._remaining_ops = [ + op_id for op_id in self._remaining_ops if op_id not in self._input_times + ] loops = self._sfg.loops if loops: @@ -810,7 +842,7 @@ class RecursiveListScheduler(ListScheduler): self._schedule_nonrecursive_ops() if self._output_delta_times: - self._handle_outputs() + self._place_outputs_on_given_times() if self._schedule._schedule_time is None: self._schedule.set_schedule_time(self._schedule.get_max_end_time()) diff --git a/b_asic/scheduler_gui/main_window.py b/b_asic/scheduler_gui/main_window.py index b295026828a329e8eea2bf07485280b07254fa8f..75bb33f737d8333eb93bf699cbd15b6726dda239 100644 --- a/b_asic/scheduler_gui/main_window.py +++ b/b_asic/scheduler_gui/main_window.py @@ -15,7 +15,7 @@ import webbrowser from collections import defaultdict, deque from copy import deepcopy from importlib.machinery import SourceFileLoader -from typing import TYPE_CHECKING, Deque, cast, overload +from typing import TYPE_CHECKING, cast, overload # Qt/qtpy import qtpy @@ -115,8 +115,8 @@ class ScheduleMainWindow(QMainWindow, Ui_MainWindow): _splitter_pos: int _splitter_min: int _zoom: float - _color_per_type: dict[str, QColor] = dict() - _converted_color_per_type: dict[str, str] = dict() + _color_per_type: dict[str, QColor] = {} + _converted_color_per_type: dict[str, str] = {} def __init__(self): """Initialize Scheduler-GUI.""" @@ -137,12 +137,12 @@ class ScheduleMainWindow(QMainWindow, Ui_MainWindow): self._execution_time_plot_dialogs = defaultdict(lambda: None) self._ports_accesses_for_storage = None self._color_changed_per_type = False - self._changed_operation_colors: dict[str, QColor] = dict() + self._changed_operation_colors: dict[str, QColor] = {} # Recent files self._max_recent_files = 4 self._recent_files_actions: list[QAction] = [] - self._recent_file_paths: Deque[str] = deque(maxlen=self._max_recent_files) + self._recent_file_paths: deque[str] = deque(maxlen=self._max_recent_files) self._create_recent_file_actions_and_menus() self._init_graphics() @@ -349,7 +349,7 @@ class ScheduleMainWindow(QMainWindow, Ui_MainWindow): except Exception as e: log.exception( "Exception occurred. Could not load module from file" - " '{}'.\n\n{}".format(abs_path_filename, e) + f" '{abs_path_filename}'.\n\n{e}" ) return @@ -368,15 +368,13 @@ class ScheduleMainWindow(QMainWindow, Ui_MainWindow): ), ) log.info( - "Cannot find any Schedule object in file '{}'.".format( - os.path.basename(abs_path_filename) - ) + f"Cannot find any Schedule object in file '{os.path.basename(abs_path_filename)}'." ) del module return if len(schedule_obj_list) == 1: - schedule = [val for val in schedule_obj_list.values()][0] + schedule = list(schedule_obj_list.values())[0] else: ret_tuple = QInputDialog.getItem( self, diff --git a/b_asic/scheduler_gui/scheduler_item.py b/b_asic/scheduler_gui/scheduler_item.py index 4309154597d0959033549d127bfae1ffc6ed30a6..21df470c7e8d0631ba061e99c32efab430b203cc 100644 --- a/b_asic/scheduler_gui/scheduler_item.py +++ b/b_asic/scheduler_gui/scheduler_item.py @@ -306,7 +306,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup): @property def components(self) -> list[OperationItem]: - return list(component for component in self._operation_items.values()) + return list(self._operation_items.values()) @property def event_items(self) -> list[QGraphicsItem]: diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index de2a9db8754ec57973d28a8017b430fbabd13aa8..d5ccbc045ee87169532c8ed471160a326c4af030 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -15,8 +15,6 @@ from math import ceil from numbers import Number from queue import PriorityQueue from typing import ( - DefaultDict, - Deque, Optional, Union, cast, @@ -41,7 +39,7 @@ from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName DelayQueue = list[tuple[str, ResultKey, OutputPort]] -_OPERATION_SHAPE: DefaultDict[TypeName, str] = defaultdict(lambda: "ellipse") +_OPERATION_SHAPE: defaultdict[TypeName, str] = defaultdict(lambda: "ellipse") _OPERATION_SHAPE.update( { Input.type_name(): "cds", @@ -54,7 +52,7 @@ _OPERATION_SHAPE.update( class GraphIDGenerator: """Generates Graph IDs for objects.""" - _next_id_number: DefaultDict[TypeName, GraphIDNumber] + _next_id_number: defaultdict[TypeName, GraphIDNumber] def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)): """Construct a GraphIDGenerator.""" @@ -112,7 +110,7 @@ class SFG(AbstractOperation): """ _components_by_id: dict[GraphID, GraphComponent] - _components_by_name: DefaultDict[Name, list[GraphComponent]] + _components_by_name: defaultdict[Name, list[GraphComponent]] _components_dfs_order: list[GraphComponent] _operations_dfs_order: list[Operation] _operations_topological_order: list[Operation] @@ -508,7 +506,7 @@ class SFG(AbstractOperation): input_op: index for index, input_op in enumerate(self._input_operations) } output_op = self._output_operations[output_index] - queue: Deque[Operation] = deque([output_op]) + queue: deque[Operation] = deque([output_op]) visited: set[Operation] = {output_op} while queue: op = queue.popleft() @@ -587,7 +585,8 @@ class SFG(AbstractOperation): for comp in self._components_dfs_order if isinstance(comp, component_type) ] - return sorted(list(set(components)), key=lambda c: c.name or c.graph_id) + components = list(set(components)) # ensure no redundant elements + return sorted(components, key=lambda c: c.name or c.graph_id) def find_by_id(self, graph_id: GraphID) -> GraphComponent | None: """ @@ -1115,7 +1114,7 @@ class SFG(AbstractOperation): remaining_inports_per_operation = {op: op.input_count for op in self.operations} # Maps number of input counts to a queue of seen objects with such a size. - seen_with_inputs_dict: dict[int, Deque] = defaultdict(deque) + seen_with_inputs_dict: dict[int, deque] = defaultdict(deque) seen = set() top_order = [] @@ -1873,8 +1872,6 @@ class SFG(AbstractOperation): """ Return the recursive loops found in the SFG. - If -1, the SFG does not have any loops. - Returns ------- A list of the recursive loops. @@ -1888,7 +1885,7 @@ class SFG(AbstractOperation): return [] for input in inputs_used: input_op = self._input_operations[input] - queue: Deque[Operation] = deque([input_op]) + queue: deque[Operation] = deque([input_op]) visited: set[Operation] = {input_op} dict_of_sfg = {} while queue: @@ -1958,7 +1955,7 @@ class SFG(AbstractOperation): dict_of_sfg = {} for output in output_index_used: output_op = self._output_operations[output] - queue: Deque[Operation] = deque([output_op]) + queue: deque[Operation] = deque([output_op]) visited: set[Operation] = {output_op} while queue: op = queue.popleft() @@ -1974,7 +1971,7 @@ class SFG(AbstractOperation): raise ValueError("Source does not exist") for input in input_index_used: input_op = self._input_operations[input] - queue: Deque[Operation] = deque([input_op]) + queue: deque[Operation] = deque([input_op]) visited: set[Operation] = {input_op} while queue: op = queue.popleft() diff --git a/b_asic/simulation.py b/b_asic/simulation.py index 4b3004339da76a48d38f63338b5abf9c88b9630d..80fe184b0cb2526d23d27700fcbb025264affc90 100644 --- a/b_asic/simulation.py +++ b/b_asic/simulation.py @@ -7,9 +7,6 @@ Contains a class for simulating the result of an SFG given a set of input values from collections import defaultdict from collections.abc import Callable, Mapping, MutableMapping, MutableSequence, Sequence from numbers import Number -from typing import ( - Union, -) import numpy as np @@ -20,7 +17,7 @@ from b_asic.types import Num ResultArrayMap = Mapping[ResultKey, Sequence[Num]] MutableResultArrayMap = MutableMapping[ResultKey, MutableSequence[Num]] InputFunction = Callable[[int], Num] -InputProvider = Union[Num, Sequence[Num], InputFunction] +InputProvider = Num | Sequence[Num] | InputFunction class Simulation: diff --git a/b_asic/special_operations.py b/b_asic/special_operations.py index dbffb2af8436e88f86361a3864a54d9924be8ae5..ca115908f2a9e44243b2867a3781b2cac90f7f2a 100644 --- a/b_asic/special_operations.py +++ b/b_asic/special_operations.py @@ -90,7 +90,7 @@ class Input(AbstractOperation): def get_input_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited - return tuple() + return () def get_output_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited @@ -153,7 +153,7 @@ class Output(AbstractOperation): def get_output_coordinates(self) -> tuple[tuple[float, float], ...]: # doc-string inherited - return tuple() + return () @property def latency(self) -> int: diff --git a/b_asic/types.py b/b_asic/types.py index b141be1c8fc5b61fb70fc41daa2d2124b90696af..5dfc6d9886afd8c4b45ee1bb8b44d059543e084d 100644 --- a/b_asic/types.py +++ b/b_asic/types.py @@ -1,7 +1,7 @@ -from typing import NewType, Union +from typing import NewType # https://stackoverflow.com/questions/69334475/how-to-hint-at-number-types-i-e-subclasses-of-number-not-numbers-themselv -Num = Union[int, float, complex] +Num = int | float | complex NumRuntime = (complex, float, int) diff --git a/docs_sphinx/conf.py b/docs_sphinx/conf.py index 1822a3ecd41abbddc6a8b84e5c7b5a4740dbc475..0212d854335cd858ff3276ef54d04060a3a49664 100644 --- a/docs_sphinx/conf.py +++ b/docs_sphinx/conf.py @@ -56,7 +56,7 @@ numpydoc_validation_checks = { "RT03", } -inheritance_node_attrs = dict(fontsize=16) +inheritance_node_attrs = {"fontsize": 16} graphviz_dot = shutil.which("dot") diff --git a/examples/auto_scheduling_with_custom_io_times.py b/examples/auto_scheduling_with_custom_io_times.py index 3e70b0524677156fce66106430c1d2a0d470bfeb..381d982d1ed7c19e6fb3510d21a888a856f77f67 100644 --- a/examples/auto_scheduling_with_custom_io_times.py +++ b/examples/auto_scheduling_with_custom_io_times.py @@ -8,10 +8,13 @@ It is possible to specify the IO times and provide those to the scheduling. from b_asic.core_operations import Butterfly, ConstantMultiplication from b_asic.list_schedulers import HybridScheduler +from b_asic.logger import getLogger from b_asic.schedule import Schedule -from b_asic.scheduler import ASAPScheduler +from b_asic.scheduler import ALAPScheduler, ASAPScheduler from b_asic.sfg_generators import radix_2_dif_fft +getLogger("list_scheduler", console_log_level="debug") + points = 8 sfg = radix_2_dif_fft(points=points) @@ -27,16 +30,21 @@ sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1) sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1) # %% -# Generate an ASAP schedule for reference. -schedule1 = Schedule(sfg, scheduler=ASAPScheduler()) +# Generate an ASAP schedule for reference with custom IO times. +input_times = {f"in{i}": i for i in range(points)} +output_delta_times = {f"out{i}": i for i in range(points)} +schedule1 = Schedule(sfg, scheduler=ASAPScheduler(input_times, output_delta_times)) schedule1.show() +# %% +# Generate an ALAP schedule for reference with custom IO times.. +schedule_t = Schedule(sfg, scheduler=ALAPScheduler(input_times, output_delta_times)) +schedule_t.show() + # %% # Generate a non-cyclic Schedule from HybridScheduler with custom IO times, -# one input and output per time unit +# one input and output per time unit and one butterfly/multiplication per time unit. resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1} -input_times = {f"in{i}": i for i in range(points)} -output_delta_times = {f"out{i}": i for i in range(points)} schedule2 = Schedule( sfg, scheduler=HybridScheduler( diff --git a/pyproject.toml b/pyproject.toml index c1ab97e23f9366426b5fc9b4e52e4c06990fed52..9d44ead1b48a0697f21d7dd3eaffa048944797d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,8 +100,8 @@ precision = 2 exclude = ["examples"] [tool.ruff.lint] -select = ["E4", "E7", "E9", "F", "SIM", "B"] -ignore = ["F403", "B008", "B021", "B006"] +select = ["E4", "E7", "E9", "F", "SIM", "B", "NPY", "C4", "UP"] +ignore = ["F403", "B008", "B021", "B006", "UP038"] [tool.typos] default.extend-identifiers = { ba = "ba", addd0 = "addd0", inout = "inout", ArChItEctUrE = "ArChItEctUrE" } diff --git a/test/fixtures/signal_flow_graph.py b/test/fixtures/signal_flow_graph.py index df3e50b02cf193da1344d02a99d832d6e39cc46b..7764e47f43dc1e9f3c760dc4ffa4ed918d0ed347 100644 --- a/test/fixtures/signal_flow_graph.py +++ b/test/fixtures/signal_flow_graph.py @@ -1,5 +1,3 @@ -from typing import Optional - import pytest from b_asic import ( @@ -195,9 +193,7 @@ def sfg_custom_operation(): """A valid SFG containing a custom operation.""" class CustomOperation(AbstractOperation): - def __init__( - self, src0: Optional[SignalSourceProvider] = None, name: Name = "" - ): + def __init__(self, src0: SignalSourceProvider | None = None, name: Name = ""): super().__init__( input_count=1, output_count=2, name=name, input_sources=[src0] ) diff --git a/test/unit/test_architecture.py b/test/unit/test_architecture.py index 6ceabe631a215578926048dc5b722a8addadfacc..e611c0481f16781db702666a9ab5eb706a07a30e 100644 --- a/test/unit/test_architecture.py +++ b/test/unit/test_architecture.py @@ -1,6 +1,5 @@ import re from itertools import chain -from typing import List import pytest @@ -89,7 +88,7 @@ def test_architecture(schedule_direct_form_iir_lp_filter: Schedule): assert multiplier.entity_name == "multiplier" input_pe = ProcessingElement(inputs[0], entity_name="input") output_pe = ProcessingElement(outputs[0], entity_name="output") - processing_elements: List[ProcessingElement] = [ + processing_elements: list[ProcessingElement] = [ adder, multiplier, input_pe, @@ -111,7 +110,7 @@ def test_architecture(schedule_direct_form_iir_lp_filter: Schedule): direct_conn, mvs = mvs.split_on_length() # Create Memories from the memory variables - memories: List[Memory] = [ + memories: list[Memory] = [ Memory(pc) for pc in mvs.split_on_ports(read_ports=1, write_ports=1) ] assert len(memories) == 1 @@ -198,7 +197,7 @@ def test_move_process(schedule_direct_form_iir_lp_filter: Schedule): outputs = operations.get_by_type_name(Output.type_name()).split_on_execution_time() # Create necessary processing elements - processing_elements: List[ProcessingElement] = [ + processing_elements: list[ProcessingElement] = [ ProcessingElement(operation, entity_name=f"pe{i}") for i, operation in enumerate(chain(adders1, adders2, const_mults)) ] @@ -211,7 +210,7 @@ def test_move_process(schedule_direct_form_iir_lp_filter: Schedule): direct_conn, mvs = mvs.split_on_length() # Create Memories from the memory variables (split on length to get two memories) - memories: List[Memory] = [Memory(pc) for pc in mvs.split_on_length(6)] + memories: list[Memory] = [Memory(pc) for pc in mvs.split_on_length(6)] # Create architecture architecture = Architecture( diff --git a/test/unit/test_list_schedulers.py b/test/unit/test_list_schedulers.py index 52d29a130c9dad0dff2c6c3ec2ea6784502640ed..edff8a5da4d874f608872084130cb0ceccfb553c 100644 --- a/test/unit/test_list_schedulers.py +++ b/test/unit/test_list_schedulers.py @@ -882,7 +882,7 @@ class TestHybridScheduler: assert schedule.schedule_time == 16 # validate regenerated sfg with random 2x2 real s.p.d. matrix - A = np.random.rand(2, 2) + A = np.random.default_rng().random((2, 2)) A = np.dot(A, A.T) A_inv = np.linalg.inv(A) input_signals = [] @@ -1202,7 +1202,7 @@ class TestHybridScheduler: for i in range(POINTS): assert schedule.start_times[f"in{i}"] == i - assert schedule.start_times[f"out{i}"] == 95 + i + assert schedule.start_times[f"out{i}"] == 81 + i # too slow for pipeline timeout # def test_64_point_fft_custom_io_times(self): @@ -1258,12 +1258,7 @@ class TestHybridScheduler: for i in range(POINTS): assert schedule.start_times[f"in{i}"] == i - if i == 0: - expected_value = 95 - elif i == 1: - expected_value = 96 - else: - expected_value = i - 1 + expected_value = ((81 + i - 1) % 96) + 1 assert schedule.start_times[f"out{i}"] == expected_value def test_cyclic_scheduling(self): @@ -1663,7 +1658,7 @@ class TestHybridScheduler: "rec2": 36, } - assert all([val == 0 for val in schedule.laps.values()]) + assert all(val == 0 for val in schedule.laps.values()) _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 3) def test_latency_offsets_cyclic(self): @@ -1789,6 +1784,38 @@ class TestListScheduler: ), ) + def test_cyclic_and_recursive_loops(self): + N = 3 + Wc = 0.2 + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + sfg = direct_form_1_iir(b, a) + + sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2) + sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1) + sfg.set_latency_of_type_name(Addition.type_name(), 3) + sfg.set_execution_time_of_type_name(Addition.type_name(), 1) + + resources = { + Addition.type_name(): 1, + ConstantMultiplication.type_name(): 1, + Input.type_name(): 1, + Output.type_name(): 1, + } + + with pytest.raises( + ValueError, + match="ListScheduler does not support cyclic scheduling of recursive algorithms. Use RecursiveListScheduler instead.", + ): + Schedule( + sfg, + scheduler=ListScheduler( + sort_order=((1, True), (3, False), (4, False)), + max_resources=resources, + ), + cyclic=True, + schedule_time=sfg.iteration_period_bound(), + ) + class TestRecursiveListScheduler: def test_empty_sfg(self, sfg_empty): @@ -1985,7 +2012,7 @@ def _validate_recreated_sfg_ldlt_matrix_inverse( delays = [0 for i in range(num_of_outputs)] # random real s.p.d matrix - A = np.random.rand(N, N) + A = np.random.default_rng().random((N, N)) A = np.dot(A, A.T) # iterate through the upper diagonal and construct the input to the SFG diff --git a/test/unit/test_schedule.py b/test/unit/test_schedule.py index b23c76a0865deb3e802698d77963d0792ce1ac29..5795dc64939a2291fe07df2fa927572c83d865ba 100644 --- a/test/unit/test_schedule.py +++ b/test/unit/test_schedule.py @@ -788,7 +788,7 @@ class TestErrors: def test_no_latency(self, sfg_simple_filter): with pytest.raises( ValueError, - match="Input port 0 of operation add0 has no latency-offset.", + match="Input port 0 of operation cmul0 has no latency-offset.", ): Schedule(sfg_simple_filter, scheduler=ASAPScheduler()) diff --git a/test/unit/test_sfg.py b/test/unit/test_sfg.py index 7cd54fc40a7e469605b8a95ebcdc5d3db822ed50..c5691aaf9155423c384d09cf59b02e5a7f6f1664 100644 --- a/test/unit/test_sfg.py +++ b/test/unit/test_sfg.py @@ -4,8 +4,8 @@ import random import re import string import sys +from collections import Counter from os import path, remove -from typing import Counter, Dict, Type import numpy as np import pytest @@ -359,8 +359,8 @@ class TestInsertComponent: _sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id) assert _sfg.evaluate() != sfg.evaluate() - assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations]) - assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations]) + assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations) + assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations) assert not isinstance( sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, @@ -1514,7 +1514,7 @@ class TestCriticalPath: class TestUnfold: - def count_kinds(self, sfg: SFG) -> Dict[Type, int]: + def count_kinds(self, sfg: SFG) -> dict[type, int]: return Counter([type(op) for op in sfg.operations]) # Checks that the number of each kind of operation in sfg2 is multiple*count @@ -1664,8 +1664,8 @@ class TestInsertComponentAfter: ) assert _sfg.evaluate() != sfg.evaluate() - assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations]) - assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations]) + assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations) + assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations) assert not isinstance( sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, @@ -1716,8 +1716,8 @@ class TestInsertComponentBefore: ) assert _sfg.evaluate() != sfg.evaluate() - assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations]) - assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations]) + assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations) + assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations) assert not isinstance( sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation, diff --git a/test/unit/test_sfg_generators.py b/test/unit/test_sfg_generators.py index a51224bd121887976bc3879b4316c757ee999036..569528b88c0875209d6208aa3b3fb14d3851e1d9 100644 --- a/test/unit/test_sfg_generators.py +++ b/test/unit/test_sfg_generators.py @@ -293,12 +293,12 @@ class TestDirectFormIIRType1: with pytest.raises( ValueError, match="Size of coefficient lists a and b are not the same." ): - direct_form_1_iir([i for i in range(10)], [i for i in range(11)]) + direct_form_1_iir(list(range(10)), list(range(11))) with pytest.raises( ValueError, match="Size of coefficient lists a and b are not the same." ): - direct_form_1_iir([i for i in range(10)], [i for i in range(11)]) + direct_form_1_iir(list(range(10)), list(range(11))) def test_a0_not_1(self): with pytest.raises(ValueError, match=r"The value of a\[0] must be 1\."): @@ -310,7 +310,7 @@ class TestDirectFormIIRType1: b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_1_iir(b, a, name="test iir direct form 1") @@ -326,7 +326,7 @@ class TestDirectFormIIRType1: b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_1_iir(b, a, name="test iir direct form 1") @@ -343,7 +343,7 @@ class TestDirectFormIIRType1: b, a = signal.ellip(N, 0.1, 60, Wc, btype="low", analog=False) b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_1_iir(b, a, name="test iir direct form 1") @@ -430,7 +430,7 @@ class TestDirectFormIIRType2: b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_2_iir(b, a, name="test iir direct form 1") @@ -446,7 +446,7 @@ class TestDirectFormIIRType2: b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_2_iir(b, a, name="test iir direct form 1") @@ -463,7 +463,7 @@ class TestDirectFormIIRType2: b, a = signal.ellip(N, 0.1, 60, Wc, btype="low", analog=False) b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - input_signal = np.random.randn(100) + input_signal = np.random.default_rng().standard_normal(100) reference_filter_output = signal.lfilter(b, a, input_signal) sfg = direct_form_2_iir(b, a, name="test iir direct form 1") @@ -804,7 +804,7 @@ class TestLdltMatrixInverse: # assert np.isclose(actual_values[i], expected_values[i]) def _generate_random_spd_matrix(self, N: int) -> np.ndarray: - A = np.random.rand(N, N) + A = np.random.default_rng().random((N, N)) A = (A + A.T) / 2 # ensure symmetric min_eig = np.min(np.linalg.eigvals(A)) A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness