Skip to content
Snippets Groups Projects
Commit 044b26e2 authored by Mikael Henriksson's avatar Mikael Henriksson :runner:
Browse files

codegen: VHDL generation for register based storage

parent fbd2612c
No related branches found
No related tags found
No related merge requests found
Pipeline #91429 passed
This commit is part of merge request !244. Comments created here will be created in the context of that merge request.
vunit_out
streaming_matrix_transposition_memory_*x*.vhdl
streaming_matrix_transposition_register_*x*.vhdl
work
1076.1-2017
--
-- Generic streaming transposition testbench using VUnit
-- Author: Mikael Henriksson (2023)
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_tester is
generic(
WL : integer;
ROWS : integer;
COLS : integer
);
port(
clk, rst, en : out std_logic;
input : out std_logic_vector(WL-1 downto 0);
output : in std_logic_vector(WL-1 downto 0);
done : out boolean
);
end entity streaming_matrix_transposition_tester;
architecture behav of streaming_matrix_transposition_tester is
signal clk_sig : std_logic;
begin
-- Clock (100 MHz), enable and reset generation.
clk <= clk_sig;
rst <= '1', '0' after 40 ns;
en <= '0', '1' after 100 ns;
process begin
clk_sig <= '0';
loop
wait for 5 ns; clk_sig <= not(clk_sig);
end loop;
end process;
-- Input generation
input_gen_proc: process begin
wait until en = '1';
for i in 0 to ROWS*COLS-1 loop
wait until clk = '0';
input <= std_logic_vector(to_unsigned(i, input'length));
end loop;
wait;
end process;
-- Output testing
output_test_proc: process begin
wait until en = '1';
wait until output = std_logic_vector(to_unsigned(0, output'length));
for col in 0 to COLS-1 loop
for row in 0 to ROWS-1 loop
wait until clk = '0';
check(output = std_logic_vector(to_unsigned(row*COLS + col, output'length)));
end loop;
end loop;
done <= true;
wait;
end process;
end architecture behav;
--
-- 3x3 memory based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_memory_3x3_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_memory_3x3_tb;
architecture behav of streaming_matrix_transposition_memory_3x3_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_memory_3x3
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>3, COLS=>3) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 7x7 memory based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_memory_7x7_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_memory_7x7_tb;
architecture behav of streaming_matrix_transposition_memory_7x7_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_memory_7x7
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>7, COLS=>7) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 7x7 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_7x7_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_7x7_tb;
architecture behav of streaming_matrix_transposition_register_7x7_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_7x7
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>7, COLS=>7) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 5x5 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_5x5_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_5x5_tb;
architecture behav of streaming_matrix_transposition_register_5x5_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_5x5
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>5, COLS=>5) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 4x4 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_4x4_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_4x4_tb;
architecture behav of streaming_matrix_transposition_register_4x4_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_4x4
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>4, COLS=>4) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 3x3 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_3x3_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_3x3_tb;
architecture behav of streaming_matrix_transposition_register_3x3_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_3x3
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>3, COLS=>3) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 2x2 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_2x2_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_2x2_tb;
architecture behav of streaming_matrix_transposition_register_2x2_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_2x2
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>2, COLS=>2) port map(clk, rst, en, input, output, done);
end architecture behav;
#!/usr/bin/env python3
from vunit import VUnit
vu = VUnit.from_argv()
lib = vu.add_library("lib")
lib.add_source_files(
[
"*.vhdl",
]
)
lib.set_compile_option("modelsim.vcom_flags", ["-2008"])
vu.main()
...@@ -5,4 +5,4 @@ Module for basic VHDL code generation. ...@@ -5,4 +5,4 @@ Module for basic VHDL code generation.
# VHDL code generation tab length # VHDL code generation tab length
VHDL_TAB = r" " VHDL_TAB = r" "
from b_asic.codegen.vhdl_src import architecture, common, entity from b_asic.codegen.vhdl import architecture, common, entity
...@@ -8,12 +8,17 @@ from typing import Dict, Optional, Set, cast ...@@ -8,12 +8,17 @@ from typing import Dict, Optional, Set, cast
from b_asic.codegen import vhdl from b_asic.codegen import vhdl
from b_asic.codegen.vhdl import VHDL_TAB from b_asic.codegen.vhdl import VHDL_TAB
from b_asic.process import MemoryVariable, PlainMemoryVariable from b_asic.process import MemoryVariable, PlainMemoryVariable
from b_asic.resources import ProcessCollection from b_asic.resources import (
ProcessCollection,
_ForwardBackwardEntry,
_ForwardBackwardTable,
)
def write_memory_based_architecture( def write_memory_based_storage(
f: TextIOWrapper, f: TextIOWrapper,
assignment: Set[ProcessCollection], assignment: Set[ProcessCollection],
entity_name: str,
word_length: int, word_length: int,
read_ports: int, read_ports: int,
write_ports: int, write_ports: int,
...@@ -24,7 +29,7 @@ def write_memory_based_architecture( ...@@ -24,7 +29,7 @@ def write_memory_based_architecture(
Parameters Parameters
---------- ----------
assignment: dictionary assignment : dict
A possible cell assignment to use when generating the memory based storage. A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process corresponds to the cell to assign all MemoryVariables in corresponding process
...@@ -32,19 +37,18 @@ def write_memory_based_architecture( ...@@ -32,19 +37,18 @@ def write_memory_based_architecture(
If unset, each MemoryVariable will be assigned to a unique cell. If unset, each MemoryVariable will be assigned to a unique cell.
f : TextIOWrapper f : TextIOWrapper
File object (or other TextIOWrapper object) to write the architecture onto. File object (or other TextIOWrapper object) to write the architecture onto.
word_length: int word_length : int
Word length of the memory variable objects. Word length of the memory variable objects.
read_ports: read_ports : int
Number of read ports. Number of read ports.
write_ports: write_ports : int
Number of write ports. Number of write ports.
total_ports: total_ports : int
Total concurrent memory accesses possible. Total concurrent memory accesses possible.
""" """
# Code settings # Code settings
mem_depth = len(assignment) mem_depth = len(assignment)
entity_name = "some_name"
architecture_name = "rtl" architecture_name = "rtl"
schedule_time = next(iter(assignment))._schedule_time schedule_time = next(iter(assignment))._schedule_time
...@@ -117,7 +121,7 @@ def write_memory_based_architecture( ...@@ -117,7 +121,7 @@ def write_memory_based_architecture(
# Infer memory # Infer memory
f.write('\n') f.write('\n')
f.write(f'{VHDL_TAB}-- Memory\n') f.write(f'{VHDL_TAB}-- Memory\n')
vhdl.common.write_synchronous_memory( vhdl.common.write_asynchronous_read_memory(
f=f, f=f,
clk='clk', clk='clk',
name=f'mem_{0}_proc', name=f'mem_{0}_proc',
...@@ -139,10 +143,11 @@ def write_memory_based_architecture( ...@@ -139,10 +143,11 @@ def write_memory_based_architecture(
for i, collection in enumerate(assignment): for i, collection in enumerate(assignment):
for mv in collection: for mv in collection:
mv = cast(MemoryVariable, mv) mv = cast(MemoryVariable, mv)
f.write(f'{3*VHDL_TAB}-- {mv!r}\n') if mv.execution_time:
f.write(f'{3*VHDL_TAB}when {mv.start_time} =>\n') f.write(f'{3*VHDL_TAB}-- {mv!r}\n')
f.write(f'{4*VHDL_TAB}write_adr_0 <= {i};\n') f.write(f'{3*VHDL_TAB}when {mv.start_time} =>\n')
f.write(f'{4*VHDL_TAB}write_en_0 <= \'1\';\n') f.write(f'{4*VHDL_TAB}write_adr_0 <= {i};\n')
f.write(f'{4*VHDL_TAB}write_en_0 <= \'1\';\n')
f.write(f'{3*VHDL_TAB}when others =>\n') f.write(f'{3*VHDL_TAB}when others =>\n')
f.write(f'{4*VHDL_TAB}write_adr_0 <= 0;\n') f.write(f'{4*VHDL_TAB}write_adr_0 <= 0;\n')
f.write(f'{4*VHDL_TAB}write_en_0 <= \'0\';\n') f.write(f'{4*VHDL_TAB}write_en_0 <= \'0\';\n')
...@@ -170,8 +175,175 @@ def write_memory_based_architecture( ...@@ -170,8 +175,175 @@ def write_memory_based_architecture(
f.write(f'{4*VHDL_TAB}read_adr_0 <= 0;\n') f.write(f'{4*VHDL_TAB}read_adr_0 <= 0;\n')
f.write(f'{4*VHDL_TAB}read_en_0 <= \'0\';\n') f.write(f'{4*VHDL_TAB}read_en_0 <= \'0\';\n')
f.write(f'{2*VHDL_TAB}end case;\n') f.write(f'{2*VHDL_TAB}end case;\n')
f.write(f'{1*VHDL_TAB}end process;\n\n')
f.write(f'{1*VHDL_TAB}end process;\n') f.write(f'{1*VHDL_TAB}-- Input and output assignment\n')
f.write(f'{1*VHDL_TAB}write_port_0 <= p_0_in;\n')
p_zero_exec = filter(
lambda p: p.execution_time == 0, (p for pc in assignment for p in pc)
)
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
indent=len(VHDL_TAB),
name='output_reg_proc',
)
f.write(f'{3*VHDL_TAB}case schedule_cnt is\n')
for p in p_zero_exec:
f.write(f'{4*VHDL_TAB}when {p.start_time} => p_0_out <= p_0_in;\n')
f.write(f'{4*VHDL_TAB}when others => p_0_out <= read_port_0;\n')
f.write(f'{3*VHDL_TAB}end case;\n')
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
indent=len(VHDL_TAB),
name='output_reg_proc',
)
# if p_zero_exec:
# f.write(f'{1*VHDL_TAB}p_0_out <=\n')
# for p in filter(lambda p: p.execution_time==0, (p for pc in assignment for p in pc)):
# f.write(f'{2*VHDL_TAB}p_0_in when schedule_cnt = {(p.start_time+1)%schedule_time} else\n')
# f.write(f'{2*VHDL_TAB}read_port_0;\n')
# else:
# f.write(f'{1*VHDL_TAB}p_0_out <= read_port_0;\n')
f.write('\n')
f.write(f'end architecture {architecture_name};')
def write_register_based_storage(
f: TextIOWrapper,
forward_backward_table: _ForwardBackwardTable,
entity_name: str,
word_length: int,
read_ports: int,
write_ports: int,
total_ports: int,
):
architecture_name = "rtl"
schedule_time = len(forward_backward_table)
reg_cnt = len(forward_backward_table[0].regs)
#
# Architecture declerative region begin
#
# Write architecture header
f.write(f'architecture {architecture_name} of {entity_name} is\n\n')
# Schedule time counter
f.write(f'{VHDL_TAB}-- Schedule counter\n')
vhdl.common.write_signal_decl(
f,
name='schedule_cnt',
type=f'integer range 0 to {schedule_time}-1',
name_pad=14,
default_value='0',
)
f.write('\n') f.write('\n')
# Shift register
f.write(f'{VHDL_TAB}-- Shift register\n')
vhdl.common.write_type_decl(
f,
name='shift_reg_type',
alias=f'array(0 to {reg_cnt}-1) of std_logic_vector(WL-1 downto 0)',
)
vhdl.common.write_signal_decl(
f,
name='shift_reg',
type='shift_reg_type',
name_pad=14,
)
#
# Architecture body begin
#
f.write(f'begin\n\n')
f.write(f'{VHDL_TAB}-- Schedule counter\n')
vhdl.common.write_synchronous_process(
f=f,
name='schedule_cnt_proc',
clk='clk',
indent=len(1 * VHDL_TAB),
body=(
f'{0*VHDL_TAB}if en = \'1\' then\n'
f'{1*VHDL_TAB}if schedule_cnt = {schedule_time}-1 then\n'
f'{2*VHDL_TAB}schedule_cnt <= 0;\n'
f'{1*VHDL_TAB}else\n'
f'{2*VHDL_TAB}schedule_cnt <= schedule_cnt + 1;\n'
f'{1*VHDL_TAB}end if;\n'
f'{0*VHDL_TAB}end if;\n'
),
)
f.write(f'\n{VHDL_TAB}-- Multiplexers for shift register\n')
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='shift_reg_proc',
indent=len(VHDL_TAB),
)
# Default for all register
f.write(f'{3*VHDL_TAB}-- Default case\n')
f.write(f'{3*VHDL_TAB}shift_reg(0) <= p_0_in;\n')
for reg_idx in range(1, reg_cnt):
f.write(f'{3*VHDL_TAB}shift_reg({reg_idx}) <= shift_reg({reg_idx-1});\n')
f.write(f'{3*VHDL_TAB}case schedule_cnt is\n')
for i, entry in enumerate(forward_backward_table):
if entry.back_edge_from:
f.write(f'{4*VHDL_TAB} when {schedule_time-1 if (i-1)<0 else (i-1)} =>\n')
for dst, src in entry.back_edge_from.items():
f.write(f'{5*VHDL_TAB} shift_reg({dst}) <= shift_reg({src});\n')
f.write(f'{4*VHDL_TAB}when others => null;\n')
f.write(f'{3*VHDL_TAB}end case;\n')
# for i in range(5):
# forward_backward_table[
# f.write(f'{3*VHDL_TAB}abc\n')
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='shift_reg_proc',
indent=len(VHDL_TAB),
)
f.write(f'\n{VHDL_TAB}-- Output muliplexer\n')
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='out_mux_proc',
indent=len(VHDL_TAB),
)
f.write(f'{3*VHDL_TAB}-- Default case\n')
f.write(f'{3*VHDL_TAB}p_0_out <= shift_reg({reg_cnt-1});\n')
f.write(f'{3*VHDL_TAB}case schedule_cnt is\n')
for i, entry in enumerate(forward_backward_table):
if entry.outputs_from is not None:
if entry.outputs_from != reg_cnt - 1:
f.write(f'{4*VHDL_TAB} when {i} =>\n')
if entry.outputs_from < 0:
f.write(f'{5*VHDL_TAB} p_0_out <= p_{-1-entry.outputs_from}_in;\n')
else:
f.write(
f'{5*VHDL_TAB} p_0_out <= shift_reg({entry.outputs_from});\n'
)
f.write(f'{4*VHDL_TAB}when others => null;\n')
f.write(f'{3*VHDL_TAB}end case;\n')
# for i in range(5):
# forward_backward_table[
# f.write(f'{3*VHDL_TAB}abc\n')
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='out_mux_proc',
indent=len(VHDL_TAB),
)
f.write(f'end architecture {architecture_name};') f.write(f'end architecture {architecture_name};')
...@@ -11,12 +11,12 @@ from b_asic.codegen.vhdl import VHDL_TAB ...@@ -11,12 +11,12 @@ from b_asic.codegen.vhdl import VHDL_TAB
def write_b_asic_vhdl_preamble(f: TextIOWrapper): def write_b_asic_vhdl_preamble(f: TextIOWrapper):
""" """
Write a standard BASIC VHDL preamble comment Write a standard BASIC VHDL preamble comment.
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The fileobject to write the header to. The file object to write the header to.
""" """
f.write(f'--\n') f.write(f'--\n')
f.write(f'-- This code was automatically generated by the B-ASIC toolbox.\n') f.write(f'-- This code was automatically generated by the B-ASIC toolbox.\n')
...@@ -35,7 +35,7 @@ def write_ieee_header( ...@@ -35,7 +35,7 @@ def write_ieee_header(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the IEEE header to. The TextIOWrapper object to write the IEEE header to.
std_logic_1164 : bool, default: True std_logic_1164 : bool, default: True
Include the std_logic_1164 header. Include the std_logic_1164 header.
...@@ -48,7 +48,6 @@ def write_ieee_header( ...@@ -48,7 +48,6 @@ def write_ieee_header(
if numeric_std: if numeric_std:
f.write('use ieee.numeric_std.all;\n') f.write('use ieee.numeric_std.all;\n')
f.write('\n') f.write('\n')
write_signal_decl
def write_signal_decl( def write_signal_decl(
...@@ -65,7 +64,7 @@ def write_signal_decl( ...@@ -65,7 +64,7 @@ def write_signal_decl(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the IEEE header to. The TextIOWrapper object to write the IEEE header to.
name : str name : str
Signal name. Signal name.
...@@ -97,7 +96,7 @@ def write_constant_decl( ...@@ -97,7 +96,7 @@ def write_constant_decl(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the constant declaration to. The TextIOWrapper object to write the constant declaration to.
name : str name : str
Signal name. Signal name.
...@@ -122,7 +121,7 @@ def write_type_decl( ...@@ -122,7 +121,7 @@ def write_type_decl(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the type declaration to. The TextIOWrapper object to write the type declaration to.
name : str name : str
Type name alias. Type name alias.
...@@ -145,7 +144,7 @@ def write_synchronous_process( ...@@ -145,7 +144,7 @@ def write_synchronous_process(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto. The TextIOWrapper to write the VHDL code onto.
clk : str clk : str
Name of the clock. Name of the clock.
...@@ -177,7 +176,7 @@ def write_synchronous_process_prologue( ...@@ -177,7 +176,7 @@ def write_synchronous_process_prologue(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto. The TextIOWrapper to write the VHDL code onto.
clk : str clk : str
Name of the clock. Name of the clock.
...@@ -195,6 +194,37 @@ def write_synchronous_process_prologue( ...@@ -195,6 +194,37 @@ def write_synchronous_process_prologue(
f.write(f'{space}{VHDL_TAB}if rising_edge(clk) then\n') f.write(f'{space}{VHDL_TAB}if rising_edge(clk) then\n')
def write_synchronous_process_epilogue(
f: TextIOWrapper,
clk: Optional[str],
indent: Optional[int] = 0,
name: Optional[str] = None,
):
"""
Write only the epilogue of a regular VHDL synchronous process with a single clock object in the sensitivity list
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
indent : Optional[int]
Indent this process block with `indent` columns
name : Optional[str]
An optional name for the process
"""
_ = clk
space = '' if indent is None else ' ' * indent
f.write(f'{space}{VHDL_TAB}end if;\n')
f.write(f'{space}end process')
if name is not None:
f.write(' ' + name)
f.write(';\n')
def write_synchronous_memory( def write_synchronous_memory(
f: TextIOWrapper, f: TextIOWrapper,
clk: str, clk: str,
...@@ -207,7 +237,7 @@ def write_synchronous_memory( ...@@ -207,7 +237,7 @@ def write_synchronous_memory(
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto. The TextIOWrapper to write the VHDL code onto.
clk : str clk : str
Name of clock identifier to the synchronous memory. Name of clock identifier to the synchronous memory.
...@@ -227,37 +257,41 @@ def write_synchronous_memory( ...@@ -227,37 +257,41 @@ def write_synchronous_memory(
f.write(f'{3*VHDL_TAB}end if;\n') f.write(f'{3*VHDL_TAB}end if;\n')
for write_name, address, we in write_ports: for write_name, address, we in write_ports:
f.write(f'{3*VHDL_TAB}if {we} = \'1\' then\n') f.write(f'{3*VHDL_TAB}if {we} = \'1\' then\n')
f.write(f'{4*VHDL_TAB}{write_name} <= memory({address});\n') f.write(f'{4*VHDL_TAB}memory({address}) <= {write_name};\n')
f.write(f'{3*VHDL_TAB}end if;\n') f.write(f'{3*VHDL_TAB}end if;\n')
write_synchronous_process_epilogue(f, clk=clk, name=name, indent=len(VHDL_TAB)) write_synchronous_process_epilogue(f, clk=clk, name=name, indent=len(VHDL_TAB))
def write_synchronous_process_epilogue( def write_asynchronous_read_memory(
f: TextIOWrapper, f: TextIOWrapper,
clk: Optional[str], clk: str,
indent: Optional[int] = 0, read_ports: Set[Tuple[str, str, str]],
write_ports: Set[Tuple[str, str, str]],
name: Optional[str] = None, name: Optional[str] = None,
): ):
""" """
Write only the prologue of a regular VHDL synchronous process with a single clock object in the sensitivity list Infer a VHDL synchronous reads and writes.
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters Parameters
---------- ----------
f : TextIOWrapper f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto. The TextIOWrapper to write the VHDL code onto.
clk : str clk : str
Name of the clock. Name of clock identifier to the synchronous memory.
indent : Optional[int] read_ports : Set[Tuple[str,str]]
Indent this process block with `indent` columns A set of strings used as identifiers for the read ports of the memory.
write_ports : Set[Tuple[str,str,str]]
A set of strings used as identifiers for the write ports of the memory.
name : Optional[str] name : Optional[str]
An optional name for the process An optional name for the memory process.
""" """
_ = clk assert len(read_ports) >= 1
space = '' if indent is None else ' ' * indent assert len(write_ports) >= 1
f.write(f'{space}{VHDL_TAB}end if;\n') write_synchronous_process_prologue(f, clk=clk, name=name, indent=len(VHDL_TAB))
f.write(f'{space}end process') for write_name, address, we in write_ports:
if name is not None: f.write(f'{3*VHDL_TAB}if {we} = \'1\' then\n')
f.write(' ' + name) f.write(f'{4*VHDL_TAB}memory({address}) <= {write_name};\n')
f.write(';\n') f.write(f'{3*VHDL_TAB}end if;\n')
write_synchronous_process_epilogue(f, clk=clk, name=name, indent=len(VHDL_TAB))
for read_name, address, _ in read_ports:
f.write(f'{1*VHDL_TAB}{read_name} <= memory({address});\n')
...@@ -10,8 +10,8 @@ from b_asic.process import MemoryVariable, PlainMemoryVariable ...@@ -10,8 +10,8 @@ from b_asic.process import MemoryVariable, PlainMemoryVariable
from b_asic.resources import ProcessCollection from b_asic.resources import ProcessCollection
def write_memory_based_architecture( def write_memory_based_storage(
f: TextIOWrapper, collection: ProcessCollection, word_length: int f: TextIOWrapper, entity_name: str, collection: ProcessCollection, word_length: int
): ):
# Check that this is a ProcessCollection of (Plain)MemoryVariables # Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all( is_memory_variable = all(
...@@ -25,14 +25,18 @@ def write_memory_based_architecture( ...@@ -25,14 +25,18 @@ def write_memory_based_architecture(
"HDL can only be generated for ProcessCollection of (Plain)MemoryVariables" "HDL can only be generated for ProcessCollection of (Plain)MemoryVariables"
) )
entity_name = 'some_name' entity_name = entity_name
# Write the entity header # Write the entity header
f.write(f'entity {entity_name} is\n') f.write(f'entity {entity_name} is\n')
f.write(f'{VHDL_TAB}generic(\n')
f.write(f'{2*VHDL_TAB}-- Data word length\n')
f.write(f'{2*VHDL_TAB}WL : integer := {word_length}\n')
f.write(f'{VHDL_TAB});\n')
f.write(f'{VHDL_TAB}port(\n') f.write(f'{VHDL_TAB}port(\n')
# Write the clock and reset signal # Write the clock and reset signal
f.write(f'{2*VHDL_TAB}-- Clock, sycnhronous reset and enable signals\n') f.write(f'{2*VHDL_TAB}-- Clock, synchronous reset and enable signals\n')
f.write(f'{2*VHDL_TAB}clk : in std_logic;\n') f.write(f'{2*VHDL_TAB}clk : in std_logic;\n')
f.write(f'{2*VHDL_TAB}rst : in std_logic;\n') f.write(f'{2*VHDL_TAB}rst : in std_logic;\n')
f.write(f'{2*VHDL_TAB}en : in std_logic;\n') f.write(f'{2*VHDL_TAB}en : in std_logic;\n')
...@@ -44,19 +48,14 @@ def write_memory_based_architecture( ...@@ -44,19 +48,14 @@ def write_memory_based_architecture(
for idx, read_port in enumerate(read_ports): for idx, read_port in enumerate(read_ports):
port_name = read_port if isinstance(read_port, int) else read_port.name port_name = read_port if isinstance(read_port, int) else read_port.name
port_name = 'p_' + str(port_name) + '_in' port_name = 'p_' + str(port_name) + '_in'
f.write( f.write(f'{2*VHDL_TAB}{port_name} : in std_logic_vector(WL-1 downto 0);\n')
f'{2*VHDL_TAB}{port_name} : in std_logic_vector({word_length}-1 downto'
' 0);\n'
)
# Write the output port specification # Write the output port specification
write_ports: Set[Port] = {mv.write_port for mv in collection} # type: ignore write_ports: Set[Port] = {mv.write_port for mv in collection} # type: ignore
for idx, write_port in enumerate(write_ports): for idx, write_port in enumerate(write_ports):
port_name = write_port if isinstance(write_port, int) else write_port.name port_name = write_port if isinstance(write_port, int) else write_port.name
port_name = 'p_' + str(port_name) + '_out' port_name = 'p_' + str(port_name) + '_out'
f.write( f.write(f'{2*VHDL_TAB}{port_name} : out std_logic_vector(WL-1 downto 0)')
f'{2*VHDL_TAB}{port_name} : out std_logic_vector({word_length}-1 downto 0)'
)
if idx == len(write_ports) - 1: if idx == len(write_ports) - 1:
f.write('\n') f.write('\n')
else: else:
...@@ -65,3 +64,9 @@ def write_memory_based_architecture( ...@@ -65,3 +64,9 @@ def write_memory_based_architecture(
# Write ending of the port header # Write ending of the port header
f.write(f'{VHDL_TAB});\n') f.write(f'{VHDL_TAB});\n')
f.write(f'end entity {entity_name};\n\n') f.write(f'end entity {entity_name};\n\n')
def write_register_based_storage(
f: TextIOWrapper, entity_name: str, collection: ProcessCollection, word_length: int
):
write_memory_based_storage(f, entity_name, collection, word_length)
import io import io
import re import re
from functools import reduce
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
...@@ -40,7 +41,7 @@ def _sanitize_port_option( ...@@ -40,7 +41,7 @@ def _sanitize_port_option(
total_ports: Optional[int] = None, total_ports: Optional[int] = None,
) -> Tuple[int, int, int]: ) -> Tuple[int, int, int]:
""" """
General port sanitization function, to test if a port specification makes sense. General port sanitization function used to test if a port specification makes sense.
Raises ValueError if the port specification is in-proper. Raises ValueError if the port specification is in-proper.
Parameters Parameters
...@@ -148,6 +149,246 @@ def draw_exclusion_graph_coloring( ...@@ -148,6 +149,246 @@ def draw_exclusion_graph_coloring(
) )
class _ForwardBackwardEntry:
def __init__(
self,
inputs: Optional[List[Process]] = None,
outputs: Optional[List[Process]] = None,
regs: Optional[List[Optional[Process]]] = None,
back_edge_to: Optional[Dict[int, int]] = None,
back_edge_from: Optional[Dict[int, int]] = None,
outputs_from: Optional[int] = None,
):
"""
Single entry in a _ForwardBackwardTable. Aggregate type of input, output and list of registers.
Parameters
----------
inputs : List[Process], optional
input
outputs : List[Process], optional
output
regs : List[Optional[Process]], optional
regs
back_edge_to : dict, optional
back_edge_from : List[Optional[Process]], optional
outputs_from : int, optional
"""
self.inputs: List[Process] = [] if inputs is None else inputs
self.outputs: List[Process] = [] if outputs is None else outputs
self.regs: List[Optional[Process]] = [] if regs is None else regs
self.back_edge_to: Dict[int, int] = {} if back_edge_to is None else back_edge_to
self.back_edge_from: Dict[int, int] = (
{} if back_edge_from is None else back_edge_from
)
self.outputs_from = outputs_from
class _ForwardBackwardTable:
def __init__(self, collection: 'ProcessCollection'):
"""
Forward-Backward allocation table for ProcessCollections. This structure implements the forward-backward
register allocation algorithm, which is used to generate hardware from MemoryVariables in a ProcessCollection.
Parameters
----------
collection : ProcessCollection
ProcessCollection to apply forward-backward allocation on
"""
# Generate an alive variable list
self._collection = collection
self._live_variables: List[int] = [0] * collection._schedule_time
for mv in self._collection:
stop_time = mv.start_time + mv.execution_time
for alive_time in range(mv.start_time, stop_time):
self._live_variables[alive_time % collection._schedule_time] += 1
# First, create an empty forward-backward table with the right dimensions
self.table: List[_ForwardBackwardEntry] = []
for _ in range(collection.schedule_time):
entry = _ForwardBackwardEntry()
# https://github.com/microsoft/pyright/issues/1073
for _ in range(max(self._live_variables)):
entry.regs.append(None)
self.table.append(entry)
# Insert all processes (one per time-slot) to the table input
# TODO: "Input each variable at the time step corresponding to the beginning of its lifetime. If multiple
# variables are input in a given cycle, theses are allocated to multple registers such that the variable
# with the longest lifetime is allocated to the inital register and the other variables are allocated to
# consecutive registers in decreasing order of lifetime." -- K. Parhi
for mv in collection:
self.table[mv.start_time].inputs.append(mv)
if mv.execution_time:
self.table[(mv.start_time + 1) % collection.schedule_time].regs[0] = mv
else:
self.table[mv.start_time].outputs.append(mv)
self.table[mv.start_time].outputs_from = -1
# Forward-backward allocation
forward = True
while not self._forward_backward_is_complete():
if forward:
self._do_forward_allocation()
else:
self._do_single_backward_allocation()
forward = not (forward)
def _forward_backward_is_complete(self) -> bool:
s = {proc for e in self.table for proc in e.outputs}
return len(self._collection._collection - s) == 0
def _do_forward_allocation(self):
"""
Forward all Processes as far as possible in the register chain. Processes are forwarded until they reach their
end time (at which they are added to the output list), or until they reach the end of the register chain.
"""
rows = len(self.table)
cols = len(self.table[0].regs)
# Note that two passes of the forward allocation need to be done, since variables may loop around the schedule
# cycle boundary.
for _ in range(2):
for time, entry in enumerate(self.table):
for reg_idx, reg in enumerate(entry.regs):
if reg is not None:
reg_end_time = (reg.start_time + reg.execution_time) % rows
if reg_end_time == time:
if reg not in self.table[time].outputs:
self.table[time].outputs.append(reg)
self.table[time].outputs_from = reg_idx
elif reg_idx != cols - 1:
next_row = (time + 1) % rows
next_col = reg_idx + 1
if self.table[next_row].regs[next_col] not in (None, reg):
cell = self.table[next_row].regs[next_col]
raise ValueError(
f'Can\'t forward allocate {reg} in row={time},'
f' col={reg_idx} to next_row={next_row},'
f' next_col={next_col} (cell contains: {cell})'
)
else:
self.table[(time + 1) % rows].regs[reg_idx + 1] = reg
def _do_single_backward_allocation(self):
"""
Perform backward allocation of Processes in the allocation table.
"""
rows = len(self.table)
cols = len(self.table[0].regs)
outputs = {out for e in self.table for out in e.outputs}
#
# Pass #1: Find any (one) non-dead variable from the last register and try to backward allocate it to a
# previous register where it is not blocking an open path. This heuristic helps minimize forward allocation
# moves later.
#
for time, entry in enumerate(self.table):
reg = entry.regs[-1]
if reg is not None and reg not in outputs:
next_entry = self.table[(time + 1) % rows]
for nreg_idx, nreg in enumerate(next_entry.regs):
if nreg is None and (
nreg_idx == 0 or entry.regs[nreg_idx - 1] is not None
):
next_entry.regs[nreg_idx] = reg
entry.back_edge_to[cols - 1] = nreg_idx
next_entry.back_edge_from[nreg_idx] = cols - 1
return
#
# Pass #2: Backward allocate the first non-dead variable from the last registers to an empty register.
#
for time, entry in enumerate(self.table):
reg = entry.regs[-1]
if reg is not None and reg not in outputs:
next_entry = self.table[(time + 1) % rows]
for nreg_idx, nreg in enumerate(next_entry.regs):
if nreg is None:
next_entry.regs[nreg_idx] = reg
entry.back_edge_to[cols - 1] = nreg_idx
return
# All passes failed, raise exception...
raise ValueError(
f"Can't backward allocate any variable. This should not happen."
)
def __getitem__(self, key):
return self.table[key]
def __iter__(self):
yield from self.table
def __len__(self):
return len(self.table)
def __str__(self):
# Text width of input and output column
lst_w = lambda proc_lst: reduce(lambda n, p: n + len(str(p)) + 1, proc_lst, 0)
input_col_w = max(5, max(lst_w(pl.inputs) for pl in self.table) + 1)
output_col_w = max(5, max(lst_w(pl.outputs) for pl in self.table) + 1)
# Text width of register columns
reg_col_w = 0
for entry in self.table:
for reg in entry.regs:
reg_col_w = max(len(str(reg)), reg_col_w)
reg_col_w = max(4, reg_col_w + 2)
# Header row of the string
res = f' T |{"In":^{input_col_w}}|'
for i in range(max(self._live_variables)):
reg = f'R{i}'
res += f'{reg:^{reg_col_w}}|'
res += f'{"Out":^{output_col_w}}|'
res += '\n'
res += (
6 + input_col_w + (reg_col_w + 1) * max(self._live_variables) + output_col_w
) * '-' + '\n'
for time, entry in enumerate(self.table):
# Time
res += f'{time:^3}| '
# Input column
inputs_str = ''
for input in entry.inputs:
inputs_str += input.name + ','
if inputs_str:
inputs_str = inputs_str[:-1]
res += f'{inputs_str:^{input_col_w-1}}|'
# Register columns
GREEN_BACKGROUND_ANSI = "\u001b[42m"
BROWN_BACKGROUND_ANSI = "\u001b[43m"
RESET_BACKGROUND_ANSI = "\033[0m"
for reg_idx, reg in enumerate(entry.regs):
if reg is None:
res += " " * reg_col_w + "|"
else:
if reg_idx in entry.back_edge_to:
res += f'{GREEN_BACKGROUND_ANSI}'
res += f'{reg.name:^{reg_col_w}}'
res += f'{RESET_BACKGROUND_ANSI}|'
elif reg_idx in entry.back_edge_from:
res += f'{BROWN_BACKGROUND_ANSI}'
res += f'{reg.name:^{reg_col_w}}'
res += f'{RESET_BACKGROUND_ANSI}|'
else:
res += f'{reg.name:^{reg_col_w}}' + "|"
# Output column
outputs_str = ''
for output in entry.outputs:
outputs_str += output.name + ','
if outputs_str:
outputs_str = outputs_str[:-1]
if entry.outputs_from is not None:
outputs_str += f"({entry.outputs_from})"
res += f'{outputs_str:^{output_col_w}}|'
res += '\n'
return res
class ProcessCollection: class ProcessCollection:
""" """
Collection of one or more processes Collection of one or more processes
...@@ -173,9 +414,13 @@ class ProcessCollection: ...@@ -173,9 +414,13 @@ class ProcessCollection:
self._cyclic = cyclic self._cyclic = cyclic
@property @property
def collection(self): def collection(self) -> Set[Process]:
return self._collection return self._collection
@property
def schedule_time(self) -> int:
return self._schedule_time
def __len__(self): def __len__(self):
return len(self._collection) return len(self._collection)
...@@ -229,7 +474,7 @@ class ProcessCollection: ...@@ -229,7 +474,7 @@ class ProcessCollection:
Returns Returns
------- -------
ax: Associated Matplotlib Axes (or array of Axes) object ax : Associated Matplotlib Axes (or array of Axes) object
""" """
# Set up the Axes object # Set up the Axes object
...@@ -680,6 +925,7 @@ class ProcessCollection: ...@@ -680,6 +925,7 @@ class ProcessCollection:
def generate_memory_based_storage_vhdl( def generate_memory_based_storage_vhdl(
self, self,
filename: str, filename: str,
entity_name: str,
word_length: int, word_length: int,
assignment: Set['ProcessCollection'], assignment: Set['ProcessCollection'],
read_ports: Optional[int] = None, read_ports: Optional[int] = None,
...@@ -693,9 +939,11 @@ class ProcessCollection: ...@@ -693,9 +939,11 @@ class ProcessCollection:
---------- ----------
filename : str filename : str
Filename of output file. Filename of output file.
word_length: int entity_name : str
Name used for the VHDL entity.
word_length : int
Word length of the memory variable objects. Word length of the memory variable objects.
assignment: set assignment : set
A possible cell assignment to use when generating the memory based storage. A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process corresponds to the cell to assign all MemoryVariables in corresponding process
...@@ -713,7 +961,6 @@ class ProcessCollection: ...@@ -713,7 +961,6 @@ class ProcessCollection:
The total number of ports used when splitting process collection based on The total number of ports used when splitting process collection based on
memory variable access. memory variable access.
""" """
# Check that this is a ProcessCollection of (Plain)MemoryVariables # Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all( is_memory_variable = all(
isinstance(process, MemoryVariable) for process in self._collection isinstance(process, MemoryVariable) for process in self._collection
...@@ -732,6 +979,15 @@ class ProcessCollection: ...@@ -732,6 +979,15 @@ class ProcessCollection:
read_ports, write_ports, total_ports read_ports, write_ports, total_ports
) )
# Make sure the provided assignment (Set[ProcessCollection]) only
# contains memory variables from this (self).
for collection in assignment:
for mv in collection:
if mv not in self:
raise ValueError(
f'{mv.__repr__()} is not part of {self.__repr__()}.'
)
# Make sure that concurrent reads/writes do not surpass the port setting # Make sure that concurrent reads/writes do not surpass the port setting
for mv in self: for mv in self:
filter_write = lambda p: p.start_time == mv.start_time filter_write = lambda p: p.start_time == mv.start_time
...@@ -757,12 +1013,13 @@ class ProcessCollection: ...@@ -757,12 +1013,13 @@ class ProcessCollection:
vhdl.common.write_b_asic_vhdl_preamble(f) vhdl.common.write_b_asic_vhdl_preamble(f)
vhdl.common.write_ieee_header(f) vhdl.common.write_ieee_header(f)
vhdl.entity.write_memory_based_architecture( vhdl.entity.write_memory_based_storage(
f, collection=self, word_length=word_length f, entity_name=entity_name, collection=self, word_length=word_length
) )
vhdl.architecture.write_memory_based_architecture( vhdl.architecture.write_memory_based_storage(
f, f,
assignment=assignment, assignment=assignment,
entity_name=entity_name,
word_length=word_length, word_length=word_length,
read_ports=read_ports, read_ports=read_ports,
write_ports=write_ports, write_ports=write_ports,
...@@ -773,13 +1030,13 @@ class ProcessCollection: ...@@ -773,13 +1030,13 @@ class ProcessCollection:
self, self,
filename: str, filename: str,
word_length: int, word_length: int,
assignment: Set['ProcessCollection'], entity_name: str,
read_ports: Optional[int] = None, read_ports: Optional[int] = None,
write_ports: Optional[int] = None, write_ports: Optional[int] = None,
total_ports: Optional[int] = None, total_ports: Optional[int] = None,
): ):
""" """
Generate VHDL code for register based storages of processes based on the Forward-Backward Register Allocation [1]. Generate VHDL code for register based storages of processes based on Forward-Backward Register Allocation [1].
[1]: K. Parhi: VLSI Digital Signal Processing Systems: Design and Implementation, Ch. 6.3.2 [1]: K. Parhi: VLSI Digital Signal Processing Systems: Design and Implementation, Ch. 6.3.2
...@@ -787,14 +1044,10 @@ class ProcessCollection: ...@@ -787,14 +1044,10 @@ class ProcessCollection:
---------- ----------
filename : str filename : str
Filename of output file. Filename of output file.
word_length: int word_length : int
Word length of the memory variable objects. Word length of the memory variable objects.
assignment: set entity_name : str
A possible cell assignment to use when generating the memory based storage. Name used for the VHDL entity.
The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process
collection.
If unset, each MemoryVariable will be assigned to a unique single cell.
read_ports : int, optional read_ports : int, optional
The number of read ports used when splitting process collection based on The number of read ports used when splitting process collection based on
memory variable access. If total ports in unset, this parameter has to be set memory variable access. If total ports in unset, this parameter has to be set
...@@ -807,4 +1060,41 @@ class ProcessCollection: ...@@ -807,4 +1060,41 @@ class ProcessCollection:
The total number of ports used when splitting process collection based on The total number of ports used when splitting process collection based on
memory variable access. memory variable access.
""" """
pass # Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all(
isinstance(process, MemoryVariable) for process in self._collection
)
is_plain_memory_variable = all(
isinstance(process, PlainMemoryVariable) for process in self._collection
)
if not (is_memory_variable or is_plain_memory_variable):
raise ValueError(
"HDL can only be generated for ProcessCollection of"
" (Plain)MemoryVariables"
)
# Sanitize port settings
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)
# Create the forward-backward table
forward_backward_table = _ForwardBackwardTable(self)
with open(filename, 'w') as f:
from b_asic.codegen import vhdl
vhdl.common.write_b_asic_vhdl_preamble(f)
vhdl.common.write_ieee_header(f)
vhdl.entity.write_register_based_storage(
f, entity_name=entity_name, collection=self, word_length=word_length
)
vhdl.architecture.write_register_based_storage(
f,
forward_backward_table=forward_backward_table,
entity_name=entity_name,
word_length=word_length,
read_ports=read_ports,
write_ports=write_ports,
total_ports=total_ports,
)
.. _codegen:
B-ASIC Code Generation
**********************
Code generation using the B-ASIC toolbox.
Codegen
=======
.. toctree::
:maxdepth: 1
vhdl.rst
***********************
``b_asic.codegen.vhdl``
***********************
.. automodule:: b_asic.codegen.vhdl.common
:members:
:undoc-members:
.. automodule:: b_asic.codegen.vhdl.entity
:members:
:undoc-members:
.. automodule:: b_asic.codegen.vhdl.architecture
:members:
:undoc-members:
...@@ -31,3 +31,4 @@ Table of Contents ...@@ -31,3 +31,4 @@ Table of Contents
gui_utils gui_utils
examples/index examples/index
research research
codegen/index
...@@ -21,7 +21,7 @@ class TestProcessCollectionPlainMemoryVariable: ...@@ -21,7 +21,7 @@ class TestProcessCollectionPlainMemoryVariable:
@pytest.mark.mpl_image_compare(style='mpl20') @pytest.mark.mpl_image_compare(style='mpl20')
def test_draw_matrix_transposer_4(self): def test_draw_matrix_transposer_4(self):
fig, ax = plt.subplots() fig, ax = plt.subplots()
generate_matrix_transposer(4).plot(ax=ax) generate_matrix_transposer(4).plot(ax=ax) # type: ignore
return fig return fig
def test_split_memory_variable(self, simple_collection: ProcessCollection): def test_split_memory_variable(self, simple_collection: ProcessCollection):
...@@ -48,21 +48,32 @@ class TestProcessCollectionPlainMemoryVariable: ...@@ -48,21 +48,32 @@ class TestProcessCollectionPlainMemoryVariable:
assert len(assignment_left_edge.keys()) == 18 assert len(assignment_left_edge.keys()) == 18
assert len(assignment_graph_color) == 16 assert len(assignment_graph_color) == 16
def test_generate_vhdl(self): def test_generate_memory_based_vhdl(self):
collection = generate_matrix_transposer(4, min_lifetime=5) for rows in [2, 3, 4, 5, 7]:
assignment = collection.graph_color_cell_assignment() collection = generate_matrix_transposer(rows, min_lifetime=0)
_, ax = plt.subplots() assignment = collection.graph_color_cell_assignment()
for cell, pc in enumerate(assignment): collection.generate_memory_based_storage_vhdl(
pc.plot(ax=ax, row=cell) filename=f'b_asic/codegen/testbench/streaming_matrix_transposition_memory_{rows}x{rows}.vhdl',
# plt.show() entity_name=f'streaming_matrix_transposition_memory_{rows}x{rows}',
collection.generate_memory_based_storage_vhdl( assignment=assignment,
"/tmp/wow.vhdl", word_length=16,
assignment=assignment, read_ports=1,
word_length=13, write_ports=1,
read_ports=1, total_ports=2,
write_ports=1, )
total_ports=2,
) def test_generate_register_based_vhdl(self):
for rows in [2, 3, 4, 5, 7]:
generate_matrix_transposer(
rows, min_lifetime=1
).generate_register_based_storage_vhdl(
filename=f'b_asic/codegen/testbench/streaming_matrix_transposition_register_{rows}x{rows}.vhdl',
entity_name=f'streaming_matrix_transposition_register_{rows}x{rows}',
word_length=16,
read_ports=1,
write_ports=1,
total_ports=2,
)
# Issue: #175 # Issue: #175
def test_interleaver_issue175(self): def test_interleaver_issue175(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment