-
Mikael Henriksson authoredMikael Henriksson authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
resources.py 54.49 KiB
import io
import re
from collections import Counter
from functools import reduce
from typing import Dict, Iterable, List, Optional, Tuple, TypeVar, Union
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator
from b_asic._preferences import LATENCY_COLOR, WARNING_COLOR
from b_asic.codegen.vhdl.common import is_valid_vhdl_identifier
from b_asic.process import (
MemoryProcess,
MemoryVariable,
OperatorProcess,
PlainMemoryVariable,
Process,
)
from b_asic.types import TypeName
# B-ASIC preference colors rescaled by 1/255 into float tuples; these are the
# default colors handed to Matplotlib by the plotting code in this module.
_LATENCY_COLOR = tuple(channel / 255 for channel in LATENCY_COLOR)
_WARNING_COLOR = tuple(channel / 255 for channel in WARNING_COLOR)
#
# Human-intuitive sorting:
# https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
#
# Typing '_T' to help Pyright propagate type-information
#
_T = TypeVar('_T')
def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
"""Sort the given iterable in the way that humans expect."""
def convert(text):
return int(text) if text.isdigit() else text
def alphanum_key(key):
return [convert(c) for c in re.split('([0-9]+)', str(key))]
return sorted(to_be_sorted, key=alphanum_key)
def _sanitize_port_option(
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> Tuple[int, int, int]:
"""
General port sanitization function used to test if a port specification makes sense.
Raises ValueError if the port specification is in-proper.
Parameters
----------
read_ports : int, optional
The number of read ports.
write_ports : int, optional
The number of write ports.
total_ports : int, optional
The total number of ports
Returns
-------
Returns a triple int tuple (read_ports, write_ports, total_ports) equal to the
input, or sanitized if one of the input equals None. If total_ports is set to None
at the input, it is set to read_ports+write_ports at the output. If read_ports or
write_ports is set to None at the input, it is set to total_ports at the output.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError(
"If total_ports is unset, both read_ports and write_ports"
" must be provided."
)
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
if total_ports < read_ports:
raise ValueError(
f'Total ports ({total_ports}) less then read ports ({read_ports})'
)
if total_ports < write_ports:
raise ValueError(
f'Total ports ({total_ports}) less then write ports ({write_ports})'
)
return read_ports, write_ports, total_ports
def draw_exclusion_graph_coloring(
    exclusion_graph: nx.Graph,
    color_dict: Dict[Process, int],
    ax: Optional[Axes] = None,
    color_list: Optional[Union[List[str], List[Tuple[float, float, float]]]] = None,
) -> None:
    """
    Draw an exclusion graph with its nodes painted according to a graph coloring.

    Example usage:

    .. code-block:: python

        import networkx as nx
        import matplotlib.pyplot as plt

        _, ax = plt.subplots()
        collection = ProcessCollection(...)
        exclusion_graph = collection.create_exclusion_graph_from_ports(
            read_ports = 1,
            write_ports = 1,
            total_ports = 2,
        )
        coloring = nx.greedy_color(exclusion_graph)
        draw_exclusion_graph_coloring(exclusion_graph, coloring, ax=ax)
        plt.show()

    Parameters
    ----------
    exclusion_graph : :class:`networkx.Graph`
        The exclusion graph to draw.
    color_dict : dict
        Mapping from :class:`~b_asic.process.Process` to an integer color, such as
        the one returned by :func:`networkx.algorithms.coloring.greedy_color`. Each
        integer is used as an index into the color palette.
    ax : :class:`matplotlib.axes.Axes`, optional
        The Matplotlib :class:`~matplotlib.axes.Axes` to draw onto; forwarded to
        :func:`networkx.draw_networkx`.
    color_list : iterable of color, optional
        Palette of colors in Matplotlib format. When omitted, a built-in palette of
        17 colors is used.

    Returns
    -------
    None
    """
    COLOR_LIST = [
        '#aa0000',
        '#00aa00',
        '#0000ff',
        '#ff00aa',
        '#ffaa00',
        '#ffffff',
        '#00ffaa',
        '#aaff00',
        '#aa00ff',
        '#00aaff',
        '#ff0000',
        '#00ff00',
        '#0000aa',
        '#aaaa00',
        '#aa00aa',
        '#00aaaa',
        '#666666',
    ]
    palette = COLOR_LIST if color_list is None else color_list
    node_color_dict = {
        process: palette[color] for process, color in color_dict.items()
    }
    nx.draw_networkx(
        exclusion_graph,
        # Colors must be listed in the graph's own node iteration order.
        node_color=[node_color_dict[node] for node in exclusion_graph],
        ax=ax,
        # Fixed seed keeps the layout reproducible between calls.
        pos=nx.spring_layout(exclusion_graph, seed=1),
    )
class _ForwardBackwardEntry:
def __init__(
self,
inputs: Optional[List[Process]] = None,
outputs: Optional[List[Process]] = None,
regs: Optional[List[Optional[Process]]] = None,
back_edge_to: Optional[Dict[int, int]] = None,
back_edge_from: Optional[Dict[int, int]] = None,
outputs_from: Optional[int] = None,
):
"""
Single entry in a _ForwardBackwardTable.
Aggregate type of input, output and list of registers.
Parameters
----------
inputs : list of Process, optional
input
outputs : list of Process, optional
output
regs : list of Process, optional
regs
back_edge_to : dict, optional
Dictionary containing back edges of this entry to registers in the next
entry.
back_edge_from : dict, optional
Dictionary containing the back edge of the previous entry to registers in
this entry.
outputs_from : int, optional
"""
self.inputs: List[Process] = [] if inputs is None else inputs
self.outputs: List[Process] = [] if outputs is None else outputs
self.regs: List[Optional[Process]] = [] if regs is None else regs
self.back_edge_to: Dict[int, int] = {} if back_edge_to is None else back_edge_to
self.back_edge_from: Dict[int, int] = (
{} if back_edge_from is None else back_edge_from
)
self.outputs_from = outputs_from
class _ForwardBackwardTable:
    def __init__(self, collection: 'ProcessCollection'):
        """
        Forward-Backward allocation table for ProcessCollections.

        This structure implements the forward-backward register allocation algorithm,
        which is used to generate hardware from MemoryVariables in a ProcessCollection.
        The table has one row (entry) per time slot of the schedule and one register
        column per simultaneously live variable.

        Parameters
        ----------
        collection : ProcessCollection
            ProcessCollection to apply forward-backward allocation on

        Raises
        ------
        ValueError
            If a variable cannot be forward or backward allocated.
        """
        schedule_time = collection.schedule_time

        # Count the number of live variables in every time slot
        self._collection = set(collection.collection)
        self._live_variables: List[int] = [0] * schedule_time
        for mv in self._collection:
            stop_time = mv.start_time + mv.execution_time
            for alive_time in range(mv.start_time, stop_time):
                self._live_variables[alive_time % schedule_time] += 1

        # Create an empty forward-backward table with the right dimensions: the
        # register count equals the peak number of simultaneously live variables.
        register_count = max(self._live_variables)
        self.table: List[_ForwardBackwardEntry] = [
            _ForwardBackwardEntry(regs=[None] * register_count)
            for _ in range(schedule_time)
        ]

        # Insert all processes (one per time-slot) to the table input
        # TODO: "Input each variable at the time step corresponding to the beginning of
        #        its lifetime. If multiple variables are input in a given cycle, these
        #        are allocated to multiple registers such that the variable with the
        #        longest lifetime is allocated to the initial register and the other
        #        variables are allocated to consecutive registers in decreasing order
        #        of lifetime." -- K. Parhi
        for mv in collection:
            # Wrap the start time like every other time computation in this class,
            # so that a start time beyond the schedule time lands in a valid row.
            input_slot = mv.start_time % schedule_time
            self.table[input_slot].inputs.append(mv)
            if mv.execution_time:
                self.table[(mv.start_time + 1) % schedule_time].regs[0] = mv
            else:
                # Zero lifetime: the variable is output in the slot it is input,
                # without passing through any register (marked with -1).
                self.table[input_slot].outputs.append(mv)
                self.table[input_slot].outputs_from = -1

        # Forward-backward allocation: alternate between the two phases until every
        # variable has reached an output.
        forward = True
        while not self._forward_backward_is_complete():
            if forward:
                self._do_forward_allocation()
            else:
                self._do_single_backward_allocation()
            forward = not forward

    def _forward_backward_is_complete(self) -> bool:
        """Return True once every variable is listed in some entry's outputs."""
        allocated = {proc for entry in self.table for proc in entry.outputs}
        return self._collection <= allocated

    def _do_forward_allocation(self):
        """
        Forward all Processes as far as possible in the register chain.

        Processes are forwarded until they reach their end time (at which they are
        added to the output list), or until they reach the end of the register chain.

        Raises
        ------
        ValueError
            If the target register already holds a different Process.
        """
        rows = len(self.table)
        cols = len(self.table[0].regs)
        # Note that two passes of the forward allocation need to be done, since
        # variables may loop around the schedule cycle boundary.
        for _ in range(2):
            for time, entry in enumerate(self.table):
                for reg_idx, reg in enumerate(entry.regs):
                    if reg is None:
                        continue
                    reg_end_time = (reg.start_time + reg.execution_time) % rows
                    if reg_end_time == time:
                        if reg not in entry.outputs:
                            entry.outputs.append(reg)
                            entry.outputs_from = reg_idx
                    elif reg_idx != cols - 1:
                        next_row = (time + 1) % rows
                        next_col = reg_idx + 1
                        cell = self.table[next_row].regs[next_col]
                        if cell not in (None, reg):
                            raise ValueError(
                                f'Can\'t forward allocate {reg} in row={time},'
                                f' col={reg_idx} to next_row={next_row},'
                                f' next_col={next_col} (cell contains: {cell})'
                            )
                        self.table[next_row].regs[next_col] = reg

    def _do_single_backward_allocation(self):
        """
        Perform backward allocation of Processes in the allocation table.

        Exactly one variable is moved per call; the back edge is recorded in both
        the source entry and the destination entry.

        Raises
        ------
        ValueError
            If no variable can be backward allocated.
        """
        rows = len(self.table)
        cols = len(self.table[0].regs)
        outputs = {out for entry in self.table for out in entry.outputs}

        #
        # Pass #1: Find any (one) non-dead variable from the last register and try to
        # backward allocate it to a previous register where it is not blocking an open
        # path. This heuristic helps minimize forward allocation moves later.
        #
        for time, entry in enumerate(self.table):
            reg = entry.regs[-1]
            if reg is not None and reg not in outputs:
                next_entry = self.table[(time + 1) % rows]
                for nreg_idx, nreg in enumerate(next_entry.regs):
                    if nreg is None and (
                        nreg_idx == 0 or entry.regs[nreg_idx - 1] is not None
                    ):
                        next_entry.regs[nreg_idx] = reg
                        entry.back_edge_to[cols - 1] = nreg_idx
                        next_entry.back_edge_from[nreg_idx] = cols - 1
                        return

        #
        # Pass #2: Backward allocate the first non-dead variable from the last
        # registers to an empty register.
        #
        for time, entry in enumerate(self.table):
            reg = entry.regs[-1]
            if reg is not None and reg not in outputs:
                next_entry = self.table[(time + 1) % rows]
                for nreg_idx, nreg in enumerate(next_entry.regs):
                    if nreg is None:
                        next_entry.regs[nreg_idx] = reg
                        entry.back_edge_to[cols - 1] = nreg_idx
                        next_entry.back_edge_from[nreg_idx] = cols - 1
                        return

        # All passes failed, raise exception...
        raise ValueError(
            "Can't backward allocate any variable. This should not happen."
        )

    def __getitem__(self, key):
        """Return the table entry (or entries, for a slice) at *key*."""
        return self.table[key]

    def __iter__(self):
        """Iterate over the table entries in time order."""
        yield from self.table

    def __len__(self):
        """Return the number of entries (time slots) in the table."""
        return len(self.table)

    def __str__(self):
        """
        Render the table as text, one line per time slot.

        Registers that are the source of a back edge get a green background and
        registers that are the target of a back edge a brown background, using ANSI
        escape codes.
        """
        # ANSI escape codes for coloring in the forward-backward table string
        GREEN_BACKGROUND_ANSI = "\u001b[42m"
        BROWN_BACKGROUND_ANSI = "\u001b[43m"
        RESET_BACKGROUND_ANSI = "\033[0m"

        # Text width of input and output column
        def lst_w(proc_lst):
            return sum(len(str(proc)) + 1 for proc in proc_lst)

        input_col_w = max(5, max(lst_w(entry.inputs) for entry in self.table) + 1)
        output_col_w = max(5, max(lst_w(entry.outputs) for entry in self.table) + 1)

        # Text width of register columns
        widest_reg = max(
            (len(str(reg)) for entry in self.table for reg in entry.regs),
            default=0,
        )
        reg_col_w = max(4, widest_reg + 2)
        reg_count = max(self._live_variables)

        # Header row of the string
        parts = [f' T |{"In":^{input_col_w}}|']
        for i in range(reg_count):
            reg_title = f'R{i}'
            parts.append(f'{reg_title:^{reg_col_w}}|')
        parts.append(f'{"Out":^{output_col_w}}|')
        parts.append('\n')
        rule_w = 6 + input_col_w + (reg_col_w + 1) * reg_count + output_col_w
        parts.append(rule_w * '-' + '\n')

        for time, entry in enumerate(self.table):
            # Time
            parts.append(f'{time:^3}| ')

            # Input column
            inputs_str = ','.join(proc.name for proc in entry.inputs)
            parts.append(f'{inputs_str:^{input_col_w - 1}}|')

            # Register columns
            for reg_idx, reg in enumerate(entry.regs):
                if reg is None:
                    parts.append(" " * reg_col_w + "|")
                elif reg_idx in entry.back_edge_to:
                    parts.append(f'{GREEN_BACKGROUND_ANSI}')
                    parts.append(f'{reg.name:^{reg_col_w}}')
                    parts.append(f'{RESET_BACKGROUND_ANSI}|')
                elif reg_idx in entry.back_edge_from:
                    parts.append(f'{BROWN_BACKGROUND_ANSI}')
                    parts.append(f'{reg.name:^{reg_col_w}}')
                    parts.append(f'{RESET_BACKGROUND_ANSI}|')
                else:
                    parts.append(f'{reg.name:^{reg_col_w}}' + "|")

            # Output column
            outputs_str = ','.join(proc.name for proc in entry.outputs)
            if entry.outputs_from is not None:
                outputs_str += f"({entry.outputs_from})"
            parts.append(f'{outputs_str:^{output_col_w}}|')
            parts.append('\n')
        return ''.join(parts)
class ProcessCollection:
r"""
Collection of :class:`~b_asic.process.Process` objects.
Parameters
----------
collection : Iterable of :class:`~b_asic.process.Process` objects
The :class:`~b_asic.process.Process` objects forming this
:class:`~b_asic.resources.ProcessCollection`.
schedule_time : int
The scheduling time associated with this
:class:`~b_asic.resources.ProcessCollection`.
cyclic : bool, default: False
Whether the processes operates cyclically, i.e., if time
.. math:: t = t \bmod T_{\textrm{schedule}}.
"""
def __init__(
self,
collection: Iterable[Process],
schedule_time: int,
cyclic: bool = False,
):
self._collection = list(collection)
self._schedule_time = schedule_time
self._cyclic = cyclic
@property
def collection(self) -> List[Process]:
    """The internal list of processes (the list object itself, not a copy)."""
    return self._collection
@property
def schedule_time(self) -> int:
    """The schedule time this collection was created with."""
    return self._schedule_time
def __len__(self):
    """Return the number of processes in this collection."""
    return len(self.collection)
def add_process(self, process: Process):
"""
Add a new :class:`~b_asic.process.Process` to this
:class:`~b_asic.resources.ProcessCollection`.
Parameters
----------
process : :class:`~b_asic.process.Process`
The :class:`~b_asic.process.Process` object to add.
"""
if process in self.collection:
raise ValueError("Process already in ProcessCollection")
self.collection.append(process)
def remove_process(self, process: Process):
"""
Remove a :class:`~b_asic.process.Process` from this
:class:`~b_asic.resources.ProcessCollection`.
Raises :class:`KeyError` if the specified :class:`~b_asic.process.Process` is
not in this collection.
Parameters
----------
process : :class:`~b_asic.process.Process`
The :class:`~b_asic.process.Process` object to remove from this collection.
"""
if process not in self.collection:
raise KeyError(
f"Can't remove process: '{process}', as it is not in collection."
)
else:
self.collection.remove(process)
def plot(
    self,
    ax: Optional[Axes] = None,
    *,
    show_name: bool = True,
    bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
    marker_color: Union[str, Tuple[float, ...]] = "black",
    marker_read: str = "X",
    marker_write: str = "o",
    show_markers: bool = True,
    row: Optional[int] = None,
    allow_excessive_lifetimes: bool = False,
):
    """
    Plot all :class:`~b_asic.process.Process` objects of this
    :class:`~b_asic.resources.ProcessCollection` in a lifetime diagram.

    If the ``ax`` parameter is not specified, a new Matplotlib figure is created.

    Raises :class:`KeyError` if any :class:`~b_asic.process.Process` lifetime
    exceeds this :class:`~b_asic.resources.ProcessCollection` schedule time,
    unless ``allow_excessive_lifetimes`` is specified. In that case,
    :class:`~b_asic.process.Process` objects whose lifetime exceed the schedule
    time are drawn using the B-ASIC warning color.

    An empty collection is drawn as an empty chart.

    Parameters
    ----------
    ax : :class:`matplotlib.axes.Axes`, optional
        Matplotlib :class:`~matplotlib.axes.Axes` object to draw this lifetime chart
        onto. If not provided (i.e., set to None), this method will return a new
        Axes object.
    show_name : bool, default: True
        Show name of all processes in the lifetime chart.
    bar_color : color, optional
        Bar color in lifetime chart.
    marker_color : color, default 'black'
        Color for read and write marker.
    marker_read : str, default 'X'
        Marker at read time in the lifetime chart.
    marker_write : str, default 'o'
        Marker at write time in the lifetime chart.
    show_markers : bool, default True
        Show markers at read and write times.
    row : int, optional
        Render all processes in this collection on a specified row in the Matplotlib
        axes object. Defaults to None, which renders all processes on separate rows.
        This option is useful when drawing cell assignments.
    allow_excessive_lifetimes : bool, default False
        If set to true, the plot method allows plotting collections of variables with
        a greater lifetime than the schedule time.

    Returns
    -------
    ax : Associated Matplotlib Axes (or array of Axes) object
    """
    schedule_time = self._schedule_time

    # Validate before touching Matplotlib so that a rejected collection does not
    # leave a newly created, unused figure behind. ``default=0`` makes an empty
    # collection valid input.
    max_execution_time = max(
        (process.execution_time for process in self._collection), default=0
    )
    if not allow_excessive_lifetimes and max_execution_time > schedule_time:
        # Schedule time needs to be greater than or equal to the maximum process
        # lifetime
        raise KeyError(
            f'Error: Schedule time: {self._schedule_time} < Max execution'
            f' time: {max_execution_time}'
        )

    # Set up the Axes object
    if ax is None:
        _, _ax = plt.subplots()
    else:
        _ax = ax

    # Lifetime chart left and right padding
    PAD_L, PAD_R = 0.05, 0.05

    def wrap_end(time):
        # A non-zero time on an exact multiple of the schedule time is drawn at
        # the right edge of the chart instead of wrapping around to zero.
        if time and time % schedule_time == 0:
            return schedule_time
        return time % schedule_time

    # Generate the life-time chart
    for i, process in enumerate(_sorted_nicely(self._collection)):
        bar_row = i if row is None else row
        bar_yrange = (bar_row + 0.55, 0.9)
        bar_start = process.start_time
        bar_end = wrap_end(bar_start + process.execution_time)
        if process.execution_time != 0:
            bar_start = bar_start % schedule_time

        if show_markers:
            _ax.scatter(  # type: ignore
                x=bar_start,
                y=bar_row + 1,
                marker=marker_write,
                color=marker_color,
                zorder=10,
            )
            for end_time in process.read_times:
                _ax.scatter(  # type: ignore
                    x=wrap_end(end_time),
                    y=bar_row + 1,
                    marker=marker_read,
                    color=marker_color,
                    zorder=10,
                )

        if process.execution_time > schedule_time:
            # Execution time longer than schedule time, draw with warning color
            _ax.broken_barh(  # type: ignore
                [(0, schedule_time)],
                bar_yrange,
                color=_WARNING_COLOR,
            )
        elif process.execution_time == 0 or bar_end > bar_start:
            # Single bar; a zero execution time yields a slim bar
            _ax.broken_barh(  # type: ignore
                [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
                bar_yrange,
                color=bar_color,
            )
        else:
            # bar_end <= bar_start: the lifetime wraps around the schedule
            # boundary and is drawn as two pieces.
            _ax.broken_barh(  # type: ignore
                [(PAD_L + bar_start, schedule_time - bar_start - PAD_L)],
                bar_yrange,
                color=bar_color,
            )
            _ax.broken_barh(  # type: ignore
                [(0, bar_end - PAD_R)], bar_yrange, color=bar_color
            )

        if show_name:
            _ax.annotate(  # type: ignore
                str(process),
                (bar_start + PAD_L + 0.025, bar_row + 1.00),
                va="center",
            )

    _ax.grid(True)  # type: ignore
    _ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # type: ignore
    _ax.yaxis.set_major_locator(MaxNLocator(integer=True))  # type: ignore
    _ax.set_xlim(0, schedule_time)  # type: ignore
    if row is None:
        # With an explicit row the caller is responsible for the y-limits.
        _ax.set_ylim(0.25, len(self._collection) + 0.75)  # type: ignore
    return _ax
def show(
    self,
    *,
    show_name: bool = True,
    bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
    marker_color: Union[str, Tuple[float, ...]] = "black",
    marker_read: str = "X",
    marker_write: str = "o",
    show_markers: bool = True,
    allow_excessive_lifetimes: bool = False,
    title: Optional[str] = None,
) -> None:
    """
    Display this :class:`~b_asic.resources.ProcessCollection` using the current
    Matplotlib backend.

    A new Matplotlib figure is created, its axes and the arguments are passed on
    to :meth:`plot`, and :py:meth:`matplotlib.figure.Figure.show` is invoked.

    Parameters
    ----------
    show_name : bool, default: True
        Show name of all processes in the lifetime chart.
    bar_color : color, optional
        Bar color in lifetime chart.
    marker_color : color, default 'black'
        Color for read and write marker.
    marker_read : str, default 'X'
        Marker at read time in the lifetime chart.
    marker_write : str, default 'o'
        Marker at write time in the lifetime chart.
    show_markers : bool, default True
        Show markers at read and write times.
    allow_excessive_lifetimes : bool, default False
        If set to True, the plot method allows plotting collections of variables
        with a greater lifetime than the schedule time.
    title : str, optional
        Title of plot.
    """
    figure, axes = plt.subplots()
    plot_options = dict(
        show_name=show_name,
        bar_color=bar_color,
        marker_color=marker_color,
        marker_read=marker_read,
        marker_write=marker_write,
        show_markers=show_markers,
        allow_excessive_lifetimes=allow_excessive_lifetimes,
    )
    self.plot(ax=axes, **plot_options)
    if title:
        figure.suptitle(title)
    figure.show()  # type: ignore
def create_exclusion_graph_from_ports(
    self,
    read_ports: Optional[int] = None,
    write_ports: Optional[int] = None,
    total_ports: Optional[int] = None,
) -> nx.Graph:
    """
    Create an exclusion graph from a given number of read and write ports based on
    concurrent read and write accesses to this
    :class:`~b_asic.resources.ProcessCollection`.

    Two processes are connected by an edge when they start in the same time slot or
    are read in the same time slot (read times taken modulo the schedule time).
    With a single total port, a start of one process coinciding with a read of the
    other also gives an edge.

    Parameters
    ----------
    read_ports : int
        The number of read ports used when splitting process collection based on
        memory variable access.
    write_ports : int
        The number of write ports used when splitting process collection based on
        memory variable access.
    total_ports : int
        The total number of ports used when splitting process collection based on
        memory variable access.

    Returns
    -------
    A :class:`networkx.Graph` object.

    Raises
    ------
    ValueError
        If the port specification is inconsistent, if *read_ports* or *write_ports*
        differs from one, if *total_ports* is neither 1 nor 2, or if a process is
        written and read in the same cycle while only one port is available.
    """
    read_ports, write_ports, total_ports = _sanitize_port_option(
        read_ports, write_ports, total_ports
    )

    # Guard for proper read/write port settings
    if read_ports != 1 or write_ports != 1:
        raise ValueError(
            "Splitting with read and write ports not equal to one with the"
            " graph coloring heuristic does not make sense."
        )
    if total_ports not in (1, 2):
        raise ValueError(
            "Total ports should be either 1 (non-concurrent reads/writes)"
            " or 2 (concurrent read/writes) for graph coloring heuristic."
        )
    single_port = total_ports == 1
    schedule_time = self.schedule_time

    # Create new exclusion graph. Nodes are Processes
    exclusion_graph = nx.Graph()
    exclusion_graph.add_nodes_from(self._collection)

    # The wrapped read times are needed for every pair a node takes part in, so
    # compute them once per node.
    nodes = list(exclusion_graph)
    stop_times = [
        tuple(read_time % schedule_time for read_time in node.read_times)
        for node in nodes
    ]

    for idx1, node1 in enumerate(nodes):
        node1_stop_times = stop_times[idx1]
        if single_port and node1.start_time in node1_stop_times:
            raise ValueError("Cannot read and write in same cycle.")
        # The conflict test is symmetric and the graph undirected, so each
        # unordered pair only has to be examined once.
        for idx2 in range(idx1 + 1, len(nodes)):
            node2 = nodes[idx2]
            if node1 == node2:
                continue
            conflict = any(
                node1.start_time == node2.start_time
                or stop1 == stop2
                or (
                    single_port
                    and (
                        node1.start_time == stop2
                        or stop1 == node2.start_time
                    )
                )
                for stop1 in node1_stop_times
                for stop2 in stop_times[idx2]
            )
            if conflict:
                exclusion_graph.add_edge(node1, node2)
    return exclusion_graph
def create_exclusion_graph_from_execution_time(self) -> nx.Graph:
    """
    Create an exclusion graph from processes overlapping in execution time.

    Every process is a node. Two processes are connected by an edge when the sets
    of time slots they occupy share at least one slot. A lifetime running past the
    schedule time continues from time zero.

    Returns
    -------
    A :class:`networkx.Graph` object.
    """
    schedule_time = self._schedule_time

    def occupied_slots(process):
        end_time = process.start_time + process.execution_time
        # Part of the lifetime before the schedule boundary ...
        slots = set(range(process.start_time, min(end_time, schedule_time)))
        # ... and the part that wraps around to the start of the schedule.
        slots.update(range(0, end_time - schedule_time))
        return slots

    processes = self._collection
    # Build each slot set once instead of once per examined pair.
    slots = [occupied_slots(process) for process in processes]

    exclusion_graph = nx.Graph()
    exclusion_graph.add_nodes_from(processes)
    for idx1, process1 in enumerate(processes):
        # Overlap is symmetric, so every unordered pair is tested only once.
        for idx2 in range(idx1 + 1, len(processes)):
            process2 = processes[idx2]
            if process1 == process2:
                continue
            if not slots[idx1].isdisjoint(slots[idx2]):
                exclusion_graph.add_edge(process1, process2)
    return exclusion_graph
def split_on_execution_time(
    self,
    heuristic: str = "graph_color",
    coloring_strategy: str = "saturation_largest_first",
) -> List["ProcessCollection"]:
    """
    Split this ProcessCollection so that processes overlapping in execution time
    end up in different collections.

    Parameters
    ----------
    heuristic : {'graph_color', 'left_edge'}, default: 'graph_color'
        The heuristic used when splitting based on execution times.
    coloring_strategy : str, default: 'saturation_largest_first'
        Node ordering strategy passed to
        :func:`networkx.algorithms.coloring.greedy_color`.
        This parameter is only considered if *heuristic* is set to 'graph_color'.
        One of

        * 'largest_first'
        * 'random_sequential'
        * 'smallest_last'
        * 'independent_set'
        * 'connected_sequential_bfs'
        * 'connected_sequential_dfs' or 'connected_sequential'
        * 'saturation_largest_first' or 'DSATUR'

    Returns
    -------
    A list of new ProcessCollection objects with the process splitting.

    Raises
    ------
    ValueError
        If *heuristic* is not one of the supported names.
    """
    if heuristic == "left_edge":
        return self._left_edge_assignment()
    if heuristic == "graph_color":
        return self._graph_color_assignment(coloring_strategy)
    raise ValueError(f"Invalid heuristic '{heuristic}'")
def split_on_ports(
    self,
    heuristic: str = "graph_color",
    read_ports: Optional[int] = None,
    write_ports: Optional[int] = None,
    total_ports: Optional[int] = None,
) -> List["ProcessCollection"]:
    """
    Split this process storage based on concurrent read/write times.

    The port specification is sanitized first, so an inconsistent specification is
    rejected before the heuristic name is examined.

    Parameters
    ----------
    heuristic : str, default: "graph_color"
        The heuristic used when splitting this :class:`ProcessCollection`.
        The only option currently accepted is

        * "graph_color"
    read_ports : int, optional
        The number of read ports used when splitting process collection based on
        memory variable access.
    write_ports : int, optional
        The number of write ports used when splitting process collection based on
        memory variable access.
    total_ports : int, optional
        The total number of ports used when splitting process collection based on
        memory variable access.

    Returns
    -------
    A list of new ProcessCollection objects with the process splitting.

    Raises
    ------
    ValueError
        If the port specification is invalid or *heuristic* is not supported.
    """
    ports = _sanitize_port_option(read_ports, write_ports, total_ports)
    if heuristic != "graph_color":
        raise ValueError("Invalid heuristic provided.")
    return self._split_ports_graph_color(*ports)
def _split_ports_graph_color(
    self,
    read_ports: int,
    write_ports: int,
    total_ports: int,
    coloring_strategy: str = "saturation_largest_first",
) -> List["ProcessCollection"]:
    """
    Split this collection on memory ports by coloring its port exclusion graph.

    Parameters
    ----------
    read_ports : int
        The number of read ports used when splitting process collection based on
        memory variable access.
    write_ports : int
        The number of write ports used when splitting process collection based on
        memory variable access.
    total_ports : int
        The total number of ports used when splitting process collection based on
        memory variable access.
    coloring_strategy : str, default: 'saturation_largest_first'
        Node ordering strategy passed to
        :func:`networkx.algorithms.coloring.greedy_color`
        One of

        * 'largest_first'
        * 'random_sequential'
        * 'smallest_last'
        * 'independent_set'
        * 'connected_sequential_bfs'
        * 'connected_sequential_dfs' or 'connected_sequential'
        * 'saturation_largest_first' or 'DSATUR'

    Returns
    -------
    A list of new ProcessCollection objects, one per color.
    """
    # Nodes of the exclusion graph are Processes; processes sharing a color have
    # no edge between them.
    port_conflicts = self.create_exclusion_graph_from_ports(
        read_ports, write_ports, total_ports
    )
    color_of = nx.coloring.greedy_color(port_conflicts, strategy=coloring_strategy)
    return self._split_from_graph_coloring(color_of)
def _split_from_graph_coloring(
self,
coloring: Dict[Process, int],
) -> List["ProcessCollection"]:
"""
Split :class:`Process` objects into a set of :class:`ProcessesCollection`
objects based on a provided graph coloring.
Resulting :class:`ProcessCollection` will have the same schedule time and cyclic
property as self.
Parameters
----------
coloring : dict
Process->int (color) mappings
Returns
-------
A set of new ProcessCollections.
"""
process_collection_set_list = [[] for _ in range(max(coloring.values()) + 1)]
for process, color in coloring.items():
process_collection_set_list[color].append(process)
return [
ProcessCollection(process_collection_set, self._schedule_time, self._cyclic)
for process_collection_set in process_collection_set_list
]
def _repr_svg_(self) -> str:
    """
    Generate an SVG_ of the resource collection. This is automatically displayed in
    e.g. Jupyter Qt console.

    The lifetime chart is drawn without read/write markers on a temporary figure,
    which is closed again once it has been serialized.
    """
    fig, ax = plt.subplots()
    try:
        self.plot(ax=ax, show_markers=False)
        buffer = io.StringIO()
        fig.savefig(buffer, format="svg")  # type: ignore
    finally:
        # pyplot keeps every figure it creates alive until it is closed; release
        # this one so repeated reprs do not accumulate open figures.
        plt.close(fig)
    return buffer.getvalue()

# SVG is valid HTML. This is useful for e.g. sphinx-gallery
_repr_html_ = _repr_svg_
def __repr__(self):
return (
f"ProcessCollection({self._collection}, {self._schedule_time},"
f" {self._cyclic})"
)
def __iter__(self):
    """Iterate over the processes in the order they are stored."""
    return iter(self._collection)
def _graph_color_assignment(
self,
coloring_strategy: str = "saturation_largest_first",
*,
coloring: Optional[Dict[Process, int]] = None,
) -> List["ProcessCollection"]:
"""
Perform assignment of the processes in this collection using graph coloring.
Two or more processes can share a single resource if, and only if, they have no
overlaping execution time.
Parameters
----------
coloring_strategy : str, default: "saturation_largest_first"
Graph coloring strategy passed to
:func:`networkx.algorithms.coloring.greedy_color`.
coloring : dict, optional
An optional graph coloring, dictionary with Process and its associated color
(int). If a graph coloring is not provided throught this parameter, one will
be created when calling this method.
Returns
-------
List[ProcessCollection]
"""
for process in self:
if process.execution_time > self.schedule_time:
# Can not assign process to any cell
raise ValueError(
f"{process} has execution time greater than the schedule time"
)
cell_assignment: Dict[int, ProcessCollection] = dict()
exclusion_graph = self.create_exclusion_graph_from_execution_time()
if coloring is None:
coloring = nx.coloring.greedy_color(
exclusion_graph, strategy=coloring_strategy
)
for process, cell in coloring.items():
if cell not in cell_assignment:
cell_assignment[cell] = ProcessCollection([], self._schedule_time)
cell_assignment[cell].add_process(process)
return list(cell_assignment.values())
def _left_edge_assignment(self) -> List["ProcessCollection"]:
"""
Perform assignment of the processes in this collection using the left-edge
algorithm.
Two or more processes can share a single resource if, and only if, they have no
overlaping execution time.
Raises :class:`ValueError` if any process in this collection has an execution
time which is greater than the collection schedule time.
Returns
-------
List[ProcessCollection]
"""
assignment: List[ProcessCollection] = []
for next_process in sorted(self):
if next_process.execution_time > self.schedule_time:
# Can not assign process to any cell
raise ValueError(
f"{next_process} has execution time greater than the schedule time"
)
elif next_process.execution_time == self.schedule_time:
# Always assign maximum lifetime process to new cell
assignment.append(
ProcessCollection(
(next_process,),
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
)
continue # Continue assigning next process
else:
next_process_stop_time = (
next_process.start_time + next_process.execution_time
) % self._schedule_time
insert_to_new_cell = True
for cell_assignment in assignment:
insert_to_this_cell = True
for process in cell_assignment:
# The next_process start_time is always greater than or equal to
# the start time of all other assigned processes
process_end_time = process.start_time + process.execution_time
if next_process.start_time < process_end_time:
insert_to_this_cell = False
break
if (
next_process.start_time
> next_process_stop_time
> process.start_time
):
insert_to_this_cell = False
break
if insert_to_this_cell:
cell_assignment.add_process(next_process)
insert_to_new_cell = False
break
if insert_to_new_cell:
assignment.append(
ProcessCollection(
(next_process,),
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
)
return assignment
def generate_memory_based_storage_vhdl(
    self,
    filename: str,
    entity_name: str,
    word_length: int,
    assignment: List['ProcessCollection'],
    read_ports: int = 1,
    write_ports: int = 1,
    total_ports: int = 2,
    input_sync: bool = True,
):
    """
    Generate VHDL code for memory based storage of processes (MemoryVariables).

    Parameters
    ----------
    filename : str
        Filename of output file.
    entity_name : str
        Name used for the VHDL entity.
    word_length : int
        Word length of the memory variable objects.
    assignment : list of ProcessCollection
        The cell assignment to use when generating the memory based storage.
        Every memory variable in every collection of the list must be part of
        this collection. Presumably each list entry holds the memory variables
        sharing one memory cell (TODO confirm against the architecture
        generator).
    read_ports : int, default: 1
        The number of read ports used when splitting process collection based on
        memory variable access. If total ports is unset, this parameter has to be
        set and total_ports is assumed to be read_ports + write_ports.
    write_ports : int, default: 1
        The number of write ports used when splitting process collection based on
        memory variable access. If total ports is unset, this parameter has to be
        set and total_ports is assumed to be read_ports + write_ports.
    total_ports : int, default: 2
        The total number of ports used when splitting process collection based on
        memory variable access.
    input_sync : bool, default: True
        Add registers to the input signals (enable signal and data input signals).
        Adding registers to the inputs allow pipelining of address generation
        (which is added automatically). For large interleavers, this can improve
        timing significantly.

    Raises
    ------
    KeyError
        If ``entity_name`` is not a valid VHDL identifier.
    ValueError
        If this collection is not made up of (Plain)MemoryVariables, if
        ``assignment`` contains a memory variable that is not part of this
        collection, or if the concurrent reads/writes surpass the port setting.
    """
    # Check that entity name is a valid VHDL identifier
    if not is_valid_vhdl_identifier(entity_name):
        raise KeyError(f'{entity_name} is not a valid identifier')

    # Check that this is a ProcessCollection of (Plain)MemoryVariables
    is_memory_variable = all(
        isinstance(process, MemoryVariable) for process in self._collection
    )
    is_plain_memory_variable = all(
        isinstance(process, PlainMemoryVariable) for process in self._collection
    )
    if not (is_memory_variable or is_plain_memory_variable):
        raise ValueError(
            "HDL can only be generated for ProcessCollection of"
            " (Plain)MemoryVariables"
        )

    # Sanitize port settings
    read_ports, write_ports, total_ports = _sanitize_port_option(
        read_ports, write_ports, total_ports
    )

    # Make sure the provided assignment (List[ProcessCollection]) only
    # contains memory variables from this (self).
    for collection in assignment:
        for mv in collection:
            if mv not in self:
                raise ValueError(f'{mv!r} is not part of {self!r}.')

    # Make sure that concurrent reads/writes do not surpass the port setting.
    # Accesses are tallied once per time slot instead of re-scanning the whole
    # collection for every memory variable.
    writes_at = Counter(p.start_time for p in self)
    reads_at = Counter(
        (p.start_time + p.execution_time) % self._schedule_time for p in self
    )
    for mv in self:
        needed_write_ports = writes_at[mv.start_time]
        # The whole end time has to be wrapped into the schedule period (not
        # just the execution time) to be comparable with the tallied slots.
        needed_read_ports = reads_at[
            (mv.start_time + mv.execution_time) % self._schedule_time
        ]
        # NOTE(review): the counts include `mv` itself, so the `+ 1` slack
        # below lets one access more than the configured number of ports
        # through. Kept as-is for backward compatibility -- confirm intent.
        if needed_write_ports > write_ports + 1:
            raise ValueError(
                f'More than {write_ports} write ports needed ({needed_write_ports})'
                ' to generate HDL for this ProcessCollection'
            )
        if needed_read_ports > read_ports + 1:
            raise ValueError(
                f'More than {read_ports} read ports needed ({needed_read_ports}) to'
                ' generate HDL for this ProcessCollection'
            )

    with open(filename, 'w') as f:
        # Imported here rather than at module level, as in the original code
        from b_asic.codegen.vhdl import architecture, common, entity

        common.b_asic_preamble(f)
        common.ieee_header(f)
        entity.memory_based_storage(
            f, entity_name=entity_name, collection=self, word_length=word_length
        )
        architecture.memory_based_storage(
            f,
            assignment=assignment,
            entity_name=entity_name,
            word_length=word_length,
            read_ports=read_ports,
            write_ports=write_ports,
            total_ports=total_ports,
            input_sync=input_sync,
        )
def split_on_length(
    self, length: int = 0
) -> Tuple["ProcessCollection", "ProcessCollection"]:
    """
    Partition this collection into two new ProcessCollections by execution time.

    Parameters
    ----------
    length : int, default: 0
        The execution time to split on. A process whose execution time equals
        ``length`` ends up in the collection of shorter processes.

    Returns
    -------
    A tuple of two ProcessCollections: the first with execution times shorter
    than or equal to ``length``, the second with longer execution times.
    """
    at_most = []
    beyond = []
    for proc in self.collection:
        if proc.execution_time <= length:
            at_most.append(proc)
            continue
        if not isinstance(proc, MemoryProcess):
            # Cannot be split: the entire process counts as long
            beyond.append(proc)
            continue
        # A memory process is asked to split itself at `length`; either
        # half may be reported as None and is then left out.
        short_part, long_part = proc.split_on_length(length)
        if short_part is not None:
            at_most.append(short_part)
        if long_part is not None:
            beyond.append(long_part)

    short_collection = ProcessCollection(at_most, self.schedule_time, self._cyclic)
    long_collection = ProcessCollection(beyond, self.schedule_time, self._cyclic)
    return short_collection, long_collection
def generate_register_based_storage_vhdl(
    self,
    filename: str,
    word_length: int,
    entity_name: str,
    read_ports: int = 1,
    write_ports: int = 1,
    total_ports: int = 2,
):
    """
    Generate VHDL code for register based storage of processes, based on
    Forward-Backward Register Allocation [1].

    [1]: K. Parhi: VLSI Digital Signal Processing Systems: Design and
    Implementation, Ch. 6.3.2

    Parameters
    ----------
    filename : str
        Filename of output file.
    word_length : int
        Word length of the memory variable objects.
    entity_name : str
        Name used for the VHDL entity.
    read_ports : int, default: 1
        The number of read ports used when splitting process collection based on
        memory variable access. If total ports is unset, this parameter has to be
        set and total_ports is assumed to be read_ports + write_ports.
    write_ports : int, default: 1
        The number of write ports used when splitting process collection based on
        memory variable access. If total ports is unset, this parameter has to be
        set and total_ports is assumed to be read_ports + write_ports.
    total_ports : int, default: 2
        The total number of ports used when splitting process collection based on
        memory variable access.

    Raises
    ------
    KeyError
        If ``entity_name`` is not a valid VHDL identifier.
    ValueError
        If this collection is not made up of (Plain)MemoryVariables.
    """
    # The entity name must be usable as a VHDL identifier
    if not is_valid_vhdl_identifier(entity_name):
        raise KeyError(f'{entity_name} is not a valid identifier')

    # Every process must be a MemoryVariable, or every process must be a
    # PlainMemoryVariable
    only_memory_variables = all(
        isinstance(proc, MemoryVariable) for proc in self._collection
    )
    only_plain_memory_variables = all(
        isinstance(proc, PlainMemoryVariable) for proc in self._collection
    )
    if not only_memory_variables and not only_plain_memory_variables:
        raise ValueError(
            "HDL can only be generated for ProcessCollection of"
            " (Plain)MemoryVariables"
        )

    # Normalize the port specification
    sanitized_ports = _sanitize_port_option(read_ports, write_ports, total_ports)
    read_ports, write_ports, total_ports = sanitized_ports

    # The forward-backward table is built before the output file is touched
    table = _ForwardBackwardTable(self)

    with open(filename, 'w') as vhdl_file:
        # Imported here rather than at module level, as in the original code
        from b_asic.codegen.vhdl import architecture, common, entity

        common.b_asic_preamble(vhdl_file)
        common.ieee_header(vhdl_file)
        entity.register_based_storage(
            vhdl_file,
            entity_name=entity_name,
            collection=self,
            word_length=word_length,
        )
        architecture.register_based_storage(
            vhdl_file,
            forward_backward_table=table,
            entity_name=entity_name,
            word_length=word_length,
            read_ports=read_ports,
            write_ports=write_ports,
            total_ports=total_ports,
        )
def get_by_type_name(self, type_name: TypeName) -> "ProcessCollection":
    """
    Extract the operator processes of a given operation type into a new
    :class:`~b_asic.resources.ProcessCollection`.

    Processes that are not :class:`~b_asic.process.OperatorProcess` instances
    are never included.

    Parameters
    ----------
    type_name : TypeName
        The TypeName of the operation to extract.

    Returns
    -------
    A new :class:`~b_asic.resources.ProcessCollection` with the same schedule
    time and cyclic setting as this one.
    """
    selected = set()
    for candidate in self._collection:
        if not isinstance(candidate, OperatorProcess):
            continue
        if candidate.operation.type_name() == type_name:
            selected.add(candidate)
    return ProcessCollection(selected, self._schedule_time, self._cyclic)
def read_ports_bound(self) -> int:
    """
    Get the read port lower-bound (maximum number of concurrent reads) of this
    :class:`~b_asic.resources.ProcessCollection`.

    Read times are taken modulo the schedule time, and several reads of one
    process landing on the same time slot are counted once.

    Returns
    -------
    int
        The largest number of processes read in any single time slot, or 0 if
        the collection contains no reads at all.
    """
    reads_per_slot = Counter()
    for process in self._collection:
        # The set keeps one process from occupying more than one port in a slot
        reads_per_slot.update(
            {read_time % self.schedule_time for read_time in process.read_times}
        )
    # default=0 covers the empty collection, where max() would otherwise raise
    return max(reads_per_slot.values(), default=0)
def write_ports_bound(self) -> int:
    """
    Get the write port lower-bound (maximum number of concurrent writes) of this
    :class:`~b_asic.resources.ProcessCollection`.

    A process is written at its start time, so the bound is the largest number
    of processes sharing one start time.

    Returns
    -------
    int
        The largest number of processes with the same start time, or 0 if the
        collection is empty.
    """
    writes_per_slot = Counter(process.start_time for process in self._collection)
    # default=0 covers the empty collection, where max() would otherwise raise
    return max(writes_per_slot.values(), default=0)
def from_name(self, name: str):
    """
    Look up a :class:`~b_asic.process.Process` in this collection by its name.

    If several processes share the name, the one encountered last when
    iterating over the collection is returned.

    Parameters
    ----------
    name : str
        The name of the process to retrieve.

    Raises
    ------
    KeyError
        If no process with ``name`` is found in this collection.
    """
    by_name = {}
    for proc in self.collection:
        # Later processes replace earlier ones carrying the same name
        by_name[proc.name] = proc
    if name in by_name:
        return by_name[name]
    raise KeyError(f'{name} not in {self}')