Test File: test_trapeziod_mesh.py

Back to Test: Systolic Gauss-Jordan.

Source file: test/systolic_gauss_jordan/test_trapeziod_mesh.py

RTL counterpart

What this test checks

This is the direct forward-flow mesh regression. It does not assert the whole reduce trace; instead, it checks the structural array behavior on a checked-in sample case.

The test:

  • loads the sample case from JSON

  • builds the staggered top-edge input streams in Python

  • drives the mesh without reduce mode

  • compares bottom-row outputs against the Python model cycle by cycle

  • checks that opcode launch and horizontal propagation occur at the expected times

How to run it

make -C test/systolic_gauss_jordan TEST=trapeziod_mesh

What to inspect while reading it

  • build_top_input_streams() and sample_streams_at_cycle() usage

  • the per-cycle bottom-output comparison

  • the lock/add propagation checks across row 0

Python source

 1import cocotb
 2from cocotb.clock import Clock
 3from cocotb.triggers import FallingEdge, ReadOnly, RisingEdge
 4
 5from trapeziod_test_utils import (
 6    OP_ADD,
 7    OP_LOCK,
 8    OPCODE_NAMES,
 9    TrapezoidModel,
10    build_top_input_streams,
11    get_apply_scope,
12    get_diag_scope,
13    load_sample_case,
14    pack_bits,
15    sample_streams_at_cycle,
16)
17
18
19def first_cycle_with_opcode(trace, key, opcode):
20    for cycle, snapshot in enumerate(trace):
21        if snapshot[key] == opcode:
22            return cycle
23    raise AssertionError(f"Did not observe {OPCODE_NAMES[opcode]} on {key}")
24
25
26@cocotb.test()
27async def trapeziod_mesh_forward_regression(dut):
28    case = load_sample_case()
29    l = len(dut.data_bottom_o)
30    total_cols = len(dut.data_top_i)
31    n = total_cols - l
32
33    assert case["N"] == n
34    assert case["L"] == l
35
36    cocotb.start_soon(Clock(dut.clk, 10, unit="ns").start())
37
38    dut.rst.value = 1
39    dut.en_i.value = 1
40    dut.reduce_i.value = 0
41    dut.data_top_i.value = 0
42
43    for _ in range(2):
44        await RisingEdge(dut.clk)
45
46    dut.rst.value = 0
47
48    model = TrapezoidModel(n=n, l=l)
49    streams = build_top_input_streams(case["A"], case["B"])
50    feed_cycles = max(len(stream) for stream in streams)
51    event_trace = []
52
53    for cycle in range(feed_cycles):
54        bits = sample_streams_at_cycle(streams, cycle)
55        expected_bottom = model.eval_cycle(bits)
56
57        await FallingEdge(dut.clk)
58        dut.reduce_i.value = 0
59        dut.data_top_i.value = pack_bits(bits)
60        await RisingEdge(dut.clk)
61        await ReadOnly()
62
63        got_bottom = [(int(dut.data_bottom_o.value) >> idx) & 1 for idx in range(l)]
64        assert got_bottom == expected_bottom, (
65            f"cycle {cycle}: bottom-row readout mismatch, "
66            f"expected {expected_bottom}, got {got_bottom}"
67        )
68
69        snapshot = {"cycle": cycle, "f00_op_o": int(get_diag_scope(dut, 0).op_o.value)}
70        for col in range(1, total_cols):
71            scope = get_apply_scope(dut, 0, col)
72            snapshot[f"g0{col}_op_i"] = int(scope.op_i.value)
73            snapshot[f"g0{col}_op_o"] = int(scope.op_o.value)
74        event_trace.append(snapshot)
75
76    lock_launch_cycle = first_cycle_with_opcode(event_trace, "f00_op_o", OP_LOCK)
77    add_launch_cycle = first_cycle_with_opcode(event_trace, "f00_op_o", OP_ADD)
78
79    for col in range(1, total_cols):
80        expected_cycle = lock_launch_cycle + (col - 1)
81        assert expected_cycle < len(event_trace), (
82            f"LOCK did not have enough cycles to reach g[0,{col}]"
83        )
84        observed = event_trace[expected_cycle][f"g0{col}_op_i"]
85        assert observed == OP_LOCK, (
86            f"Expected LOCK at g[0,{col}].op_i on cycle {expected_cycle}, "
87            f"got {OPCODE_NAMES.get(observed, observed)}"
88        )
89
90    for col in range(1, total_cols):
91        expected_cycle = add_launch_cycle + (col - 1)
92        assert expected_cycle < len(event_trace), (
93            f"ADD did not have enough cycles to reach g[0,{col}]"
94        )
95        observed = event_trace[expected_cycle][f"g0{col}_op_i"]
96        assert observed == OP_ADD, (
97            f"Expected ADD at g[0,{col}].op_i on cycle {expected_cycle}, "
98            f"got {OPCODE_NAMES.get(observed, observed)}"
99        )