Test File: test_trapeziod_full_trace_reduce.py

Back to Test: Systolic Gauss-Jordan.

Source file: test/systolic_gauss_jordan/test_trapeziod_full_trace_reduce.py

RTL counterpart

What this test checks

This is the most readable whole-mesh test in the lifted suite. It drives the sample case through the mesh, launches reduce at the configured cycle, checks timing expectations row by row, and writes a structured JSON trace that records internal mesh state over time.

The test:

  • loads the sample case and derives M, N, and L

  • drives the mesh with the staggered top-edge stream

  • pulses reduce at reduce_pulse_cycle = M

  • captures data_bottom_row, diagonal data out, diagonal reduce mode, and the register matrix for every cycle

  • writes the full trace JSON artifact

  • checks that the reduce pulse reaches each diagonal row at the expected cycle

How to run it

make -C test/systolic_gauss_jordan TEST=trapeziod_full_trace_reduce

What to inspect while reading it

  • reduce_pulse_cycle and last_cycle setup

  • the per-cycle trace payload fields

  • the final assertions on diagonal reduce arrival

Python source

  1import json
  2import os
  3from pathlib import Path
  4
  5import cocotb
  6from cocotb.clock import Clock
  7from cocotb.triggers import FallingEdge, ReadOnly, RisingEdge
  8
  9from trapeziod_test_utils import (
 10    build_top_input_streams,
 11    collect_diag_data_out_row,
 12    collect_diag_reduce_inputs,
 13    collect_register_matrix,
 14    format_reduce_mode_row,
 15    format_register_matrix,
 16    load_sample_case,
 17    pack_bits,
 18    sample_streams_at_cycle,
 19)
 20
 21def collect_bottom_data(dut, cycle, first_valid_cycle):
 22    if cycle < first_valid_cycle:
 23        return None
 24
 25    packed = int(dut.data_bottom_o.value)
 26    return [(packed >> idx) & 1 for idx in range(len(dut.data_bottom_o))]
 27
 28
 29def expected_reduce_cycle(reduce_pulse_cycle, row):
 30    if row == 0:
 31        return reduce_pulse_cycle
 32    return reduce_pulse_cycle + ((2 * row) - 1)
 33
 34
 35@cocotb.test()
 36async def trapeziod_full_trace_reduce(dut):
 37    trace_json = Path(os.environ["FULL_TRACE_REDUCE_JSON"])
 38    case = load_sample_case()
 39
 40    total_cols = len(dut.data_top_i)
 41    l = len(dut.data_bottom_o)
 42    n = total_cols - l
 43    m = case["M"]
 44
 45    assert case["N"] == n, f"JSON N={case['N']} does not match DUT N={n}"
 46    assert case["L"] == l, f"JSON L={case['L']} does not match DUT L={l}"
 47
 48    streams = build_top_input_streams(case["A"], case["B"])
 49    feed_cycles = max(len(stream) for stream in streams)
 50    reduce_pulse_cycle = m
 51    last_cycle = (3 * n) +  m - 1
 52    first_bottom_cycle = n - 1
 53
 54    cocotb.start_soon(Clock(dut.clk, 10, unit="ns").start())
 55
 56    dut.rst.value = 1
 57    dut.en_i.value = 1
 58    dut.reduce_i.value = 0
 59    dut.data_top_i.value = 0
 60
 61    for _ in range(2):
 62        await RisingEdge(dut.clk)
 63
 64    dut.rst.value = 0
 65
 66    payload = {
 67        "name": case["name"],
 68        "matrix_json": case["path"],
 69        "M": m,
 70        "N": n,
 71        "L": l,
 72        "feed_cycles": feed_cycles,
 73        "last_input_cycle": feed_cycles - 1,
 74        "reduce_pulse_cycle": reduce_pulse_cycle,
 75        "last_cycle": last_cycle,
 76        "cycles": {},
 77    }
 78    reduce_mode_by_cycle = {}
 79
 80    for cycle in range(last_cycle + 1):
 81        top_bits = sample_streams_at_cycle(streams, cycle)
 82        reduce_bit = 1 if cycle == reduce_pulse_cycle else 0
 83
 84        await FallingEdge(dut.clk)
 85        dut.reduce_i.value = reduce_bit
 86        dut.data_top_i.value = pack_bits(top_bits)
 87        await RisingEdge(dut.clk)
 88        await ReadOnly()
 89
 90        bottom_data = collect_bottom_data(dut, cycle, first_bottom_cycle)
 91        register_matrix = collect_register_matrix(dut, n=n, total_cols=total_cols)
 92        register_matrix_readable = format_register_matrix(register_matrix)
 93        diag_data_out = collect_diag_data_out_row(dut, n=n)
 94        diag_reduce_inputs = collect_diag_reduce_inputs(dut, n=n)
 95        reduce_mode_row = format_reduce_mode_row(diag_reduce_inputs)
 96        reduce_mode_by_cycle[cycle] = reduce_mode_row
 97
 98        cocotb.log.info(
 99            "t=%02d reduce_i=%d data_bottom=%s f_data_out=%s f_reduce_mode=%s",
100            cycle,
101            reduce_bit,
102            bottom_data,
103            diag_data_out,
104            reduce_mode_row,
105        )
106        for row_idx, row_values in enumerate(register_matrix):
107            cocotb.log.info("t=%02d row %d: %s", cycle, row_idx, row_values)
108
109        payload["cycles"][f"t={cycle}"] = {
110            "reduce_i": reduce_bit,
111            "data_in_row": top_bits,
112            "data_bottom_row": bottom_data,
113            "diag_f_data_out_row": diag_data_out,
114            "f_reduce_mode_row": reduce_mode_row,
115            "register_matrix": register_matrix_readable,
116        }
117
118    for row in range(n):
119        expected_cycle = expected_reduce_cycle(reduce_pulse_cycle, row)
120        assert reduce_mode_by_cycle[expected_cycle][row] == 0, (
121            f"Expected reduce to reach f[{row},{row}] at t={expected_cycle}"
122        )
123        if expected_cycle > 0:
124            assert reduce_mode_by_cycle[expected_cycle - 1][row] == 1, (
125                f"Expected reduce to be inactive at f[{row},{row}] before t={expected_cycle}"
126            )
127        if expected_cycle < last_cycle:
128            assert reduce_mode_by_cycle[expected_cycle + 1][row] == 1, (
129                f"Expected reduce pulse at f[{row},{row}] to last one cycle"
130            )
131
132    trace_json.parent.mkdir(parents=True, exist_ok=True)
133    trace_json.write_text(json.dumps(payload, indent=2) + "\n")