Test File: test_trapeziod_mesh.py
Back to Test: Systolic Gauss-Jordan.
Source file: test/systolic_gauss_jordan/test_trapeziod_mesh.py
RTL counterpart
What this test checks
This is the direct forward-flow mesh regression. It does not assert the whole reduce trace; instead, it checks the structural array behavior on a checked-in sample case.
The test:
loads the sample case from JSON
builds the staggered top-edge input streams in Python
drives the mesh without reduce mode
compares bottom-row outputs against the Python model cycle by cycle
checks that opcode launch and horizontal propagation occur at the expected times
How to run it
make -C test/systolic_gauss_jordan TEST=trapeziod_mesh
What to inspect while reading it
build_top_input_streams() and sample_streams_at_cycle() usage
the per-cycle bottom-output comparison
the lock/add propagation checks across row 0
Python source
1import cocotb
2from cocotb.clock import Clock
3from cocotb.triggers import FallingEdge, ReadOnly, RisingEdge
4
5from trapeziod_test_utils import (
6 OP_ADD,
7 OP_LOCK,
8 OPCODE_NAMES,
9 TrapezoidModel,
10 build_top_input_streams,
11 get_apply_scope,
12 get_diag_scope,
13 load_sample_case,
14 pack_bits,
15 sample_streams_at_cycle,
16)
17
18
def first_cycle_with_opcode(trace, key, opcode):
    """Return the index of the first snapshot in ``trace`` where ``key`` holds ``opcode``.

    Args:
        trace: Sequence of per-cycle snapshot mappings, in cycle order.
        key: Snapshot key to inspect in each entry.
        opcode: Opcode value to search for.

    Returns:
        Zero-based position in ``trace`` of the first matching snapshot.

    Raises:
        AssertionError: If no snapshot stores ``opcode`` under ``key``.
    """
    for cycle, snapshot in enumerate(trace):
        if snapshot[key] == opcode:
            return cycle
    # Fall back to the raw value so an opcode that is missing from
    # OPCODE_NAMES still fails with the intended AssertionError rather than
    # a KeyError raised while formatting the message.
    raise AssertionError(
        f"Did not observe {OPCODE_NAMES.get(opcode, opcode)} on {key}"
    )
24
25
def _assert_opcode_reaches_row0(event_trace, launch_cycle, opcode, label, total_cols):
    """Assert ``opcode`` shows up on g[0,col].op_i exactly ``col - 1`` cycles after launch."""
    for col in range(1, total_cols):
        expected_cycle = launch_cycle + (col - 1)
        assert expected_cycle < len(event_trace), (
            f"{label} did not have enough cycles to reach g[0,{col}]"
        )
        observed = event_trace[expected_cycle][f"g0{col}_op_i"]
        assert observed == opcode, (
            f"Expected {label} at g[0,{col}].op_i on cycle {expected_cycle}, "
            f"got {OPCODE_NAMES.get(observed, observed)}"
        )


@cocotb.test()
async def trapeziod_mesh_forward_regression(dut):
    """Forward-flow regression for the mesh with ``reduce_i`` held at 0.

    Feeds the top-edge streams built from the sample case, compares the
    bottom-row output bits against ``TrapezoidModel.eval_cycle`` on every
    cycle, and then checks that the first LOCK and the first ADD seen on
    ``f00_op_o`` appear on each ``g[0,col].op_i`` one column per cycle later.

    Args:
        dut: cocotb handle to the design under test.

    Raises:
        AssertionError: On a dimension mismatch with the sample case, a
            bottom-row mismatch, or an opcode arriving at the wrong cycle.
    """
    case = load_sample_case()
    num_bottom = len(dut.data_bottom_o)
    total_cols = len(dut.data_top_i)
    # The code treats the top-edge width minus the bottom-edge width as N.
    n = total_cols - num_bottom

    assert case["N"] == n, f"sample case N={case['N']} but DUT has N={n}"
    assert case["L"] == num_bottom, (
        f"sample case L={case['L']} but DUT has L={num_bottom}"
    )

    cocotb.start_soon(Clock(dut.clk, 10, unit="ns").start())

    # Hold reset for two rising clock edges with the inputs at known values.
    dut.rst.value = 1
    dut.en_i.value = 1
    dut.reduce_i.value = 0
    dut.data_top_i.value = 0

    for _ in range(2):
        await RisingEdge(dut.clk)

    dut.rst.value = 0

    model = TrapezoidModel(n=n, l=num_bottom)
    streams = build_top_input_streams(case["A"], case["B"])
    feed_cycles = max(len(stream) for stream in streams)
    event_trace = []

    for cycle in range(feed_cycles):
        bits = sample_streams_at_cycle(streams, cycle)
        expected_bottom = model.eval_cycle(bits)

        # Drive on the falling edge, then sample after the next rising edge
        # once the simulator has reached its read-only phase.
        await FallingEdge(dut.clk)
        dut.reduce_i.value = 0
        dut.data_top_i.value = pack_bits(bits)
        await RisingEdge(dut.clk)
        await ReadOnly()

        # Read the output bus once per cycle instead of once per bit.
        bottom_word = int(dut.data_bottom_o.value)
        got_bottom = [(bottom_word >> idx) & 1 for idx in range(num_bottom)]
        assert got_bottom == expected_bottom, (
            f"cycle {cycle}: bottom-row readout mismatch, "
            f"expected {expected_bottom}, got {got_bottom}"
        )

        # Record row-0 opcode activity for the propagation checks below.
        snapshot = {"cycle": cycle, "f00_op_o": int(get_diag_scope(dut, 0).op_o.value)}
        for col in range(1, total_cols):
            scope = get_apply_scope(dut, 0, col)
            snapshot[f"g0{col}_op_i"] = int(scope.op_i.value)
            snapshot[f"g0{col}_op_o"] = int(scope.op_o.value)
        event_trace.append(snapshot)

    # Locate both launches first so a missing opcode is reported before any
    # propagation check runs.
    lock_launch_cycle = first_cycle_with_opcode(event_trace, "f00_op_o", OP_LOCK)
    add_launch_cycle = first_cycle_with_opcode(event_trace, "f00_op_o", OP_ADD)

    _assert_opcode_reaches_row0(event_trace, lock_launch_cycle, OP_LOCK, "LOCK", total_cols)
    _assert_opcode_reaches_row0(event_trace, add_launch_cycle, OP_ADD, "ADD", total_cols)