Skip to content
Snippets Groups Projects
Commit fbd2612c authored by Mikael Henriksson's avatar Mikael Henriksson :runner:
Browse files

codegen: VHDL generation for memory based storage

parent 4b997e02
No related branches found
No related tags found
1 merge request!244WIP: VHDL code generation for register and memory based storages
"""
Module for basic VHDL code generation.
"""
# VHDL code generation tab length
VHDL_TAB = r" "
from b_asic.codegen.vhdl_src import architecture, common, entity
"""
Module for code generation of VHDL architectures.
"""
from io import TextIOWrapper
from typing import Dict, Optional, Set, cast
# from b_asic.codegen.vhdl_src import common
from b_asic.codegen import vhdl
from b_asic.codegen.vhdl import VHDL_TAB
from b_asic.process import MemoryVariable, PlainMemoryVariable
from b_asic.resources import ProcessCollection
def write_memory_based_architecture(
f: TextIOWrapper,
assignment: Set[ProcessCollection],
word_length: int,
read_ports: int,
write_ports: int,
total_ports: int,
):
"""
Generate the VHDL architecture for a memory based architecture from a process collection of memory variables.
Parameters
----------
assignment: dictionary
A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process
collection.
If unset, each MemoryVariable will be assigned to a unique cell.
f : TextIOWrapper
File object (or other TextIOWrapper object) to write the architecture onto.
word_length: int
Word length of the memory variable objects.
read_ports:
Number of read ports.
write_ports:
Number of write ports.
total_ports:
Total concurrent memory accesses possible.
"""
# Code settings
mem_depth = len(assignment)
entity_name = "some_name"
architecture_name = "rtl"
schedule_time = next(iter(assignment))._schedule_time
# Write architecture header
f.write(f'architecture {architecture_name} of {entity_name} is\n\n')
#
# Architecture declerative region begin
#
f.write(f'{VHDL_TAB}-- HDL memory description\n')
vhdl.common.write_constant_decl(
f, name='MEM_WL', type='integer', value=word_length, name_pad=12
)
vhdl.common.write_constant_decl(
f, name='MEM_DEPTH', type='integer', value=mem_depth, name_pad=12
)
vhdl.common.write_type_decl(
f, 'mem_type', 'array(0 to MEM_DEPTH-1) of std_logic_vector(MEM_WL-1 downto 0)'
)
vhdl.common.write_signal_decl(f, 'memory', 'mem_type', name_pad=14)
for i in range(read_ports):
vhdl.common.write_signal_decl(
f, f'read_port_{i}', 'std_logic_vector(MEM_WL-1 downto 0)', name_pad=14
)
vhdl.common.write_signal_decl(
f, f'read_adr_{i}', f'integer range 0 to {schedule_time}-1', name_pad=14
)
vhdl.common.write_signal_decl(f, f'read_en_{i}', 'std_logic', name_pad=14)
for i in range(write_ports):
vhdl.common.write_signal_decl(
f, f'write_port_{i}', 'std_logic_vector(MEM_WL-1 downto 0)', name_pad=14
)
vhdl.common.write_signal_decl(
f, f'write_adr_{i}', f'integer range 0 to {schedule_time}-1', name_pad=14
)
vhdl.common.write_signal_decl(f, f'write_en_{i}', 'std_logic', name_pad=14)
# Schedule time counter
f.write('\n')
f.write(f'{VHDL_TAB}-- Schedule counter\n')
vhdl.common.write_signal_decl(
f,
name='schedule_cnt',
type=f'integer range 0 to {schedule_time}-1',
name_pad=14,
)
f.write('\n')
#
# Architecture body begin
#
f.write(f'begin\n\n')
f.write(f'{VHDL_TAB}-- Schedule counter\n')
vhdl.common.write_synchronous_process(
f=f,
name='schedule_cnt_proc',
clk='clk',
indent=len(1 * VHDL_TAB),
body=(
f'{0*VHDL_TAB}if en = \'1\' then\n'
f'{1*VHDL_TAB}if schedule_cnt = {schedule_time}-1 then\n'
f'{2*VHDL_TAB}schedule_cnt <= 0;\n'
f'{1*VHDL_TAB}else\n'
f'{2*VHDL_TAB}schedule_cnt <= schedule_cnt + 1;\n'
f'{1*VHDL_TAB}end if;\n'
f'{0*VHDL_TAB}end if;\n'
),
)
# Infer memory
f.write('\n')
f.write(f'{VHDL_TAB}-- Memory\n')
vhdl.common.write_synchronous_memory(
f=f,
clk='clk',
name=f'mem_{0}_proc',
read_ports={
(f'read_port_{i}', f'read_adr_{i}', f'read_en_{i}')
for i in range(read_ports)
},
write_ports={
(f'write_port_{i}', f'write_adr_{i}', f'write_en_{i}')
for i in range(write_ports)
},
)
f.write(f'\n{VHDL_TAB}-- Memory writes\n')
f.write(f'{VHDL_TAB}process(schedule_cnt)\n')
f.write(f'{VHDL_TAB}begin\n')
f.write(f'{2*VHDL_TAB}case schedule_cnt is\n')
for i, collection in enumerate(assignment):
for mv in collection:
mv = cast(MemoryVariable, mv)
f.write(f'{3*VHDL_TAB}-- {mv!r}\n')
f.write(f'{3*VHDL_TAB}when {mv.start_time} =>\n')
f.write(f'{4*VHDL_TAB}write_adr_0 <= {i};\n')
f.write(f'{4*VHDL_TAB}write_en_0 <= \'1\';\n')
f.write(f'{3*VHDL_TAB}when others =>\n')
f.write(f'{4*VHDL_TAB}write_adr_0 <= 0;\n')
f.write(f'{4*VHDL_TAB}write_en_0 <= \'0\';\n')
f.write(f'{2*VHDL_TAB}end case;\n')
f.write(f'{1*VHDL_TAB}end process;\n')
f.write(f'\n{VHDL_TAB}-- Memory reads\n')
f.write(f'{VHDL_TAB}process(schedule_cnt)\n')
f.write(f'{VHDL_TAB}begin\n')
f.write(f'{2*VHDL_TAB}case schedule_cnt is\n')
for i, collection in enumerate(assignment):
for mv in collection:
mv = cast(PlainMemoryVariable, mv)
f.write(f'{3*VHDL_TAB}-- {mv!r}\n')
for read_time in mv.reads.values():
f.write(
f'{3*VHDL_TAB}when'
f' {(mv.start_time + read_time) % schedule_time} =>\n'
)
f.write(f'{4*VHDL_TAB}read_adr_0 <= {i};\n')
f.write(f'{4*VHDL_TAB}read_en_0 <= \'1\';\n')
f.write(f'{3*VHDL_TAB}when others =>\n')
f.write(f'{4*VHDL_TAB}read_adr_0 <= 0;\n')
f.write(f'{4*VHDL_TAB}read_en_0 <= \'0\';\n')
f.write(f'{2*VHDL_TAB}end case;\n')
f.write(f'{1*VHDL_TAB}end process;\n')
f.write('\n')
f.write(f'end architecture {architecture_name};')
"""
Generation of common VHDL constructs
"""
from datetime import datetime
from io import TextIOWrapper
from typing import Any, Optional, Set, Tuple
from b_asic.codegen.vhdl import VHDL_TAB
def write_b_asic_vhdl_preamble(f: TextIOWrapper):
"""
Write a standard BASIC VHDL preamble comment
Parameters
----------
f : TextIOWrapper
The fileobject to write the header to.
"""
f.write(f'--\n')
f.write(f'-- This code was automatically generated by the B-ASIC toolbox.\n')
f.write(f'-- Code generation timestamp: ({datetime.now()})\n')
f.write(f'-- URL: https://gitlab.liu.se/da/B-ASIC\n')
f.write(f'--\n\n')
def write_ieee_header(
f: TextIOWrapper,
std_logic_1164: bool = True,
numeric_std: bool = True,
):
"""
Write the standard IEEE VHDL use header with includes of std_logic_1164 and numeric_std.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper object to write the IEEE header to.
std_logic_1164 : bool, default: True
Include the std_logic_1164 header.
numeric_std : bool, default: True
Include the numeric_std header.
"""
f.write('library ieee;\n')
if std_logic_1164:
f.write('use ieee.std_logic_1164.all;\n')
if numeric_std:
f.write('use ieee.numeric_std.all;\n')
f.write('\n')
write_signal_decl
def write_signal_decl(
f: TextIOWrapper,
name: str,
type: str,
default_value: Optional[str] = None,
name_pad: Optional[int] = None,
):
"""
Create a VHDL signal declaration: ::
signal {name} : {type} [:= {default_value}];
Parameters
----------
f : TextIOWrapper
The TextIOWrapper object to write the IEEE header to.
name : str
Signal name.
type : str
Signal type.
default_value : str, optional
An optional default value to the signal.
name_pad : int, optional
An optional left padding value applied to the name.
"""
# Spacing of VHDL signals declaration always with a single tab
name_pad = 0 if name_pad is None else name_pad
f.write(f'{VHDL_TAB}signal {name:<{name_pad}} : {type}')
if default_value is not None:
f.write(f' := {default_value}')
f.write(f';\n')
def write_constant_decl(
f: TextIOWrapper,
name: str,
type: str,
value: Any,
name_pad: Optional[int] = None,
type_pad: Optional[int] = None,
):
"""
Write a VHDL constant declaration with a name, a type and a value.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper object to write the constant declaration to.
name : str
Signal name.
type : str
Signal type.
value : anything convertable to str
Default value to the signal.
name_pad : int, optional
An optional left padding value applied to the name.
"""
name_pad = 0 if name_pad is None else name_pad
f.write(f'{VHDL_TAB}constant {name:<{name_pad}} : {type} := {str(value)};\n')
def write_type_decl(
f: TextIOWrapper,
name: str,
alias: str,
):
"""
Write a VHDL type declaration with a name tied to an alias.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper object to write the type declaration to.
name : str
Type name alias.
alias : str
The type to tie the new name to.
"""
f.write(f'{VHDL_TAB}type {name} is {alias};\n')
def write_synchronous_process(
f: TextIOWrapper,
clk: str,
body: str,
indent: Optional[int] = 0,
name: Optional[str] = None,
):
"""
Write a regular VHDL synchronous process with a single clock object in the sensitivity list triggering
a rising edge block by some body of VHDL code.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
body : str
Body of the `if rising_edge(clk) then` block.
indent : Optional[int]
Indent this process block with `indent` columns
name : Optional[str]
An optional name for the process
"""
space = '' if indent is None else ' ' * indent
write_synchronous_process_prologue(f, clk, indent, name)
for line in body.split('\n'):
if len(line):
f.write(f'{space}{2*VHDL_TAB}{line}\n')
write_synchronous_process_epilogue(f, clk, indent, name)
def write_synchronous_process_prologue(
f: TextIOWrapper,
clk: str,
indent: Optional[int] = 0,
name: Optional[str] = None,
):
"""
Write only the prologue of a regular VHDL synchronous process with a single clock object in the sensitivity list
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
indent : Optional[int]
Indent this process block with `indent` columns
name : Optional[str]
An optional name for the process
"""
space = '' if indent is None else ' ' * indent
if name is not None:
f.write(f'{space}{name}: process({clk})\n')
else:
f.write(f'{space}process({clk})\n')
f.write(f'{space}begin\n')
f.write(f'{space}{VHDL_TAB}if rising_edge(clk) then\n')
def write_synchronous_memory(
f: TextIOWrapper,
clk: str,
read_ports: Set[Tuple[str, str, str]],
write_ports: Set[Tuple[str, str, str]],
name: Optional[str] = None,
):
"""
Infer a VHDL synchronous reads and writes.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of clock identifier to the synchronous memory.
read_ports : Set[Tuple[str,str]]
A set of strings used as identifiers for the read ports of the memory.
write_ports : Set[Tuple[str,str,str]]
A set of strings used as identifiers for the write ports of the memory.
name : Optional[str]
An optional name for the memory process.
"""
assert len(read_ports) >= 1
assert len(write_ports) >= 1
write_synchronous_process_prologue(f, clk=clk, name=name, indent=len(VHDL_TAB))
for read_name, address, re in read_ports:
f.write(f'{3*VHDL_TAB}if {re} = \'1\' then\n')
f.write(f'{4*VHDL_TAB}{read_name} <= memory({address});\n')
f.write(f'{3*VHDL_TAB}end if;\n')
for write_name, address, we in write_ports:
f.write(f'{3*VHDL_TAB}if {we} = \'1\' then\n')
f.write(f'{4*VHDL_TAB}{write_name} <= memory({address});\n')
f.write(f'{3*VHDL_TAB}end if;\n')
write_synchronous_process_epilogue(f, clk=clk, name=name, indent=len(VHDL_TAB))
def write_synchronous_process_epilogue(
f: TextIOWrapper,
clk: Optional[str],
indent: Optional[int] = 0,
name: Optional[str] = None,
):
"""
Write only the prologue of a regular VHDL synchronous process with a single clock object in the sensitivity list
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters
----------
f : TextIOWrapper
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
indent : Optional[int]
Indent this process block with `indent` columns
name : Optional[str]
An optional name for the process
"""
_ = clk
space = '' if indent is None else ' ' * indent
f.write(f'{space}{VHDL_TAB}end if;\n')
f.write(f'{space}end process')
if name is not None:
f.write(' ' + name)
f.write(';\n')
"""
Module for code generation of VHDL entity declarations
"""
from io import TextIOWrapper
from typing import Set
from b_asic.codegen.vhdl import VHDL_TAB
from b_asic.port import Port
from b_asic.process import MemoryVariable, PlainMemoryVariable
from b_asic.resources import ProcessCollection
def write_memory_based_architecture(
f: TextIOWrapper, collection: ProcessCollection, word_length: int
):
# Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all(
isinstance(process, MemoryVariable) for process in collection
)
is_plain_memory_variable = all(
isinstance(process, PlainMemoryVariable) for process in collection
)
if not (is_memory_variable or is_plain_memory_variable):
raise ValueError(
"HDL can only be generated for ProcessCollection of (Plain)MemoryVariables"
)
entity_name = 'some_name'
# Write the entity header
f.write(f'entity {entity_name} is\n')
f.write(f'{VHDL_TAB}port(\n')
# Write the clock and reset signal
f.write(f'{2*VHDL_TAB}-- Clock, sycnhronous reset and enable signals\n')
f.write(f'{2*VHDL_TAB}clk : in std_logic;\n')
f.write(f'{2*VHDL_TAB}rst : in std_logic;\n')
f.write(f'{2*VHDL_TAB}en : in std_logic;\n')
f.write(f'\n')
# Write the input port specification
f.write(f'{2*VHDL_TAB}-- Memory port I/O\n')
read_ports: set[Port] = set(sum((mv.read_ports for mv in collection), ())) # type: ignore
for idx, read_port in enumerate(read_ports):
port_name = read_port if isinstance(read_port, int) else read_port.name
port_name = 'p_' + str(port_name) + '_in'
f.write(
f'{2*VHDL_TAB}{port_name} : in std_logic_vector({word_length}-1 downto'
' 0);\n'
)
# Write the output port specification
write_ports: Set[Port] = {mv.write_port for mv in collection} # type: ignore
for idx, write_port in enumerate(write_ports):
port_name = write_port if isinstance(write_port, int) else write_port.name
port_name = 'p_' + str(port_name) + '_out'
f.write(
f'{2*VHDL_TAB}{port_name} : out std_logic_vector({word_length}-1 downto 0)'
)
if idx == len(write_ports) - 1:
f.write('\n')
else:
f.write(';\n')
# Write ending of the port header
f.write(f'{VHDL_TAB});\n')
f.write(f'end entity {entity_name};\n\n')
......@@ -124,6 +124,7 @@ class MemoryVariable(Process):
):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._reads = reads
self._write_port = write_port
super().__init__(
start_time=write_time,
......@@ -131,6 +132,10 @@ class MemoryVariable(Process):
name=name,
)
@property
def reads(self) -> Dict[InputPort, int]:
return self._reads
@property
def life_times(self) -> Tuple[int, ...]:
return self._life_times
......@@ -182,12 +187,17 @@ class PlainMemoryVariable(Process):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._write_port = write_port
self._reads = reads
super().__init__(
start_time=write_time,
execution_time=max(self._life_times),
name=name,
)
@property
def reads(self) -> Dict[int, int]:
return self._reads
@property
def life_times(self) -> Tuple[int, ...]:
return self._life_times
......
......@@ -610,7 +610,7 @@ class ProcessCollection:
def graph_color_cell_assignment(
self,
coloring_strategy: str = "saturation_largest_first",
) -> Dict[int, "ProcessCollection"]:
) -> Set["ProcessCollection"]:
"""
Perform cell assignment of the processes in this collection using graph coloring with networkx.coloring.greedy_color.
Two or more processes can share a single cell if, and only if, they have no overlaping time alive.
......@@ -622,7 +622,7 @@ class ProcessCollection:
Returns
-------
Dict[int, ProcessCollection]
A set of ProcessCollection
"""
cell_assignment: Dict[int, ProcessCollection] = dict()
......@@ -636,7 +636,7 @@ class ProcessCollection:
except:
cell_assignment[cell] = ProcessCollection(set(), self._schedule_time)
cell_assignment[cell].add_process(process)
return cell_assignment
return set(cell_assignment.values())
def left_edge_cell_assignment(self) -> Dict[int, "ProcessCollection"]:
"""
......@@ -680,6 +680,8 @@ class ProcessCollection:
def generate_memory_based_storage_vhdl(
self,
filename: str,
word_length: int,
assignment: Set['ProcessCollection'],
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
......@@ -691,6 +693,14 @@ class ProcessCollection:
----------
filename : str
Filename of output file.
word_length: int
Word length of the memory variable objects.
assignment: set
A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process
collection.
If unset, each MemoryVariable will be assigned to a unique single cell.
read_ports : int, optional
The number of read ports used when splitting process collection based on
memory variable access. If total ports in unset, this parameter has to be set
......@@ -726,18 +736,75 @@ class ProcessCollection:
for mv in self:
filter_write = lambda p: p.start_time == mv.start_time
filter_read = (
lambda p: (p.start_time + p.execution_time) & self._schedule_time
lambda p: (p.start_time + p.execution_time) % self._schedule_time
== mv.start_time + mv.execution_time % self._schedule_time
)
if len(list(filter(filter_write, self))) > write_ports + 1:
needed_write_ports = len(list(filter(filter_write, self)))
needed_read_ports = len(list(filter(filter_read, self)))
if needed_write_ports > write_ports + 1:
raise ValueError(
f'More than {write_ports} write ports needed to generate HDL for'
' this ProcessCollection'
f'More than {write_ports} write ports needed ({needed_write_ports})'
' to generate HDL for this ProcessCollection'
)
if len(list(filter(filter_read, self))) > read_ports + 1:
if needed_read_ports > read_ports + 1:
raise ValueError(
f'More than {read_ports} read ports needed to generate HDL for this'
' ProcessCollection'
f'More than {read_ports} read ports needed ({needed_read_ports}) to'
' generate HDL for this ProcessCollection'
)
# raise NotImplementedError("Not implemented yet!")
with open(filename, 'w') as f:
from b_asic.codegen import vhdl
vhdl.common.write_b_asic_vhdl_preamble(f)
vhdl.common.write_ieee_header(f)
vhdl.entity.write_memory_based_architecture(
f, collection=self, word_length=word_length
)
vhdl.architecture.write_memory_based_architecture(
f,
assignment=assignment,
word_length=word_length,
read_ports=read_ports,
write_ports=write_ports,
total_ports=total_ports,
)
def generate_register_based_storage_vhdl(
self,
filename: str,
word_length: int,
assignment: Set['ProcessCollection'],
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
):
"""
Generate VHDL code for register based storages of processes based on the Forward-Backward Register Allocation [1].
[1]: K. Parhi: VLSI Digital Signal Processing Systems: Design and Implementation, Ch. 6.3.2
Parameters
----------
filename : str
Filename of output file.
word_length: int
Word length of the memory variable objects.
assignment: set
A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process
collection.
If unset, each MemoryVariable will be assigned to a unique single cell.
read_ports : int, optional
The number of read ports used when splitting process collection based on
memory variable access. If total ports in unset, this parameter has to be set
and total_ports is assumed to be read_ports + write_ports.
write_ports : int, optional
The number of write ports used when splitting process collection based on
memory variable access. If total ports is unset, this parameter has to be set
and total_ports is assumed to be read_ports + write_ports.
total_ports : int, optional
The total number of ports used when splitting process collection based on
memory variable access.
"""
pass
......@@ -46,7 +46,23 @@ class TestProcessCollectionPlainMemoryVariable:
coloring_strategy='saturation_largest_first'
)
assert len(assignment_left_edge.keys()) == 18
assert len(assignment_graph_color.keys()) == 16
assert len(assignment_graph_color) == 16
def test_generate_vhdl(self):
collection = generate_matrix_transposer(4, min_lifetime=5)
assignment = collection.graph_color_cell_assignment()
_, ax = plt.subplots()
for cell, pc in enumerate(assignment):
pc.plot(ax=ax, row=cell)
# plt.show()
collection.generate_memory_based_storage_vhdl(
"/tmp/wow.vhdl",
assignment=assignment,
word_length=13,
read_ports=1,
write_ports=1,
total_ports=2,
)
# Issue: #175
def test_interleaver_issue175(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment