From 5c8ece27e04ec4f334a7c021a76bc11b60590311 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 3 Mar 2026 17:00:41 +0800 Subject: [PATCH] feat: improve filter support for scanbench (#7736) * feat: cast filters type for scanbench Signed-off-by: evenyag * chore: pub file_range mod So we can use the pub struct FileRange in other places Signed-off-by: evenyag * fix: add api as dev-dependency to cmd for clippy Signed-off-by: evenyag * feat: support profiling after warmup Signed-off-by: evenyag --------- Signed-off-by: evenyag --- Cargo.lock | 1 + docs/scanbench.md | 2 + src/cmd/Cargo.toml | 1 + src/cmd/src/datanode/scanbench.rs | 215 ++++++++++++++++++++++++++++-- src/mito2/src/sst/parquet.rs | 2 +- 5 files changed, 210 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a084f7f5c..64093278e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2034,6 +2034,7 @@ dependencies = [ name = "cmd" version = "1.0.0-rc.1" dependencies = [ + "api", "async-trait", "auth", "base64 0.22.1", diff --git a/docs/scanbench.md b/docs/scanbench.md index afe4734fee..cb83f2ef76 100644 --- a/docs/scanbench.md +++ b/docs/scanbench.md @@ -27,6 +27,7 @@ cargo build -p cmd --bin greptime [--force-flat-format] \ [--enable-wal] \ [--pprof-file ] \ + [--pprof-after-warmup] \ [--verbose] ``` @@ -51,6 +52,7 @@ cargo build -p cmd --bin greptime - `--force-flat-format`: Force reading the region in flat format. Default: disabled. - `--enable-wal`: Enable WAL replay when opening the region. Default: disabled. When enabled, scanbench uses the log store configured in the `[wal]` section of the config TOML (raft-engine or Kafka). When disabled or when no WAL is configured, a `NoopLogStore` is used. - `--pprof-file`: Output flamegraph path (Unix only). +- `--pprof-after-warmup`: Start profiling after the first iteration, using it as a warmup. Requires `--pprof-file`. Default: disabled. - `--verbose` / `-v`: Enable verbose output. ## Scan Config JSON diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index f3a633bf17..74309e2024 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -106,6 +106,7 @@ pprof = { version = "0.14", features = [ tikv-jemallocator = "0.6" [dev-dependencies] +api.workspace = true client = { workspace = true, features = ["testing"] } common-test-util.workspace = true common-version.workspace = true diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index 657c97395a..fdda1d97bb 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -27,9 +27,11 @@ use common_meta::key::SchemaMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_wal::config::DatanodeWalConfig; use datafusion::execution::SessionStateBuilder; -use datafusion::logical_expr::Expr as DfExpr; -use datafusion_common::ToDFSchema; +use datafusion::logical_expr::{BinaryExpr, Expr as DfExpr, ExprSchemable, Operator}; +use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion_common::{DFSchemaRef, ScalarValue, ToDFSchema}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datatypes::arrow::compute; use futures::StreamExt; use futures::stream::FuturesUnordered; use log_store::kafka::log_store::KafkaLogStore; @@ -107,6 +109,10 @@ pub struct ScanbenchCommand { /// Enable WAL replay when opening the region. #[clap(long, default_value_t = false)] enable_wal: bool, + + /// Start pprof after the first iteration (use first iteration as warmup). + #[clap(long, default_value_t = false)] + pprof_after_warmup: bool, } /// JSON config for scan request parameters. @@ -162,6 +168,21 @@ fn resolve_projection( Ok(None) } +fn format_bytes(bytes: u64) -> String { + const KIB: u64 = 1024; + const MIB: u64 = 1024 * KIB; + const GIB: u64 = 1024 * MIB; + if bytes >= GIB { + format!("{:.2} GiB", bytes as f64 / GIB as f64) + } else if bytes >= MIB { + format!("{:.2} MiB", bytes as f64 / MIB as f64) + } else if bytes >= KIB { + format!("{:.2} KiB", bytes as f64 / KIB as f64) + } else { + format!("{} B", bytes) + } +} + fn parse_region_id(s: &str) -> error::Result { if s.contains(':') { let parts: Vec<&str> = s.splitn(2, ':').collect(); @@ -201,6 +222,85 @@ fn parse_path_type(s: &str) -> error::Result { } } +/// Rewrites literal values in comparison expressions to match the column's arrow type. +struct LiteralTypeCaster { + schema: DFSchemaRef, +} + +impl TreeNodeRewriter for LiteralTypeCaster { + type Node = DfExpr; + + fn f_up(&mut self, expr: DfExpr) -> datafusion_common::Result> { + let DfExpr::BinaryExpr(BinaryExpr { left, op, right }) = &expr else { + return Ok(Transformed::no(expr)); + }; + + if !matches!( + op, + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + ) { + return Ok(Transformed::no(expr)); + } + + let (col_expr, lit_expr, col_left) = match (left.as_ref(), right.as_ref()) { + (col @ DfExpr::Column(_), lit @ DfExpr::Literal(_, _)) => (col, lit, true), + (lit @ DfExpr::Literal(_, _), col @ DfExpr::Column(_)) => (col, lit, false), + _ => return Ok(Transformed::no(expr)), + }; + + let col_type = col_expr.get_type(self.schema.as_ref())?; + let DfExpr::Literal(scalar, _) = lit_expr else { + unreachable!() + }; + + if scalar.data_type() == col_type { + return Ok(Transformed::no(expr)); + } + + let lit_array = scalar.to_array()?; + let casted = compute::cast(lit_array.as_ref(), &col_type).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Failed to cast literal {:?} to {:?}: {}", + scalar, col_type, e + )) + })?; + let casted_scalar = ScalarValue::try_from_array(&casted, 0)?; + + let new_lit = DfExpr::Literal(casted_scalar, None); + let (new_left, new_right) = if col_left { + (left.clone(), Box::new(new_lit)) + } else { + (Box::new(new_lit), right.clone()) + }; + + Ok(Transformed::yes(DfExpr::BinaryExpr(BinaryExpr { + left: new_left, + op: *op, + right: new_right, + }))) + } +} + +fn convert_literal_types( + exprs: Vec, + schema: &DFSchemaRef, +) -> datafusion_common::Result> { + use datafusion_common::tree_node::TreeNode; + + let mut caster = LiteralTypeCaster { + schema: schema.clone(), + }; + exprs + .into_iter() + .map(|e| e.rewrite(&mut caster).map(|x| x.data)) + .collect() +} + fn resolve_filters( scan_config: &ScanConfig, metadata: &RegionMetadata, @@ -227,7 +327,7 @@ fn resolve_filters( .with_default_features() .build(); - filters + let exprs: Vec = filters .iter() .enumerate() .map(|(idx, filter)| { @@ -264,7 +364,15 @@ fn resolve_filters( .build() }) }) - .collect() + .collect::>>()?; + + let df_schema_ref = Arc::new(df_schema); + convert_literal_types(exprs, &df_schema_ref).map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("Failed to convert filter expression types: {e}"), + } + .build() + }) } fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef { @@ -480,9 +588,9 @@ impl ScanbenchCommand { self.force_flat_format, ); - // Start profiling if pprof_file is specified + // Start profiling if pprof_file is specified (unless pprof_after_warmup is set) #[cfg(unix)] - let profiler_guard = if self.pprof_file.is_some() { + let mut profiler_guard = if self.pprof_file.is_some() && !self.pprof_after_warmup { println!("{} Starting profiling...", "⚡".yellow()); Some( pprof::ProfilerGuardBuilder::default() @@ -582,21 +690,29 @@ impl ScanbenchCommand { scan_futures.push(tokio::spawn(async move { let mut rows = 0u64; + let mut array_mem_size = 0u64; + let mut estimated_size = 0u64; while let Some(batch_result) = stream.next().await { match batch_result { Ok(batch) => { rows += batch.num_rows() as u64; + let df_batch = batch.df_record_batch(); + array_mem_size += df_batch.get_array_memory_size() as u64; + estimated_size += + mito2::memtable::record_batch_estimated_size(df_batch) as u64; } Err(e) => { return Err(BoxedError::new(e)); } } } - Ok::(rows) + Ok::<(u64, u64, u64), BoxedError>((rows, array_mem_size, estimated_size)) })); } let mut total_rows = 0u64; + let mut total_array_mem_size = 0u64; + let mut total_estimated_size = 0u64; while let Some(task) = scan_futures.next().await { let result = task .map_err(|e| { @@ -606,8 +722,11 @@ impl ScanbenchCommand { )) }) .context(error::BuildCliSnafu)?; - let rows = result.context(error::BuildCliSnafu)?; + let (rows, array_mem_size, estimated_size) = + result.context(error::BuildCliSnafu)?; total_rows += rows; + total_array_mem_size += array_mem_size; + total_estimated_size += estimated_size; } let elapsed = start.elapsed(); @@ -615,12 +734,40 @@ impl ScanbenchCommand { total_elapsed_all += elapsed; println!( - " [iter {}] {} rows in {:?} ({} partitions)", + " [iter {}] {} rows in {:?} ({} partitions), array_mem_size: {}, estimated_size: {}", iteration + 1, total_rows.to_string().cyan(), elapsed, num_partitions, + format_bytes(total_array_mem_size), + format_bytes(total_estimated_size), ); + + // Start profiling after the first iteration (warmup) if pprof_after_warmup is set + #[cfg(unix)] + if iteration == 0 + && self.pprof_after_warmup + && self.pprof_file.is_some() + && profiler_guard.is_none() + { + println!( + "{} Starting profiling after warmup iteration...", + "⚡".yellow() + ); + profiler_guard = Some( + pprof::ProfilerGuardBuilder::default() + .frequency(99) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build() + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("Failed to start profiler: {e}"), + StatusCode::Unexpected, + )) + }) + .context(error::BuildCliSnafu)?, + ); + } } // Stop profiling and generate flamegraph if enabled @@ -673,11 +820,15 @@ impl ScanbenchCommand { #[cfg(test)] mod tests { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; use sqlparser::ast::{BinaryOperator, Expr}; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; - use super::{ScanConfig, resolve_projection}; + use super::{ScanConfig, resolve_filters, resolve_projection}; use crate::error; #[test] @@ -752,6 +903,50 @@ mod tests { } } + #[test] + fn test_resolve_filters_uint32_type_conversion() { + use api::v1::SemanticType; + + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "table_id", + ConcreteDataType::uint32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![1]); + let metadata = builder.build().unwrap(); + + let config = ScanConfig { + projection: None, + projection_names: None, + filters: Some(vec!["table_id = 1117".to_string()]), + series_row_selector: None, + }; + + let exprs = resolve_filters(&config, &metadata).unwrap(); + assert_eq!(exprs.len(), 1); + // The expression should contain a UInt32 literal after type conversion. + let expr_str = format!("{}", exprs[0]); + assert!( + expr_str.contains("UInt32(1117)"), + "Expected UInt32(1117) in expression, got: {expr_str}" + ); + } + #[test] fn test_parse_scan_config_filters() { let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#; diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 989aeb812b..4d09d84c73 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -24,7 +24,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; use crate::sst::file::FileTimeRange; use crate::sst::index::IndexOutput; -pub(crate) mod file_range; +pub mod file_range; pub mod flat_format; pub mod format; pub(crate) mod helper;