greptimedb/tests-fuzz/src/utils/csv_dump_writer.rs
Weny Xu 20f38d8a6a test(fuzz): add metric table repartition fuzz target (#7754)
* test: add fuzz_repartition_metric_table target scaffold

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add metric logical lifecycle in repartition fuzz target

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: support partitioned metric tables in repartition fuzz

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add repartition loop and partition assertions for metric target

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: use shared timestamp clock in metric repartition writes

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: unify string value and bound generation for fuzzing

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: use fixed physical table name in metric repartition fuzz

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fmt

Signed-off-by: WenyXu <wenymedia@gmail.com>

* ci: update ci config

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: use btreemap

Signed-off-by: WenyXu <wenymedia@gmail.com>

* print count result

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add csv translator for insert expr

Introduce a dedicated top-level csv translator so fuzz insert expressions can be converted into writer-ready records through a structured path instead of ad-hoc formatting in targets.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add csv dump session utilities

Introduce CSV dump env helpers and a session writer that creates run directories, emits seed metadata, and flushes staged CSV records for fuzz workflows.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: bound csv dump buffer with auto flush

Parse readable buffer sizes from env and flush staged CSV records automatically when the in-memory threshold is reached to prevent unbounded growth during long fuzz runs.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: flush csv dump before repartition validation

Wire csv dump session into the metric repartition fuzz flow so successful inserts are translated from insert expressions into CSV records during write loops and flushed to disk right before row validation.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: keep csv dumps on failure and cleanup on pass

Capture run outcomes in metric repartition fuzz, remove dump directories only after successful validation, and retain dump paths on failures so CI and local investigations can use the same artifacts.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: align partial csv records with table headers

Keep append payload compact by storing partial insert-expression columns, then expand to full table-context headers at flush time and fill missing values with empty strings.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* dump csv

Signed-off-by: WenyXu <wenymedia@gmail.com>

* ci: dump csv

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add table-scoped sql dump writer primitives

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: capture table-scoped sql traces after execution

Record insert and repartition SQL only after successful execution, include started_at_ms and elapsed_ms in trace comments, and broadcast repartition events into every logical-table trace file for consistent debugging context.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: harden sql trace comments and include create sql

Normalize multiline trace comments into valid SQL comment lines and append logical-table CREATE SQL to per-table traces for better timeline reconstruction during repartition debugging.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: dump physical create and repartition SQL traces

Signed-off-by: WenyXu <wenymedia@gmail.com>

* dump repartition sql

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: scaffold writer control channel for barrier flow

Add Barrier/Resume/Stop control skeleton and channel wiring in write_loop to prepare per-repartition validation barriers. Also align SQL dump tests with broadcast SQL payload behavior.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: implement writer barrier pause and resume control

Make writer control messages effective by pausing writes on barrier, resuming on resume, and stopping via channel signaling so the next commit can enforce deterministic per-repartition validation boundaries.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: validate rows after each repartition barrier

Add per-action barrier/ack synchronization with timeout, run immediate logical-table row validation after each repartition, and resume writer only after validation completes to improve minimal failure localization.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: flush dump sessions before per-epoch validation

Extract a shared flush-and-snapshot helper and call it before each immediate row validation so CSV/SQL artifacts are persisted at the same epoch boundary being validated.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add retry

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-03-13 08:00:09 +00:00
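The commit message describes a bounded CSV dump buffer: records are staged in memory, flushed automatically once an estimated size threshold is reached, and flushed explicitly before validation. Here is a simplified, hypothetical sketch of that pattern. `StagingBuffer` and its fields are illustrative names that are not part of the crate. Rows are pre-rendered strings, and a `Vec<String>` stands in for the per-table CSV file on disk.

```rust
/// Hypothetical staging buffer: batches of rendered CSV rows are held in
/// memory and drained to a sink once the estimated byte size reaches `max_bytes`.
struct StagingBuffer {
    max_bytes: usize,
    staged: Vec<Vec<String>>, // batches waiting to be written
    staged_bytes: usize,      // estimated size of `staged`
    flushed: Vec<String>,     // stands in for the on-disk CSV file
}

impl StagingBuffer {
    fn new(max_bytes: usize) -> Self {
        Self {
            max_bytes,
            staged: Vec::new(),
            staged_bytes: 0,
            flushed: Vec::new(),
        }
    }

    /// Stages one batch and auto-flushes when the threshold is reached.
    fn append(&mut self, batch: Vec<String>) {
        // Size estimate: total row length, separators and overhead ignored.
        self.staged_bytes += batch.iter().map(String::len).sum::<usize>();
        self.staged.push(batch);
        if self.staged_bytes >= self.max_bytes {
            self.flush();
        }
    }

    /// Writes every staged batch in order, then resets the size estimate.
    fn flush(&mut self) {
        for batch in self.staged.drain(..) {
            self.flushed.extend(batch);
        }
        self.staged_bytes = 0;
    }
}
```

With a 10-byte limit, appending `"web-1,10"` (8 bytes) stays staged, and appending `"web-2,11"` pushes the estimate to 16 bytes and triggers a flush. Resetting the counter after each flush keeps memory bounded to roughly one threshold's worth of data plus the last batch. This mirrors how `flush_buffered_records` resets `buffered_bytes` in `csv_dump_writer.rs`.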

384 lines
12 KiB
Rust

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions, create_dir_all, remove_dir_all};
use std::io::Write;
use std::path::{Path, PathBuf};

use common_telemetry::{info, warn};
use common_time::util::current_time_millis;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::translator::csv::CsvRecords;
use crate::utils::{
    get_gt_fuzz_dump_buffer_max_bytes, get_gt_fuzz_dump_dir, get_gt_fuzz_dump_suffix,
};

/// Metadata for one CSV dump session.
#[derive(Debug, Clone)]
pub struct CsvDumpMetadata {
    /// Fuzz target name.
    pub target: String,
    /// Seed used by the current fuzz input.
    pub seed: u64,
    /// Number of repartition actions.
    pub actions: usize,
    /// Initial partition count.
    pub partitions: usize,
    /// Number of logical tables.
    pub tables: usize,
    /// Session start time in Unix milliseconds.
    pub started_at_unix_ms: i64,
}

impl CsvDumpMetadata {
    /// Builds dump metadata stamped with the current time.
    pub fn new(
        target: impl Into<String>,
        seed: u64,
        actions: usize,
        partitions: usize,
        tables: usize,
    ) -> Self {
        Self {
            target: target.into(),
            seed,
            actions,
            partitions,
            tables,
            started_at_unix_ms: current_time_millis(),
        }
    }
}

/// Session writer that stages CSV dump records in memory and flushes them to
/// per-table CSV files under `run_dir`.
#[derive(Debug)]
pub struct CsvDumpSession {
    /// Session metadata.
    pub metadata: CsvDumpMetadata,
    /// Session directory path.
    pub run_dir: PathBuf,
    /// Maximum estimated in-memory buffer size before an automatic flush.
    pub max_buffer_bytes: usize,
    /// Batches staged since the last flush.
    records: Vec<CsvRecords>,
    /// Estimated size of `records` in bytes.
    buffered_bytes: usize,
    /// Tables whose CSV file already contains a header row.
    written_tables: HashSet<String>,
    /// Full output headers per table, recorded on the first append.
    full_headers_by_table: HashMap<String, Vec<String>>,
}

impl CsvDumpSession {
    /// Creates the session directory and writes the seed metadata file, using
    /// the buffer limit returned by `get_gt_fuzz_dump_buffer_max_bytes`.
    pub fn new(metadata: CsvDumpMetadata) -> Result<Self> {
        Self::new_with_buffer_limit(metadata, get_gt_fuzz_dump_buffer_max_bytes())
    }

    /// Creates a session with a custom in-memory buffer limit.
    pub fn new_with_buffer_limit(
        metadata: CsvDumpMetadata,
        max_buffer_bytes: usize,
    ) -> Result<Self> {
        let run_dir = build_run_dir(&metadata);
        create_dir_all(&run_dir).context(error::CreateFileSnafu {
            path: run_dir.to_string_lossy().to_string(),
        })?;
        write_seed_meta(&run_dir, &metadata)?;
        info!(
            "Create csv dump session, target: {}, run_dir: {}, max_buffer_bytes: {}",
            metadata.target,
            run_dir.display(),
            max_buffer_bytes
        );

        Ok(Self {
            metadata,
            run_dir,
            max_buffer_bytes,
            records: Vec::new(),
            buffered_bytes: 0,
            written_tables: HashSet::new(),
            full_headers_by_table: HashMap::new(),
        })
    }

    /// Appends one batch of CSV records for a single table.
    ///
    /// `full_headers` is kept only from the first append for a table and is
    /// used as that table's output header row. The buffer is flushed
    /// automatically once its estimated size reaches `max_buffer_bytes`.
    pub fn append(&mut self, records: CsvRecords, full_headers: Vec<String>) -> Result<()> {
        self.full_headers_by_table
            .entry(records.table_name.clone())
            .or_insert(full_headers);
        self.buffered_bytes += estimate_csv_records_size(&records);
        self.records.push(records);
        if self.buffered_bytes >= self.max_buffer_bytes {
            self.flush_buffered_records()?;
        }
        Ok(())
    }

    /// Flushes all buffered batches to their CSV files.
    pub fn flush_all(&mut self) -> Result<()> {
        self.flush_buffered_records()
    }

    /// Removes the session directory; intended to be called after successful validation.
    pub fn cleanup_on_success(&self) -> std::io::Result<()> {
        match remove_dir_all(&self.run_dir) {
            Ok(_) => {
                info!(
                    "Cleanup csv dump directory on success: {}",
                    self.run_dir.display()
                );
                Ok(())
            }
            Err(err) => {
                warn!(
                    "Cleanup csv dump directory failed: {}, error: {:?}",
                    self.run_dir.display(),
                    err
                );
                Err(err)
            }
        }
    }

    /// Writes every staged batch, then clears the buffer and resets the size estimate.
    fn flush_buffered_records(&mut self) -> Result<()> {
        if self.records.is_empty() {
            return Ok(());
        }
        for batch in &self.records {
            write_batch_csv(
                &self.run_dir,
                batch,
                &mut self.written_tables,
                &self.full_headers_by_table,
            )?;
        }
        self.records.clear();
        self.buffered_bytes = 0;
        Ok(())
    }
}

/// Writes `seed.meta` into `run_dir` as `key=value` lines describing the run.
fn write_seed_meta(run_dir: &Path, metadata: &CsvDumpMetadata) -> Result<()> {
    let path = run_dir.join("seed.meta");
    let mut file = File::create(&path).context(error::CreateFileSnafu {
        path: path.to_string_lossy().to_string(),
    })?;
    let content = format!(
        "target={}\nseed={}\nactions={}\npartitions={}\ntables={}\nstarted_at_unix_ms={}\n",
        metadata.target,
        metadata.seed,
        metadata.actions,
        metadata.partitions,
        metadata.tables,
        metadata.started_at_unix_ms,
    );
    file.write_all(content.as_bytes())
        .context(error::WriteFileSnafu {
            path: path.to_string_lossy().to_string(),
        })
}

/// Appends one batch to `{sanitized table name}.table-data.csv` in `run_dir`.
///
/// The header row is written only the first time a table is seen in this
/// session. Rows are aligned to the table's full headers (falling back to the
/// batch headers), and columns missing from the batch are written as empty cells.
fn write_batch_csv(
    run_dir: &Path,
    batch: &CsvRecords,
    written_tables: &mut HashSet<String>,
    full_headers_by_table: &HashMap<String, Vec<String>>,
) -> Result<()> {
    let output_headers = full_headers_by_table
        .get(&batch.table_name)
        .cloned()
        .unwrap_or_else(|| batch.headers.clone());
    let file_name = format!("{}.table-data.csv", sanitize_file_name(&batch.table_name));
    let path = run_dir.join(file_name);
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .context(error::CreateFileSnafu {
            path: path.to_string_lossy().to_string(),
        })?;

    // Emit the header row once per table.
    if written_tables.insert(batch.table_name.clone()) {
        file.write_all(join_line(&output_headers).as_bytes())
            .context(error::WriteFileSnafu {
                path: path.to_string_lossy().to_string(),
            })?;
        file.write_all(b"\n").context(error::WriteFileSnafu {
            path: path.to_string_lossy().to_string(),
        })?;
    }

    // Map each batch header to its column position in the batch records.
    let header_index = batch
        .headers
        .iter()
        .enumerate()
        .map(|(idx, header)| (header.as_str(), idx))
        .collect::<HashMap<_, _>>();
    for record in &batch.records {
        // Pick values in output-header order; absent columns become "".
        let aligned_values = output_headers
            .iter()
            .map(|header| {
                header_index
                    .get(header.as_str())
                    .and_then(|idx| record.values.get(*idx))
                    .cloned()
                    .unwrap_or_default()
            })
            .collect::<Vec<_>>();
        file.write_all(join_line(&aligned_values).as_bytes())
            .context(error::WriteFileSnafu {
                path: path.to_string_lossy().to_string(),
            })?;
        file.write_all(b"\n").context(error::WriteFileSnafu {
            path: path.to_string_lossy().to_string(),
        })?;
    }
    Ok(())
}

/// Estimates a batch's in-memory size as the total byte length of its headers
/// and cell values (separators and allocation overhead are not counted).
fn estimate_csv_records_size(records: &CsvRecords) -> usize {
    let headers = records.headers.iter().map(String::len).sum::<usize>();
    let rows = records
        .records
        .iter()
        .flat_map(|record| record.values.iter())
        .map(String::len)
        .sum::<usize>();
    headers + rows
}

/// Joins cells into one CSV line, escaping each cell.
fn join_line(cells: &[String]) -> String {
    cells
        .iter()
        .map(|cell| escape_csv_cell(cell))
        .collect::<Vec<_>>()
        .join(",")
}

/// Quotes a cell that contains a comma, double quote, or line break, doubling
/// any embedded double quotes.
fn escape_csv_cell(value: &str) -> String {
    if value.contains([',', '"', '\n', '\r']) {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}

/// Replaces every character other than ASCII alphanumerics, `_`, and `-` with `_`.
fn sanitize_file_name(raw: &str) -> String {
    raw.chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}

/// Builds `<dump dir>/<target>_seed_<seed>_actions_<actions>_ts_<started_at_unix_ms><suffix>`.
fn build_run_dir(metadata: &CsvDumpMetadata) -> PathBuf {
    let base = PathBuf::from(get_gt_fuzz_dump_dir());
    let suffix = get_gt_fuzz_dump_suffix();
    let name = format!(
        "{}_seed_{}_actions_{}_ts_{}{}",
        metadata.target, metadata.seed, metadata.actions, metadata.started_at_unix_ms, suffix
    );
    base.join(name)
}

#[cfg(test)]
mod tests {
    use super::{CsvDumpMetadata, CsvDumpSession};
    use crate::translator::csv::{CsvRecord, CsvRecords};

    #[test]
    fn test_create_session_and_flush() {
        let mut session = CsvDumpSession::new_with_buffer_limit(
            CsvDumpMetadata::new("fuzz_case", 1, 2, 3, 4),
            1024,
        )
        .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-a".to_string(),
                    headers: vec!["host".to_string(), "value".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-1".to_string(), "10".to_string()],
                    }],
                },
                vec!["host".to_string(), "value".to_string()],
            )
            .unwrap();
        session.flush_all().unwrap();

        assert!(session.run_dir.exists());
        assert!(session.run_dir.join("seed.meta").exists());
        assert!(session.run_dir.join("metric-a.table-data.csv").exists());
    }

    #[test]
    fn test_auto_flush_on_buffer_limit() {
        let mut session =
            CsvDumpSession::new_with_buffer_limit(CsvDumpMetadata::new("fuzz_case", 5, 2, 3, 4), 1)
                .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-b".to_string(),
                    headers: vec!["host".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-2".to_string()],
                    }],
                },
                vec!["host".to_string()],
            )
            .unwrap();

        assert!(session.run_dir.join("metric-b.table-data.csv").exists());
        assert_eq!(session.buffered_bytes, 0);
    }

    #[test]
    fn test_flush_with_partial_headers_uses_full_headers() {
        let mut session = CsvDumpSession::new_with_buffer_limit(
            CsvDumpMetadata::new("fuzz_case", 7, 2, 3, 4),
            1024,
        )
        .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-c".to_string(),
                    headers: vec!["host".to_string(), "value".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-3".to_string(), "12".to_string()],
                    }],
                },
                vec!["host".to_string(), "idc".to_string(), "value".to_string()],
            )
            .unwrap();
        session.flush_all().unwrap();

        let file =
            std::fs::read_to_string(session.run_dir.join("metric-c.table-data.csv")).unwrap();
        let mut lines = file.lines();
        assert_eq!(lines.next().unwrap(), "host,idc,value");
        assert_eq!(lines.next().unwrap(), "web-3,,12");
    }
}
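For reference, you can exercise the header-alignment rule from `write_batch_csv` and the quoting rule from `escape_csv_cell` without the crate's `CsvRecords` types. The sketch below is a hypothetical, standalone restatement. `align_row` and `escape_cell` are illustrative names that are not part of the crate, and the sketch works on `&str` slices rather than the crate's record structs.

```rust
use std::collections::HashMap;

/// Quotes a cell containing a comma, double quote, or line break and doubles
/// embedded quotes (same rule as `escape_csv_cell`).
fn escape_cell(value: &str) -> String {
    if value.contains([',', '"', '\n', '\r']) {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}

/// Renders one partial row in `full_headers` order. Columns absent from
/// `partial_headers` (or missing from `values`) become empty cells.
fn align_row(full_headers: &[&str], partial_headers: &[&str], values: &[&str]) -> String {
    // Position of each partial header within `values`.
    let index: HashMap<&str, usize> = partial_headers
        .iter()
        .enumerate()
        .map(|(idx, header)| (*header, idx))
        .collect();
    full_headers
        .iter()
        .map(|header| {
            index
                .get(header)
                .and_then(|idx| values.get(*idx))
                .map(|value| escape_cell(value))
                .unwrap_or_default()
        })
        .collect::<Vec<_>>()
        .join(",")
}
```

`align_row(&["host", "idc", "value"], &["host", "value"], &["web-3", "12"])` yields `web-3,,12`, which matches the expectation in `test_flush_with_partial_headers_uses_full_headers`. Empty cells are never quoted, because an empty string contains none of the characters that trigger quoting.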