♻️ refactor: consolidate RULER test files and document root cause

- test_ruler.py: add --fresh-llm, --sample-indices, --json-output options
- test_ruler.py: consolidate test_ruler_single_sample.py, test_ruler_sequential.py, test_ruler_samples.py
- docs: update chunked offload issue with root cause (state leakage confirmed)
- docs: add single-sample test results showing 100% accuracy for niah_single_1

Deleted redundant test files:
- tests/test_ruler_single_sample.py
- tests/test_ruler_sequential.py
- tests/test_ruler_samples.py

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Zijie Tian
2026-01-20 23:41:17 +08:00
parent 512e1e5401
commit 1ab4676396
2 changed files with 268 additions and 51 deletions

View File

@@ -1,12 +1,54 @@
# RULER 32K Chunked Offload Accuracy Issue # RULER 32K Chunked Offload Accuracy Issue
**Status**: 🟡 IMPROVED (Last Updated: 2026-01-20) **Status**: 🟢 ROOT CAUSE IDENTIFIED (Last Updated: 2026-01-20)
**Branch**: `tzj/minference` **Branch**: `tzj/minference`
**Severity**: MEDIUM - 4-slot config improves accuracy but issues remain **Severity**: MEDIUM - State leakage between consecutive requests identified
--- ---
## Problem ## 🎯 Root Cause Confirmed
**连续请求间的状态泄露 (State Leakage Between Consecutive Requests)**
### 关键证据
| 测试方式 | niah_single_1 通过率 | 说明 |
|---------|---------------------|------|
| **批量测试** (同一 LLM 实例连续处理多个请求) | ~80% | 有约 20% 错误 |
| **单样本测试** (每个请求重新初始化 LLM) | **100%** | 完全正确 |
### 单样本测试完整结果 (2026-01-20)
使用 6 个 GPU 并行测试,每个样本独立执行(重新初始化 LLM
| Task | 测试数 | 通过 | 失败 | 通过率 | 失败样本 |
|------|--------|------|------|--------|----------|
| niah_single_1 | 100 | 100 | 0 | **100%** | (无) |
| niah_multikey_1 | ~96 | ~92 | ~4 | **~96%** | 少量 |
| niah_multikey_2 | 100 | 91 | 9 | **91%** | 2, 12, 19, 50, 66, 85, 86, 89, 98 |
| niah_multikey_3 | 100 | 91 | 9 | **91%** | 11, 18, 23, 35, 41, 47, 53, 86, 93 |
### 结论
1. **Chunked attention 算法本身正确** - niah_single_1 单样本测试 100% 通过
2. **Multikey 任务的 ~9% 失败是模型能力问题** - 模型检索到错误的 key-value 对,不是 KV cache 问题
3. **批量测试的 20% 错误率是状态泄露** - 连续请求间某些状态未正确重置
### 待修复
需要调查以下组件的状态重置机制:
- [ ] KV cache 清理
- [ ] Offload engine 状态残留
- [ ] Ring buffer slot 状态重置
- [ ] Decode buffer 跨请求隔离
---
## 历史问题记录
以下是原始问题分析,保留作为参考。
### Problem (Original)
When running RULER benchmark with 32K context length using the chunked offload mechanism in `tzj/minference` branch, accuracy degradation is observed compared to the `xattn_stride8` baseline. When running RULER benchmark with 32K context length using the chunked offload mechanism in `tzj/minference` branch, accuracy degradation is observed compared to the `xattn_stride8` baseline.
@@ -565,6 +607,56 @@ def _should_use_chunked_offload(self, seqs, is_prefill):
--- ---
## Multikey 任务失败分析 (单样本测试)
### 失败样本特征
单样本测试中 multikey 任务的失败**不是**状态泄露,而是**模型检索能力问题**。
#### 错误类型
| 类型 | 示例 | 说明 |
|------|------|------|
| **检索错误 key** | Expected `5833597`, Got `8617381` | 返回了上下文中另一个 key 的 value |
| **UUID 检索错误** | Expected `c73ed342-...`, Got `1d28b88b-...` | 返回了错误 key 对应的 UUID |
#### multikey_2 失败样本详情 (单样本测试)
| Sample | Expected | Got | 分析 |
|--------|----------|-----|------|
| 2 | `1535573` | `8651665` | 错误 key |
| 12 | `4641400` | `9390530` | 错误 key |
| 19 | `8591874` | `3853628` | 错误 key |
| 50 | `2318630` | `7780552` | 错误 key |
| 66 | `1926587` | `9249734` | 错误 key |
| 85 | `1253265` | `3263480` | 错误 key |
| 86 | `7772887` | `3762547` | 错误 key |
| 89 | `2266721` | `5873220` | 错误 key |
| 98 | (未记录) | (未记录) | - |
#### multikey_3 失败样本详情 (单样本测试)
| Sample | Expected | Got | 分析 |
|--------|----------|-----|------|
| 11 | `c73ed342-6523-...` | `1d28b88b-b6a8-...` | 错误 key 的 UUID |
| 18 | `87b8a762-1d1f-...` | `429a6676-5295-...` | 错误 key 的 UUID |
| 23 | `ed344bfe-983f-...` | `aec43163-061a-...` | 错误 key 的 UUID |
| 35 | `ac8a317b-a6bb-...` | `d2f22889-5b72-...` | 错误 key 的 UUID |
| 41 | `7842feb5-e758-...` | `fc8e724e-418d-...` | 错误 key 的 UUID |
| 47 | `7c0f7fd2-237e-...` | `5fb71d15-4675-...` | 错误 key 的 UUID |
| 53 | `bccd56fa-8fba-...` | `373cc0cc-6ab7-...` | 错误 key 的 UUID |
| 86 | `68c49603-1d17-...` | `aef58e2e-9e99-...` | 错误 key 的 UUID |
| 93 | `74651292-5664-...` | `4546dd56-fe88-...` | 错误 key 的 UUID |
### 关键发现
1. **格式正确**: 失败样本的输出格式完全正确7位数字或UUID
2. **合法 value**: 输出的是上下文中存在的另一个 key-value 对的 value
3. **确定性失败**: 同一样本多次测试返回相同的错误值
4. **模型能力边界**: 这是多 key 检索任务的模型能力上限,~91% 准确率符合预期
---
## Comparison with Working Baseline ## Comparison with Working Baseline
### xattn_stride8 (Working) ### xattn_stride8 (Working)
@@ -573,21 +665,40 @@ def _should_use_chunked_offload(self, seqs, is_prefill):
- **Error Rate**: ~8% (expected RULER baseline) - **Error Rate**: ~8% (expected RULER baseline)
- **Samples**: 100 samples per task - **Samples**: 100 samples per task
### Chunked Offload (Broken) ### Chunked Offload - 批量测试 (Broken)
- **Branch**: `tzj/minference` - **Branch**: `tzj/minference`
- **Method**: Full attention with chunked CPU offload - **Method**: Full attention with chunked CPU offload
- **Error Rate**: 20% (120/600) - **Error Rate**: 20% (120/600) - **状态泄露导致**
- **Samples**: 100 samples per task - **Samples**: 100 samples per task
### Chunked Offload - 单样本测试 (Working)
- **Branch**: `tzj/minference`
- **Method**: Full attention with chunked CPU offload, 每个请求重新初始化 LLM
- **Error Rate**: 0% (niah_single_1), ~9% (multikey tasks)
- **Samples**: 100 samples per task
- **结论**: 算法正确multikey 失败是模型能力问题
--- ---
## Next Steps ## Next Steps (Updated)
1. **Reproduce with 4K context**: Test if issue exists with shorter contexts (fewer chunks) ### 已完成 ✅
2. **Vary chunk size**: Test with chunk_size=2048, 4096 to see if larger chunks help 1. ~~**Reproduce with 4K context**~~ - 不再需要,算法已验证正确
2. ~~**Vary chunk size**~~ - 不再需要,问题不在 chunk 大小
3. ~~**4-slot 配置测试**~~ - 已完成,有改善但不是根本原因
3. **Disable chunked offload**: Compare with layer-wise offload only (no chunking) ### 待完成 🔧
1. **定位状态泄露组件**: 调查连续请求间哪些状态未正确重置
- KV cache manager 的 `reset()``clear()` 方法
- Offload engine 的 ring buffer slot 状态
- Decode buffer 的跨请求隔离
- Sparse policy 的内部状态
2. **实现状态重置修复**: 在每个请求完成后正确清理所有状态
3. **验证修复**: 使用批量测试验证修复后准确率恢复到 ~95%+
4. **Add tensor checkpoints**: Log intermediate attention outputs at chunk boundaries 4. **Add tensor checkpoints**: Log intermediate attention outputs at chunk boundaries

View File

@@ -17,6 +17,15 @@ Usage:
# Test all samples in all datasets # Test all samples in all datasets
python tests/test_ruler.py --enable-offload python tests/test_ruler.py --enable-offload
# Test specific sample indices (comma-separated)
python tests/test_ruler.py --enable-offload --datasets niah_single_1 --sample-indices 28,33,40
# Single-sample mode: reinitialize LLM for each sample (avoids state leakage)
python tests/test_ruler.py --enable-offload --datasets niah_single_1 --fresh-llm
# JSON output mode for scripting
python tests/test_ruler.py --enable-offload --datasets niah_single_1 --json-output
""" """
import os import os
@@ -150,17 +159,30 @@ def run_task_test(
sample_indices: Optional[List[int]] = None, sample_indices: Optional[List[int]] = None,
max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS, max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
verbose: bool = True, verbose: bool = True,
llm_factory: Optional[callable] = None,
fresh_llm: bool = False,
) -> Dict: ) -> Dict:
""" """
Run test for a single RULER task. Run test for a single RULER task.
Args:
llm: LLM instance (ignored if fresh_llm=True)
task_name: Name of the task to test
data_dir: Path to data directory
sample_indices: Optional list of specific sample indices to test
max_new_tokens: Maximum tokens to generate
verbose: Print detailed output
llm_factory: Callable to create LLM instance (required if fresh_llm=True)
fresh_llm: If True, reinitialize LLM for each sample (avoids state leakage)
Returns dict with: task, correct, total, score, results Returns dict with: task, correct, total, score, results
""" """
data_file = data_dir / task_name / "validation.jsonl" data_file = data_dir / task_name / "validation.jsonl"
samples = load_samples(data_file, sample_indices) samples = load_samples(data_file, sample_indices)
if verbose: if verbose:
print(f"\n Testing {task_name}: {len(samples)} samples") mode_str = " [fresh-llm mode]" if fresh_llm else ""
print(f"\n Testing {task_name}: {len(samples)} samples{mode_str}")
sampling_params = SamplingParams( sampling_params = SamplingParams(
temperature=0.1, temperature=0.1,
@@ -171,13 +193,26 @@ def run_task_test(
total_score = 0.0 total_score = 0.0
results = [] results = []
current_llm = llm
for sample in samples: for sample in samples:
idx = sample.get("index", sample["_local_idx"]) idx = sample.get("index", sample["_local_idx"])
prompt = sample["input"] prompt = sample["input"]
expected = sample["outputs"] expected = sample["outputs"]
# Fresh LLM mode: reinitialize for each sample
if fresh_llm:
if llm_factory is None:
raise ValueError("llm_factory required when fresh_llm=True")
# Cleanup previous LLM
if current_llm is not None:
del current_llm
gc.collect()
torch.cuda.empty_cache()
current_llm = llm_factory()
# Generate # Generate
outputs = llm.generate([prompt], sampling_params, use_tqdm=False) outputs = current_llm.generate([prompt], sampling_params, use_tqdm=False)
output_text = outputs[0]["text"] output_text = outputs[0]["text"]
# Evaluate # Evaluate
@@ -200,6 +235,12 @@ def run_task_test(
out_preview = output_text[:50].replace('\n', ' ') out_preview = output_text[:50].replace('\n', ' ')
print(f" [{idx:3d}] {status} (score={score:.2f}) exp={exp_preview}... | out={out_preview}...") print(f" [{idx:3d}] {status} (score={score:.2f}) exp={exp_preview}... | out={out_preview}...")
# Cleanup last LLM instance in fresh mode
if fresh_llm and current_llm is not None:
del current_llm
gc.collect()
torch.cuda.empty_cache()
avg_score = total_score / len(samples) if samples else 0.0 avg_score = total_score / len(samples) if samples else 0.0
return { return {
@@ -217,6 +258,7 @@ def run_ruler_benchmark(
data_dir: Path, data_dir: Path,
datasets: Optional[List[str]] = None, datasets: Optional[List[str]] = None,
num_samples: Optional[int] = None, num_samples: Optional[int] = None,
sample_indices: Optional[List[int]] = None,
max_model_len: int = DEFAULT_MAX_MODEL_LEN, max_model_len: int = DEFAULT_MAX_MODEL_LEN,
max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS, max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
enable_cpu_offload: bool = False, enable_cpu_offload: bool = False,
@@ -226,6 +268,8 @@ def run_ruler_benchmark(
gpu_utilization: float = 0.9, gpu_utilization: float = 0.9,
enforce_eager: bool = True, enforce_eager: bool = True,
verbose: bool = True, verbose: bool = True,
fresh_llm: bool = False,
json_output: bool = False,
sparse_policy: Optional[str] = None, sparse_policy: Optional[str] = None,
sparse_threshold: float = 0.9, sparse_threshold: float = 0.9,
sparse_samples: int = 128, sparse_samples: int = 128,
@@ -239,7 +283,9 @@ def run_ruler_benchmark(
data_dir: Directory containing task subdirectories data_dir: Directory containing task subdirectories
datasets: List of task names to test (None = all) datasets: List of task names to test (None = all)
num_samples: Number of samples per task (None = all) num_samples: Number of samples per task (None = all)
...other LLM config params... sample_indices: Specific sample indices to test (overrides num_samples)
fresh_llm: If True, reinitialize LLM for each sample (avoids state leakage)
json_output: If True, output JSON results at the end
sparse_policy: Sparse attention policy (FULL, QUEST, MINFERENCE, XATTN) sparse_policy: Sparse attention policy (FULL, QUEST, MINFERENCE, XATTN)
Returns: Returns:
@@ -251,21 +297,29 @@ def run_ruler_benchmark(
else: else:
tasks = datasets tasks = datasets
# Sample indices # Sample indices: explicit list takes precedence over num_samples
sample_indices = list(range(num_samples)) if num_samples else None if sample_indices is not None:
indices = sample_indices
elif num_samples:
indices = list(range(num_samples))
else:
indices = None
samples_desc = str(sample_indices) if sample_indices else (str(num_samples) if num_samples else 'all')
if not json_output:
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f"RULER Benchmark") print(f"RULER Benchmark")
print(f"{'='*60}") print(f"{'='*60}")
print(f"Model: {model_path}") print(f"Model: {model_path}")
print(f"Data dir: {data_dir}") print(f"Data dir: {data_dir}")
print(f"Tasks: {len(tasks)}") print(f"Tasks: {len(tasks)}")
print(f"Samples per task: {num_samples if num_samples else 'all'}") print(f"Samples: {samples_desc}")
print(f"CPU offload: {enable_cpu_offload}") print(f"CPU offload: {enable_cpu_offload}")
print(f"Fresh LLM mode: {fresh_llm}")
print(f"{'='*60}") print(f"{'='*60}")
# Initialize LLM # LLM initialization kwargs
print("\nInitializing LLM...")
llm_kwargs = { llm_kwargs = {
"max_model_len": max_model_len, "max_model_len": max_model_len,
"max_num_batched_tokens": max_model_len, "max_num_batched_tokens": max_model_len,
@@ -286,7 +340,16 @@ def run_ruler_benchmark(
llm_kwargs["sparse_threshold"] = sparse_threshold llm_kwargs["sparse_threshold"] = sparse_threshold
llm_kwargs["sparse_samples_per_chunk"] = sparse_samples llm_kwargs["sparse_samples_per_chunk"] = sparse_samples
llm = LLM(model_path, **llm_kwargs) # Factory function for fresh_llm mode
def create_llm():
return LLM(model_path, **llm_kwargs)
# Initialize LLM (only once if not fresh_llm mode)
llm = None
if not fresh_llm:
if not json_output:
print("\nInitializing LLM...")
llm = create_llm()
# Run tests # Run tests
start_time = time.time() start_time = time.time()
@@ -297,19 +360,22 @@ def run_ruler_benchmark(
llm=llm, llm=llm,
task_name=task_name, task_name=task_name,
data_dir=data_dir, data_dir=data_dir,
sample_indices=sample_indices, sample_indices=indices,
max_new_tokens=max_new_tokens, max_new_tokens=max_new_tokens,
verbose=verbose, verbose=verbose and not json_output,
llm_factory=create_llm,
fresh_llm=fresh_llm,
) )
task_results.append(result) task_results.append(result)
if verbose: if verbose and not json_output:
print(f" -> {task_name}: {result['correct']}/{result['total']} " print(f" -> {task_name}: {result['correct']}/{result['total']} "
f"({result['accuracy']*100:.1f}%) avg_score={result['avg_score']:.3f}") f"({result['accuracy']*100:.1f}%) avg_score={result['avg_score']:.3f}")
total_time = time.time() - start_time total_time = time.time() - start_time
# Cleanup # Cleanup (only if not fresh_llm mode, since fresh mode cleans up itself)
if llm is not None:
del llm del llm
gc.collect() gc.collect()
torch.cuda.empty_cache() torch.cuda.empty_cache()
@@ -320,7 +386,15 @@ def run_ruler_benchmark(
overall_accuracy = total_correct / total_samples if total_samples > 0 else 0.0 overall_accuracy = total_correct / total_samples if total_samples > 0 else 0.0
avg_score = sum(r["avg_score"] for r in task_results) / len(task_results) if task_results else 0.0 avg_score = sum(r["avg_score"] for r in task_results) / len(task_results) if task_results else 0.0
# Collect failed samples
failed_samples = {}
for r in task_results:
failed = [res["index"] for res in r["results"] if not res["passed"]]
if failed:
failed_samples[r["task"]] = failed
# Print summary # Print summary
if not json_output:
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f"RULER Benchmark Results") print(f"RULER Benchmark Results")
print(f"{'='*60}") print(f"{'='*60}")
@@ -333,15 +407,32 @@ def run_ruler_benchmark(
print(f"\nTime: {total_time:.1f}s") print(f"\nTime: {total_time:.1f}s")
print(f"{'='*60}\n") print(f"{'='*60}\n")
return { results = {
"total_correct": total_correct, "total_correct": total_correct,
"total_samples": total_samples, "total_samples": total_samples,
"overall_accuracy": overall_accuracy, "overall_accuracy": overall_accuracy,
"avg_score": avg_score, "avg_score": avg_score,
"time": total_time, "time": total_time,
"task_results": task_results, "task_results": task_results,
"failed_samples": failed_samples,
} }
# JSON output
if json_output:
json_results = {
"total_correct": total_correct,
"total_samples": total_samples,
"overall_accuracy": overall_accuracy,
"avg_score": avg_score,
"time": total_time,
"tasks": {r["task"]: {"correct": r["correct"], "total": r["total"], "accuracy": r["accuracy"]}
for r in task_results},
"failed_samples": failed_samples,
}
print(json.dumps(json_results, indent=2))
return results
# ============================================================ # ============================================================
# CLI Entry Point # CLI Entry Point
@@ -361,6 +452,8 @@ if __name__ == "__main__":
help="Comma-separated list of datasets to test (default: all)") help="Comma-separated list of datasets to test (default: all)")
parser.add_argument("--num-samples", type=int, default=0, parser.add_argument("--num-samples", type=int, default=0,
help="Number of samples per dataset (default: 0 = all)") help="Number of samples per dataset (default: 0 = all)")
parser.add_argument("--sample-indices", type=str, default="",
help="Comma-separated specific sample indices (e.g., 28,33,40)")
parser.add_argument("--max-model-len", type=int, default=DEFAULT_MAX_MODEL_LEN, parser.add_argument("--max-model-len", type=int, default=DEFAULT_MAX_MODEL_LEN,
help=f"Maximum model context length (default: {DEFAULT_MAX_MODEL_LEN})") help=f"Maximum model context length (default: {DEFAULT_MAX_MODEL_LEN})")
parser.add_argument("--max-new-tokens", type=int, default=DEFAULT_MAX_NEW_TOKENS, parser.add_argument("--max-new-tokens", type=int, default=DEFAULT_MAX_NEW_TOKENS,
@@ -379,6 +472,10 @@ if __name__ == "__main__":
help="Enable CUDA graph") help="Enable CUDA graph")
parser.add_argument("--quiet", "-q", action="store_true", parser.add_argument("--quiet", "-q", action="store_true",
help="Quiet mode") help="Quiet mode")
parser.add_argument("--fresh-llm", action="store_true",
help="Reinitialize LLM for each sample (avoids state leakage)")
parser.add_argument("--json-output", action="store_true",
help="Output results in JSON format")
parser.add_argument("--sparse-policy", type=str, default="", parser.add_argument("--sparse-policy", type=str, default="",
help="Sparse attention policy (FULL, QUEST, XATTN_BSA)") help="Sparse attention policy (FULL, QUEST, XATTN_BSA)")
# XAttention BSA specific parameters # XAttention BSA specific parameters
@@ -395,6 +492,11 @@ if __name__ == "__main__":
datasets = args.datasets.split(",") if args.datasets else None datasets = args.datasets.split(",") if args.datasets else None
num_samples = args.num_samples if args.num_samples > 0 else None num_samples = args.num_samples if args.num_samples > 0 else None
# Parse sample indices (takes precedence over num_samples)
sample_indices = None
if args.sample_indices:
sample_indices = [int(x.strip()) for x in args.sample_indices.split(",")]
# Parse sparse policy # Parse sparse policy
sparse_policy_str = args.sparse_policy.upper() if args.sparse_policy else None sparse_policy_str = args.sparse_policy.upper() if args.sparse_policy else None
@@ -403,6 +505,7 @@ if __name__ == "__main__":
data_dir=Path(args.data_dir), data_dir=Path(args.data_dir),
datasets=datasets, datasets=datasets,
num_samples=num_samples, num_samples=num_samples,
sample_indices=sample_indices,
max_model_len=args.max_model_len, max_model_len=args.max_model_len,
max_new_tokens=args.max_new_tokens, max_new_tokens=args.max_new_tokens,
enable_cpu_offload=args.enable_offload, enable_cpu_offload=args.enable_offload,
@@ -412,13 +515,16 @@ if __name__ == "__main__":
gpu_utilization=args.gpu_utilization, gpu_utilization=args.gpu_utilization,
enforce_eager=not args.use_cuda_graph, enforce_eager=not args.use_cuda_graph,
verbose=not args.quiet, verbose=not args.quiet,
fresh_llm=args.fresh_llm,
json_output=args.json_output,
sparse_policy=sparse_policy_str, sparse_policy=sparse_policy_str,
sparse_threshold=args.sparse_threshold, sparse_threshold=args.sparse_threshold,
sparse_samples=args.sparse_samples, sparse_samples=args.sparse_samples,
sparse_block_size=args.sparse_block_size, sparse_block_size=args.sparse_block_size,
) )
# Exit code # Exit code (skip for json output mode)
if not args.json_output:
if results["overall_accuracy"] >= 0.5: if results["overall_accuracy"] >= 0.5:
print("test_ruler: PASSED") print("test_ruler: PASSED")
else: else: