def _call_hls_flow(node, node_dir):
    """
    Synthesize the module of a mase node using HLS.

    The flow has four stages:
      1. Torch-MLIR: compile the node's torch module to linalg-on-tensors MLIR.
      2. mlir-opt: lower the linalg MLIR to affine loops.
      3. mase-opt: transform the affine MLIR and emit HLS C++ plus a Vitis tcl.
      4. Vitis HLS: synthesize the emitted C++ into hardware.

    All intermediate and output files are written under ``node_dir``.
    Returns 0 on success (or when skipped in CI), non-zero if Vitis HLS fails.
    """
    # Heavyweight synthesis is skipped entirely in CI runs.
    if self.github_ci:
        return 0
    try:
        import torch_mlir
    except ImportError:
        print("TORCH-MLIR is not imported")
    # Re-check via sys.modules so a partially-failed import is also caught.
    if "torch_mlir" not in sys.modules:
        raise RuntimeError("TORCH_MLIR is required for synthesis.")
    # ----------------------------------
    # Torch-MLIR
    # ----------------------------------
    # Build random example inputs matching the node's recorded arg sizes;
    # torch_mlir.compile needs concrete tensors to trace the module.
    arg_count = len(node.all_input_nodes)
    if arg_count == 1:
        x = torch.randn(
            node.meta["mase"].parameters["common"]["args"]["data_in"]["size"]
        )
    else:
        x = []
        for i in range(0, arg_count):
            x.append(
                torch.randn(
                    node.meta["mase"].parameters["common"]["args"][f"data_in_{i}"][
                        "size"
                    ]
                )
            )
        x = tuple(x)
    try:
        module = torch_mlir.compile(
            node.meta["mase"].module, x, output_type="linalg-on-tensors"
        )
    except Exception:
        # Compilation is best-effort per node: log the failing node and
        # report success (0) so the rest of the graph can continue.
        logger.error(node)
        return 0
    mlir_dir = os.path.join(node_dir, f"{node.name}.linalg.mlir")
    with open(mlir_dir, "w", encoding="utf-8") as outf:
        outf.write(str(module))
    logger.debug(f"MLIR of module {node.name} successfully written into {mlir_dir}")
    assert os.path.isfile(mlir_dir), "Linalg MLIR generation failed."
    # ----------------------------------
    # MLIR-Lowering
    # ----------------------------------
    lowered_dir = os.path.join(node_dir, f"{node.name}.affine.mlir")
    node_name = vf(node.name)
    # Lower Linalg MLIR to Affine MLIR
    cmd = [
        "mlir-opt",
        mlir_dir,
        "--linalg-bufferize",
        "--convert-linalg-to-affine-loops",
        "--canonicalize",
        "-o",
        lowered_dir,
    ]
    # if self.to_debug:
    #     cmd += ["--debug"]
    result = execute_cli(cmd, log_output=self.to_debug)
    assert os.path.isfile(lowered_dir), "Affine MLIR generation failed."
    logger.debug(
        f"MLIR Affine code of module {node.name} successfully written into {lowered_dir}"
    )
    # The affine MLIR becomes the input of the next stage.
    mlir_dir = lowered_dir
    lowered_dir = os.path.join(node_dir, f"{node_name}.mase.mlir")
    hls_dir = os.path.join(node_dir, f"{node_name}.cpp")
    # Transform Affine MLIR for hardware generation and emit HLS code
    hls_param = _get_hls_parameters(node)
    cmd = [
        "mase-opt",
        mlir_dir,
        f"--preprocess-func=func-name={node_name}",
        "--canonicalize",
        f"--emit-hls=file-name={hls_dir} hls-param={hls_param}",
        "-o",
        lowered_dir,
    ]
    # if self.to_debug:
    #     cmd += ["--debug"]
    result = execute_cli(cmd, log_output=self.to_debug)
    assert os.path.isfile(hls_dir), "HLS code generation failed."
    logger.debug(f"HLS code of module {node.name} successfully written into {hls_dir}")
    # Emit tcl for Vitis HLS
    hls_tcl = f"""
open_project -reset {node_name}
set_top {node_name}
add_files {node_name}.cpp
open_solution -reset "solution1"
set_part {self.target}
create_clock -period 4 -name default
config_bind -effort high
config_compile -pipeline_loops 1
config_interface -clock_enable
csynth_design
# export_design -flow syn -rtl vhdl -format ip_catalog
"""
    hls_tcl_dir = os.path.join(node_dir, f"{node_name}.tcl")
    with open(hls_tcl_dir, "w", encoding="utf-8") as outf:
        outf.write(hls_tcl)
    logger.debug(
        f"HLS tcl of module {node.name} successfully written into {hls_tcl_dir}"
    )
    # Format HLS code so it is more readable; clang-format also acts as a
    # cheap syntax sanity check on the generated C++.
    cmd = [
        "clang-format",
        "-i",
        hls_dir,
    ]
    result = execute_cli(cmd, log_output=self.to_debug)
    assert not result, f"HLS code is invalid: {node_name}"
    # Call Vitis HLS for synthesis
    vitis_hls = os.path.abspath(
        os.path.join(
            os.path.dirname(__file__),
            "..",
            "..",
            "..",
            "scripts",
            "run-vitis-hls.sh",
        )
    )
    assert os.path.isfile(
        vitis_hls
    ), f"Vitis HLS not found. Please make sure if {vitis_hls} exists."
    cmd = [
        "bash",
        vitis_hls,
        hls_tcl_dir,
    ]
    if not self.github_ci:
        result = execute_cli(cmd, log_output=self.to_debug, cwd=node_dir)
    if result:
        logger.error(f"Vitis HLS synthesis failed. {node.name}")
    else:
        logger.debug(f"Hardware of module {node.name} successfully generated by HLS")
    return result
def _emit_hls_component(node, queue):
    """
    Emit the HLS component of a single node using MLIR.

    Designed to run as a worker (see ``_emit_hls_components``): the result
    code of the HLS flow is pushed onto ``queue`` so the parent process can
    collect it, and is also returned for the serial path.

    node: mase graph node to synthesize.
    queue: queue receiving the integer result code (0 = success).
    """
    logger.debug(f"Synthesizing {node.name} using HLS")
    rtl_dir = os.path.join(self.project_dir, "hardware", "rtl")
    emit_parameters_in_rom_hls(node, rtl_dir)
    # Clean the HLS directory. makedirs(exist_ok=True) avoids the
    # check-then-create race when several worker processes start at once
    # (os.path.exists + os.mkdir could both pass the check and one mkdir
    # would then raise FileExistsError).
    hls_dir = os.path.join(self.project_dir, "hardware", "hls")
    os.makedirs(hls_dir, exist_ok=True)
    node_dir = os.path.join(hls_dir, node.name)
    _create_new_dir(node_dir)
    result = self._call_hls_flow(node, node_dir)
    queue.put(result)
    return result
def _emit_hls_components(nodes, parallel=False):
    """
    Run the HLS flow for every node, optionally in parallel.

    nodes: nodes to synthesize.
    parallel: when True, spawn one worker process per node; the workers
        report their result codes through a shared queue. When False, run
        the nodes sequentially in this process.
    Raises AssertionError if any node failed to synthesize.
    """
    hls_count = len(nodes)
    # Sized to hold one result per node so workers never block on put().
    queue = Queue(hls_count)
    err = 0
    if parallel:
        jobs = [
            Process(target=self._emit_hls_component, args=(node, queue))
            for node in nodes
        ]
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()
        # Each worker pushed exactly one result code; sum the failures.
        for _ in range(hls_count):
            err += queue.get()
    else:
        for node in nodes:
            err += self._emit_hls_component(node, queue)
    if err:
        logger.error(f"HLS generation finished. {err} errors.")
    else:
        logger.info(f"HLS components generated. {err} errors.")
    assert not err