Added a new resource allocation algorithm for minimizing PE-to-memory connections

Merged: Simon Bjurek requested to merge add-new-res-alloc-alg into master
3 files changed: +300 −41 (diff shown below: +169 −39)
import io
import itertools
import re
import sys
from collections import Counter, defaultdict
from collections.abc import Iterable
from functools import reduce
from math import floor, log2
from typing import Literal, TypeVar
from typing import TYPE_CHECKING, Literal, TypeVar
import matplotlib.pyplot as plt
import networkx as nx
@@ -23,6 +24,9 @@ from b_asic.process import (
)
from b_asic.types import TypeName
if TYPE_CHECKING:
from b_asic.architecture import ProcessingElement
# Default latency coloring RGB tuple
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)
_WARNING_COLOR = tuple(c / 255 for c in WARNING_COLOR)
@@ -912,6 +916,7 @@ class ProcessCollection:
read_ports: int | None = None,
write_ports: int | None = None,
total_ports: int | None = None,
processing_elements: list["ProcessingElement"] | None = None,
) -> list["ProcessCollection"]:
"""
Split based on concurrent read and write accesses.
@@ -926,6 +931,8 @@ class ProcessCollection:
* "graph_color"
* "left_edge"
* "min_pe_to_mem"
* "min_mem_to_pe"
read_ports : int, optional
The number of read ports used when splitting process collection based on
@@ -939,6 +946,9 @@ class ProcessCollection:
The total number of ports used when splitting process collection based on
memory variable access.
processing_elements : list[ProcessingElement], optional
The currently used PEs; only required if heuristic is "min_pe_to_mem" or "min_mem_to_pe".
Returns
-------
A set of new ProcessCollection objects with the process splitting.
@@ -949,16 +959,40 @@ class ProcessCollection:
if heuristic == "graph_color":
return self._split_ports_graph_color(read_ports, write_ports, total_ports)
elif heuristic == "left_edge":
return self.split_ports_sequentially(
return self._split_ports_sequentially(
read_ports,
write_ports,
total_ports,
sequence=sorted(self),
)
elif heuristic == "min_pe_to_mem":
if processing_elements is None:
raise ValueError(
"processing_elements must be provided if heuristic = 'min_pe_to_mem'"
)
return self._split_ports_minimize_pe_to_memory_connections(
read_ports,
write_ports,
total_ports,
sequence=sorted(self),
processing_elements=processing_elements,
)
elif heuristic == "min_mem_to_pe":
if processing_elements is None:
raise ValueError(
"processing_elements must be provided if heuristic = 'min_mem_to_pe'"
)
return self._split_ports_minimize_memory_to_pe_connections(
read_ports,
write_ports,
total_ports,
sequence=sorted(self),
processing_elements=processing_elements,
)
else:
raise ValueError("Invalid heuristic provided.")
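# Hedged usage sketch (illustrative only, not part of this merge request):
# assuming the enclosing method is the public split-on-ports entry point,
# `mem_vars` is a ProcessCollection of memory variables and `pes` is the list
# of already allocated ProcessingElement objects, the new heuristic would be
# selected roughly like this:
#
#     memories = mem_vars.split_on_ports(
#         heuristic="min_pe_to_mem",
#         read_ports=1,
#         write_ports=1,
#         total_ports=2,
#         processing_elements=pes,
#     )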
def split_ports_sequentially(
def _split_ports_sequentially(
self,
read_ports: int,
write_ports: int,
@@ -995,36 +1029,6 @@ class ProcessCollection:
A set of new ProcessCollection objects with the process splitting.
"""
def ports_collide(proc: Process, collection: ProcessCollection):
"""
Test whether inserting process `proc` into `collection` results in
colliding ports, given `read_ports`, `write_ports`, and `total_ports`.
"""
# Test the number of concurrent write accesses
collection_writes = defaultdict(int, collection.write_port_accesses())
if collection_writes[proc.start_time] >= write_ports:
return True
# Test the number of concurrent read accesses
collection_reads = defaultdict(int, collection.read_port_accesses())
for proc_read_time in proc.read_times:
if collection_reads[proc_read_time % self.schedule_time] >= read_ports:
return True
# Test the number of total accesses
collection_total_accesses = defaultdict(
int, Counter(collection_writes) + Counter(collection_reads)
)
for access_time in [proc.start_time, *proc.read_times]:
if collection_total_accesses[access_time] >= total_ports:
return True
# No collision detected
return False
# Make sure that processes from `sequence` and `self` are equal
if set(self.collection) != set(sequence):
raise KeyError("processes in `sequence` must be equal to processes in self")
@@ -1032,12 +1036,13 @@ class ProcessCollection:
for process in sequence:
process_added = False
for collection in collections:
if not ports_collide(process, collection):
if not self._ports_collide(
process, collection, write_ports, read_ports, total_ports
):
collection.add_process(process)
process_added = True
break
if not process_added:
# Stuff the process in a new collection
collections.append(
ProcessCollection(
[process],
@@ -1045,9 +1050,137 @@ class ProcessCollection:
cyclic=self._cyclic,
)
)
# Return the list of created ProcessCollections
return collections
def _split_ports_minimize_pe_to_memory_connections(
self,
read_ports: int,
write_ports: int,
total_ports: int,
sequence: list[Process],
processing_elements: list["ProcessingElement"],
) -> list["ProcessCollection"]:
if set(self.collection) != set(sequence):
raise KeyError("processes in `sequence` must be equal to processes in self")
num_of_memories = len(
self._split_ports_sequentially(
read_ports, write_ports, total_ports, sequence
)
)
collections: list[ProcessCollection] = [
ProcessCollection(
[],
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
for _ in range(num_of_memories)
]
for process in sequence:
process_fits_in_collection = self._get_process_fits_in_collection(
process, collections, read_ports, write_ports, total_ports
)
best_collection = None
best_delta = sys.maxsize
for i, collection in enumerate(collections):
if process_fits_in_collection[i]:
count_1 = ProcessCollection._count_number_of_pes_connected(
processing_elements, collection
)
tmp_collection = [*collection.collection, process]
count_2 = ProcessCollection._count_number_of_pes_connected(
processing_elements, tmp_collection
)
delta = count_2 - count_1
if delta < best_delta:
best_collection = collection
best_delta = delta
elif not any(process_fits_in_collection):
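# No existing collection fits; open a new (empty) one. The enclosing for-loop
# also iterates over this appended collection, so the recomputed fit list
# lets the process be assigned to it further down.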
collections.append(
ProcessCollection(
[],
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
)
process_fits_in_collection = self._get_process_fits_in_collection(
process, collections, read_ports, write_ports, total_ports
)
best_collection.add_process(process)
# Drop collections that remained empty
collections = [collection for collection in collections if collection.collection]
return collections
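# Illustrative toy sketch of the greedy criterion above, reduced to plain sets
# (names and helper are made up, not part of this merge request). Each
# candidate memory collection is represented only by the set of PE names that
# already write to it; the process goes to the feasible collection whose PE
# set would grow the least.
def pick_least_new_pe_connections(
    producer_pe: str, feasible_pe_sets: list[set[str]]
) -> int:
    """Return the index of the set gaining the fewest new PE connections."""
    best_index, best_delta = 0, float("inf")
    for i, pes in enumerate(feasible_pe_sets):
        delta = 0 if producer_pe in pes else 1  # would a new connection appear?
        if delta < best_delta:
            best_index, best_delta = i, delta
    return best_index

# A variable produced by "mul0" is steered to the memory already fed by "mul0".
assert pick_least_new_pe_connections("mul0", [{"add0"}, {"mul0", "add1"}]) == 1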
def _split_ports_minimize_memory_to_pe_connections(
self,
read_ports: int,
write_ports: int,
total_ports: int,
sequence: list[Process],
processing_elements: list["ProcessingElement"],
) -> list["ProcessCollection"]:
raise NotImplementedError()
def _get_process_fits_in_collection(
self,
process: Process,
collections: list["ProcessCollection"],
read_ports: int,
write_ports: int,
total_ports: int,
) -> list[bool]:
"""Return, for each collection, whether `process` can be added without port collisions."""
return [
not self._ports_collide(
process, collection, write_ports, read_ports, total_ports
)
for collection in collections
]
def _ports_collide(
self,
proc: Process,
collection: "ProcessCollection",
write_ports: int,
read_ports: int,
total_ports: int,
) -> bool:
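"""
Return True if adding `proc` to `collection` would exceed `read_ports`,
`write_ports`, or `total_ports` in any time slot.
"""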
# Test the number of concurrent write accesses
collection_writes = defaultdict(int, collection.write_port_accesses())
if collection_writes[proc.start_time] >= write_ports:
return True
# Test the number of concurrent read accesses
collection_reads = defaultdict(int, collection.read_port_accesses())
for proc_read_time in proc.read_times:
if collection_reads[proc_read_time % self.schedule_time] >= read_ports:
return True
# Test the number of total accesses
collection_total_accesses = defaultdict(
int, Counter(collection_writes) + Counter(collection_reads)
)
for access_time in [proc.start_time, *proc.read_times]:
if collection_total_accesses[access_time] >= total_ports:
return True
return False
@staticmethod
def _count_number_of_pes_connected(
processing_elements: list["ProcessingElement"],
collection: "ProcessCollection",
) -> int:
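"""
Count the processing elements that have at least one process whose base
name (the part before the first '.') also occurs among the processes in
`collection`.

Example with made-up names: if `collection` holds memory variables
"mul0.0" and "add1.2", a PE whose collection contains a process named
"mul0" shares the base name "mul0" and counts as one connected PE.
"""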
collection_process_names = {proc.name.split(".")[0] for proc in collection}
count = 0
for pe in processing_elements:
if any(
proc.name.split(".")[0] in collection_process_names
for proc in pe.collection
):
count += 1
return count
def _split_ports_graph_color(
self,
read_ports: int,
@@ -1203,12 +1336,10 @@ class ProcessCollection:
assignment: list[ProcessCollection] = []
for next_process in sorted(self):
if next_process.execution_time > self.schedule_time:
# Cannot assign process to any cell
raise ValueError(
f"{next_process} has execution time greater than the schedule time"
)
elif next_process.execution_time == self.schedule_time:
# Always assign maximum lifetime process to new cell
assignment.append(
ProcessCollection(
(next_process,),
@@ -1216,7 +1347,6 @@ class ProcessCollection:
cyclic=self._cyclic,
)
)
continue # Continue assigning next process
else:
next_process_stop_time = (
next_process.start_time + next_process.execution_time