def recv_imu_window( ser: serial.Serial, A: int = 96, samples_per_frame: int = 4, # 48 payload / (6*2) = 4 samples expect_addh: int = 0x00, expect_addl: int = 0x00, expect_chan: int = 0x06, timeout_s: float = 2.0, ) -> List[Tuple[int, int, int, int, int, int]]: """ Receives a full IMU window and returns it as a list of A samples. Each sample is (ax, ay, az, gx, gy, gz) in int16 raw. Frame checks: - ADDH/ADDL/CHAN match - MAGIC == b"TE" - FC sequence (0..frame_count-1) - Consistent FID across frames """ if (PAYLOAD_LEN % (6 * 2)) != 0: raise ValueError("PAYLOAD_LEN is not a multiple of one IMU sample (12 bytes).") frame_count = (A + samples_per_frame - 1) // samples_per_frame out: List[Tuple[int, int, int, int, int, int]] = [None] * A # type: ignore # Try to sync first (best-effort) _sync_to_magic(ser, timeout_s=timeout_s) fid_expected: Optional[int] = None samples_written = 0 for fc_expected in range(frame_count): # Read one full frame frame = _read_exact(ser, FRAME_LEN, timeout_s=timeout_s) addh, addl, chan = frame[0], frame[1], frame[2] magic = frame[3:5] fid = frame[5] fc = frame[6] payload = frame[7:7 + PAYLOAD_LEN] # Validate header if (addh != expect_addh) or (addl != expect_addl) or (chan != expect_chan) or (magic != MAGIC): raise ValueError( f"Header mismatch: ADDH/ADDL/CHAN/MAGIC = " f"{addh:02X}/{addl:02X}/{chan:02X}/{magic!r}" ) # Validate FID continuity if fid_expected is None: fid_expected = fid elif fid != fid_expected: raise ValueError(f"FID mismatch: got {fid}, expected {fid_expected}") # Validate FC ordering if fc != fc_expected: raise ValueError(f"FC mismatch: got {fc}, expected {fc_expected}") # Parse payload: little-endian 6x int16 per sample # Each sample is 12 bytes -> '= A: break ax, ay, az, gx, gy, gz = struct.unpack_from("