Test File: test_trapeziod_full_trace_reduce.py
Back to Test: Systolic Gauss-Jordan.
Source file: test/systolic_gauss_jordan/test_trapeziod_full_trace_reduce.py
RTL counterpart
What this test checks
This is the most readable whole-mesh test in the lifted suite. It drives the sample case through the mesh, launches reduce at the configured cycle, checks timing expectations row by row, and writes a structured JSON trace that records internal mesh state over time.
The test:
loads the sample case and derives
M,N, andLdrives the mesh with the staggered top-edge stream
pulses reduce at
reduce_pulse_cycle = Mcaptures
data_bottom_row, diagonal data out, diagonal reduce mode, and the register matrix for every cyclewrites the full trace JSON artifact
checks that the reduce pulse reaches each diagonal row at the expected cycle
How to run it
make -C test/systolic_gauss_jordan TEST=trapeziod_full_trace_reduce
What to inspect while reading it
reduce_pulse_cycleandlast_cyclesetupthe per-cycle trace payload fields
the final assertions on diagonal reduce arrival
Python source
1import json
2import os
3from pathlib import Path
4
5import cocotb
6from cocotb.clock import Clock
7from cocotb.triggers import FallingEdge, ReadOnly, RisingEdge
8
9from trapeziod_test_utils import (
10 build_top_input_streams,
11 collect_diag_data_out_row,
12 collect_diag_reduce_inputs,
13 collect_register_matrix,
14 format_reduce_mode_row,
15 format_register_matrix,
16 load_sample_case,
17 pack_bits,
18 sample_streams_at_cycle,
19)
20
21def collect_bottom_data(dut, cycle, first_valid_cycle):
22 if cycle < first_valid_cycle:
23 return None
24
25 packed = int(dut.data_bottom_o.value)
26 return [(packed >> idx) & 1 for idx in range(len(dut.data_bottom_o))]
27
28
29def expected_reduce_cycle(reduce_pulse_cycle, row):
30 if row == 0:
31 return reduce_pulse_cycle
32 return reduce_pulse_cycle + ((2 * row) - 1)
33
34
35@cocotb.test()
36async def trapeziod_full_trace_reduce(dut):
37 trace_json = Path(os.environ["FULL_TRACE_REDUCE_JSON"])
38 case = load_sample_case()
39
40 total_cols = len(dut.data_top_i)
41 l = len(dut.data_bottom_o)
42 n = total_cols - l
43 m = case["M"]
44
45 assert case["N"] == n, f"JSON N={case['N']} does not match DUT N={n}"
46 assert case["L"] == l, f"JSON L={case['L']} does not match DUT L={l}"
47
48 streams = build_top_input_streams(case["A"], case["B"])
49 feed_cycles = max(len(stream) for stream in streams)
50 reduce_pulse_cycle = m
51 last_cycle = (3 * n) + m - 1
52 first_bottom_cycle = n - 1
53
54 cocotb.start_soon(Clock(dut.clk, 10, unit="ns").start())
55
56 dut.rst.value = 1
57 dut.en_i.value = 1
58 dut.reduce_i.value = 0
59 dut.data_top_i.value = 0
60
61 for _ in range(2):
62 await RisingEdge(dut.clk)
63
64 dut.rst.value = 0
65
66 payload = {
67 "name": case["name"],
68 "matrix_json": case["path"],
69 "M": m,
70 "N": n,
71 "L": l,
72 "feed_cycles": feed_cycles,
73 "last_input_cycle": feed_cycles - 1,
74 "reduce_pulse_cycle": reduce_pulse_cycle,
75 "last_cycle": last_cycle,
76 "cycles": {},
77 }
78 reduce_mode_by_cycle = {}
79
80 for cycle in range(last_cycle + 1):
81 top_bits = sample_streams_at_cycle(streams, cycle)
82 reduce_bit = 1 if cycle == reduce_pulse_cycle else 0
83
84 await FallingEdge(dut.clk)
85 dut.reduce_i.value = reduce_bit
86 dut.data_top_i.value = pack_bits(top_bits)
87 await RisingEdge(dut.clk)
88 await ReadOnly()
89
90 bottom_data = collect_bottom_data(dut, cycle, first_bottom_cycle)
91 register_matrix = collect_register_matrix(dut, n=n, total_cols=total_cols)
92 register_matrix_readable = format_register_matrix(register_matrix)
93 diag_data_out = collect_diag_data_out_row(dut, n=n)
94 diag_reduce_inputs = collect_diag_reduce_inputs(dut, n=n)
95 reduce_mode_row = format_reduce_mode_row(diag_reduce_inputs)
96 reduce_mode_by_cycle[cycle] = reduce_mode_row
97
98 cocotb.log.info(
99 "t=%02d reduce_i=%d data_bottom=%s f_data_out=%s f_reduce_mode=%s",
100 cycle,
101 reduce_bit,
102 bottom_data,
103 diag_data_out,
104 reduce_mode_row,
105 )
106 for row_idx, row_values in enumerate(register_matrix):
107 cocotb.log.info("t=%02d row %d: %s", cycle, row_idx, row_values)
108
109 payload["cycles"][f"t={cycle}"] = {
110 "reduce_i": reduce_bit,
111 "data_in_row": top_bits,
112 "data_bottom_row": bottom_data,
113 "diag_f_data_out_row": diag_data_out,
114 "f_reduce_mode_row": reduce_mode_row,
115 "register_matrix": register_matrix_readable,
116 }
117
118 for row in range(n):
119 expected_cycle = expected_reduce_cycle(reduce_pulse_cycle, row)
120 assert reduce_mode_by_cycle[expected_cycle][row] == 0, (
121 f"Expected reduce to reach f[{row},{row}] at t={expected_cycle}"
122 )
123 if expected_cycle > 0:
124 assert reduce_mode_by_cycle[expected_cycle - 1][row] == 1, (
125 f"Expected reduce to be inactive at f[{row},{row}] before t={expected_cycle}"
126 )
127 if expected_cycle < last_cycle:
128 assert reduce_mode_by_cycle[expected_cycle + 1][row] == 1, (
129 f"Expected reduce pulse at f[{row},{row}] to last one cycle"
130 )
131
132 trace_json.parent.mkdir(parents=True, exist_ok=True)
133 trace_json.write_text(json.dumps(payload, indent=2) + "\n")