feat: improve filter support for scanbench (#7736)

* feat: cast filters type for scanbench

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: pub file_range mod

So we can use the pub struct FileRange in other places

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: add api as dev-dependency to cmd for clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support profiling after warmup

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-03-03 17:00:41 +08:00
committed by GitHub
parent b2074e3863
commit 5c8ece27e0
5 changed files with 210 additions and 11 deletions

1
Cargo.lock generated
View File

@@ -2034,6 +2034,7 @@ dependencies = [
name = "cmd"
version = "1.0.0-rc.1"
dependencies = [
"api",
"async-trait",
"auth",
"base64 0.22.1",

View File

@@ -27,6 +27,7 @@ cargo build -p cmd --bin greptime
[--force-flat-format] \
[--enable-wal] \
[--pprof-file <FLAMEGRAPH_SVG>] \
[--pprof-after-warmup] \
[--verbose]
```
@@ -51,6 +52,7 @@ cargo build -p cmd --bin greptime
- `--force-flat-format`: Force reading the region in flat format. Default: disabled.
- `--enable-wal`: Enable WAL replay when opening the region. Default: disabled. When enabled, scanbench uses the log store configured in the `[wal]` section of the config TOML (raft-engine or Kafka). When disabled or when no WAL is configured, a `NoopLogStore` is used.
- `--pprof-file`: Output flamegraph path (Unix only).
- `--pprof-after-warmup`: Start profiling after the first iteration, using it as a warmup. Requires `--pprof-file`. Default: disabled.
- `--verbose` / `-v`: Enable verbose output.
## Scan Config JSON

View File

@@ -106,6 +106,7 @@ pprof = { version = "0.14", features = [
tikv-jemallocator = "0.6"
[dev-dependencies]
api.workspace = true
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true

View File

@@ -27,9 +27,11 @@ use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_wal::config::DatanodeWalConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::Expr as DfExpr;
use datafusion_common::ToDFSchema;
use datafusion::logical_expr::{BinaryExpr, Expr as DfExpr, ExprSchemable, Operator};
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, ScalarValue, ToDFSchema};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::compute;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use log_store::kafka::log_store::KafkaLogStore;
@@ -107,6 +109,10 @@ pub struct ScanbenchCommand {
/// Enable WAL replay when opening the region.
#[clap(long, default_value_t = false)]
enable_wal: bool,
/// Start pprof after the first iteration (use first iteration as warmup).
#[clap(long, default_value_t = false)]
pprof_after_warmup: bool,
}
/// JSON config for scan request parameters.
@@ -162,6 +168,21 @@ fn resolve_projection(
Ok(None)
}
fn format_bytes(bytes: u64) -> String {
const KIB: u64 = 1024;
const MIB: u64 = 1024 * KIB;
const GIB: u64 = 1024 * MIB;
if bytes >= GIB {
format!("{:.2} GiB", bytes as f64 / GIB as f64)
} else if bytes >= MIB {
format!("{:.2} MiB", bytes as f64 / MIB as f64)
} else if bytes >= KIB {
format!("{:.2} KiB", bytes as f64 / KIB as f64)
} else {
format!("{} B", bytes)
}
}
fn parse_region_id(s: &str) -> error::Result<RegionId> {
if s.contains(':') {
let parts: Vec<&str> = s.splitn(2, ':').collect();
@@ -201,6 +222,85 @@ fn parse_path_type(s: &str) -> error::Result<PathType> {
}
}
/// Rewrites literal values in comparison expressions to match the column's arrow type.
struct LiteralTypeCaster {
schema: DFSchemaRef,
}
impl TreeNodeRewriter for LiteralTypeCaster {
type Node = DfExpr;
fn f_up(&mut self, expr: DfExpr) -> datafusion_common::Result<Transformed<DfExpr>> {
let DfExpr::BinaryExpr(BinaryExpr { left, op, right }) = &expr else {
return Ok(Transformed::no(expr));
};
if !matches!(
op,
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
) {
return Ok(Transformed::no(expr));
}
let (col_expr, lit_expr, col_left) = match (left.as_ref(), right.as_ref()) {
(col @ DfExpr::Column(_), lit @ DfExpr::Literal(_, _)) => (col, lit, true),
(lit @ DfExpr::Literal(_, _), col @ DfExpr::Column(_)) => (col, lit, false),
_ => return Ok(Transformed::no(expr)),
};
let col_type = col_expr.get_type(self.schema.as_ref())?;
let DfExpr::Literal(scalar, _) = lit_expr else {
unreachable!()
};
if scalar.data_type() == col_type {
return Ok(Transformed::no(expr));
}
let lit_array = scalar.to_array()?;
let casted = compute::cast(lit_array.as_ref(), &col_type).map_err(|e| {
datafusion_common::DataFusionError::Internal(format!(
"Failed to cast literal {:?} to {:?}: {}",
scalar, col_type, e
))
})?;
let casted_scalar = ScalarValue::try_from_array(&casted, 0)?;
let new_lit = DfExpr::Literal(casted_scalar, None);
let (new_left, new_right) = if col_left {
(left.clone(), Box::new(new_lit))
} else {
(Box::new(new_lit), right.clone())
};
Ok(Transformed::yes(DfExpr::BinaryExpr(BinaryExpr {
left: new_left,
op: *op,
right: new_right,
})))
}
}
fn convert_literal_types(
exprs: Vec<DfExpr>,
schema: &DFSchemaRef,
) -> datafusion_common::Result<Vec<DfExpr>> {
use datafusion_common::tree_node::TreeNode;
let mut caster = LiteralTypeCaster {
schema: schema.clone(),
};
exprs
.into_iter()
.map(|e| e.rewrite(&mut caster).map(|x| x.data))
.collect()
}
fn resolve_filters(
scan_config: &ScanConfig,
metadata: &RegionMetadata,
@@ -227,7 +327,7 @@ fn resolve_filters(
.with_default_features()
.build();
filters
let exprs: Vec<DfExpr> = filters
.iter()
.enumerate()
.map(|(idx, filter)| {
@@ -264,7 +364,15 @@ fn resolve_filters(
.build()
})
})
.collect()
.collect::<error::Result<Vec<_>>>()?;
let df_schema_ref = Arc::new(df_schema);
convert_literal_types(exprs, &df_schema_ref).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to convert filter expression types: {e}"),
}
.build()
})
}
fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
@@ -480,9 +588,9 @@ impl ScanbenchCommand {
self.force_flat_format,
);
// Start profiling if pprof_file is specified
// Start profiling if pprof_file is specified (unless pprof_after_warmup is set)
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
let mut profiler_guard = if self.pprof_file.is_some() && !self.pprof_after_warmup {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
@@ -582,21 +690,29 @@ impl ScanbenchCommand {
scan_futures.push(tokio::spawn(async move {
let mut rows = 0u64;
let mut array_mem_size = 0u64;
let mut estimated_size = 0u64;
while let Some(batch_result) = stream.next().await {
match batch_result {
Ok(batch) => {
rows += batch.num_rows() as u64;
let df_batch = batch.df_record_batch();
array_mem_size += df_batch.get_array_memory_size() as u64;
estimated_size +=
mito2::memtable::record_batch_estimated_size(df_batch) as u64;
}
Err(e) => {
return Err(BoxedError::new(e));
}
}
}
Ok::<u64, BoxedError>(rows)
Ok::<(u64, u64, u64), BoxedError>((rows, array_mem_size, estimated_size))
}));
}
let mut total_rows = 0u64;
let mut total_array_mem_size = 0u64;
let mut total_estimated_size = 0u64;
while let Some(task) = scan_futures.next().await {
let result = task
.map_err(|e| {
@@ -606,8 +722,11 @@ impl ScanbenchCommand {
))
})
.context(error::BuildCliSnafu)?;
let rows = result.context(error::BuildCliSnafu)?;
let (rows, array_mem_size, estimated_size) =
result.context(error::BuildCliSnafu)?;
total_rows += rows;
total_array_mem_size += array_mem_size;
total_estimated_size += estimated_size;
}
let elapsed = start.elapsed();
@@ -615,12 +734,40 @@ impl ScanbenchCommand {
total_elapsed_all += elapsed;
println!(
" [iter {}] {} rows in {:?} ({} partitions)",
" [iter {}] {} rows in {:?} ({} partitions), array_mem_size: {}, estimated_size: {}",
iteration + 1,
total_rows.to_string().cyan(),
elapsed,
num_partitions,
format_bytes(total_array_mem_size),
format_bytes(total_estimated_size),
);
// Start profiling after the first iteration (warmup) if pprof_after_warmup is set
#[cfg(unix)]
if iteration == 0
&& self.pprof_after_warmup
&& self.pprof_file.is_some()
&& profiler_guard.is_none()
{
println!(
"{} Starting profiling after warmup iteration...",
"".yellow()
);
profiler_guard = Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Failed to start profiler: {e}"),
StatusCode::Unexpected,
))
})
.context(error::BuildCliSnafu)?,
);
}
}
// Stop profiling and generate flamegraph if enabled
@@ -673,11 +820,15 @@ impl ScanbenchCommand {
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use sqlparser::ast::{BinaryOperator, Expr};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::{ScanConfig, resolve_projection};
use super::{ScanConfig, resolve_filters, resolve_projection};
use crate::error;
#[test]
@@ -752,6 +903,50 @@ mod tests {
}
}
#[test]
fn test_resolve_filters_uint32_type_conversion() {
use api::v1::SemanticType;
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"table_id",
ConcreteDataType::uint32_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1]);
let metadata = builder.build().unwrap();
let config = ScanConfig {
projection: None,
projection_names: None,
filters: Some(vec!["table_id = 1117".to_string()]),
series_row_selector: None,
};
let exprs = resolve_filters(&config, &metadata).unwrap();
assert_eq!(exprs.len(), 1);
// The expression should contain a UInt32 literal after type conversion.
let expr_str = format!("{}", exprs[0]);
assert!(
expr_str.contains("UInt32(1117)"),
"Expected UInt32(1117) in expression, got: {expr_str}"
);
}
#[test]
fn test_parse_scan_config_filters() {
let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;

View File

@@ -24,7 +24,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;
pub(crate) mod file_range;
pub mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;