diff --git a/Cargo.lock b/Cargo.lock index a8af990b36..937c79194f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2169,6 +2169,9 @@ dependencies = [ "common-time", "common-version", "common-wal", + "datafusion", + "datafusion-common", + "datafusion-physical-plan", "datanode", "datatypes", "either", @@ -2180,6 +2183,7 @@ dependencies = [ "human-panic", "humantime", "lazy_static", + "log-store", "meta-client", "meta-srv", "metric-engine", @@ -2202,6 +2206,7 @@ dependencies = [ "session", "similar-asserts", "snafu 0.8.6", + "sqlparser", "standalone", "store-api", "table", diff --git a/docs/scanbench.md b/docs/scanbench.md new file mode 100644 index 0000000000..afe4734fee --- /dev/null +++ b/docs/scanbench.md @@ -0,0 +1,127 @@ +# Scanbench Usage + +`scanbench` benchmarks region scans directly from storage through: + +```bash +greptime datanode scanbench ... +``` + +## Build + +```bash +cargo build -p cmd --bin greptime +``` + +## Command + +```bash +./target/debug/greptime datanode scanbench \ + --config <CONFIG_FILE> \ + --region-id <REGION_ID> \ + --table-dir <TABLE_DIR> \ + [--scanner <seq|unordered|series>] \ + [--scan-config <FILE>] \ + [--parallelism <N>] \ + [--iterations <N>] \ + [--path-type <bare|data|metadata>] \ + [--force-flat-format] \ + [--enable-wal] \ + [--pprof-file <FILE>] \ + [--verbose] +``` + +## Required Arguments + +- `--config`: Datanode/standalone TOML config. +- `--region-id`: Region ID in one of: + - `<u64>` (example: `4398046511104`) + - `<table_id>:<region_num>` (example: `1024:0`) +- `--table-dir`: Table directory used in open request (example: `greptime/public/1024`). + +## Optional Arguments + +- `--scanner`: Scan strategy. Default: `seq`. + - `seq`: default scan + - `unordered`: time-windowed distribution + - `series`: per-series distribution +- `--scan-config`: JSON file to tune scan request. +- `--parallelism`: Simulated scan parallelism. Default: `1`. +- `--iterations`: Benchmark iterations. Default: `1`. +- `--path-type`: Region path type (`bare`, `data`, `metadata`). Default: `bare`. +- `--force-flat-format`: Force reading the region in flat format.
Default: disabled. +- `--enable-wal`: Enable WAL replay when opening the region. Default: disabled. When enabled, scanbench uses the log store configured in the `[wal]` section of the config TOML (raft-engine or Kafka). When disabled or when no WAL is configured, a `NoopLogStore` is used. +- `--pprof-file`: Output flamegraph path (Unix only). +- `--verbose` / `-v`: Enable verbose output. + +## Scan Config JSON + +```json +{ + "projection": [0, 1, 2], + "projection_names": ["host", "cpu"], + "filters": ["host = 'web-1'", "cpu > 80"], + "series_row_selector": "last_row" +} +``` + +Notes: +- All fields are optional. +- Use either `projection` (indexes) or `projection_names` (column names), not both. +- `projection_names` uses exact (case-sensitive) column name matching. +- `filters` is a list of SQL expressions (not full SQL statements), e.g. `"host = 'web-1'"`. +- `series_row_selector` currently supports only `"last_row"`. + +## Examples + +Default sequential scan: + +```bash +./target/debug/greptime datanode scanbench \ + --config /path/to/config.toml \ + --region-id 1024:0 \ + --table-dir greptime/public/1024 +``` + +Unordered scan with parallelism: + +```bash +./target/debug/greptime datanode scanbench \ + --config /path/to/config.toml \ + --region-id 1024:0 \ + --table-dir greptime/public/1024 \ + --scanner unordered \ + --parallelism 8 \ + --iterations 5 +``` + +Series scan with scan config and flamegraph: + +```bash +./target/debug/greptime datanode scanbench \ + --config /path/to/config.toml \ + --region-id 1024:0 \ + --table-dir greptime/public/1024 \ + --scanner series \ + --scan-config /path/to/scan-config.json \ + --pprof-file /tmp/scanbench.svg +``` + +Force flat-format read: + +```bash +./target/debug/greptime datanode scanbench \ + --config /path/to/config.toml \ + --region-id 1024:0 \ + --table-dir greptime/public/1024 \ + --force-flat-format +``` + +Scan with WAL replay enabled (uses `[wal]` config from TOML): + +```bash +./target/debug/greptime 
datanode scanbench \ + --config /path/to/config.toml \ + --region-id 1024:0 \ + --table-dir greptime/public/1024 \ + --enable-wal +``` diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index c1d573a177..f3a633bf17 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -30,6 +30,9 @@ base64.workspace = true cache.workspace = true catalog.workspace = true chrono.workspace = true +datafusion-physical-plan.workspace = true +datafusion.workspace = true +datafusion-common.workspace = true either = "1.15" clap.workspace = true cli.workspace = true @@ -63,6 +66,7 @@ futures.workspace = true human-panic = "2.0" humantime.workspace = true lazy_static.workspace = true +log-store.workspace = true meta-client.workspace = true meta-srv.workspace = true metric-engine.workspace = true @@ -91,6 +95,7 @@ tokio.workspace = true toml.workspace = true tonic.workspace = true tracing-appender.workspace = true +sqlparser.workspace = true [target.'cfg(unix)'.dependencies] pprof = { version = "0.14", features = [ diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index f6bbebf7fb..a34d8e0f38 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -111,6 +111,7 @@ async fn start(cli: Command) -> Result<()> { cmd.build_with(builder).await?.run().await } datanode::SubCommand::Objbench(ref bench) => bench.run().await, + datanode::SubCommand::Scanbench(ref bench) => bench.run().await, }, SubCommand::Flownode(cmd) => { cmd.build(cmd.load_options(&cli.global_options)?) 
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 9557bba6d8..06e2568b72 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -15,6 +15,8 @@ pub mod builder; #[allow(clippy::print_stdout)] mod objbench; +#[allow(clippy::print_stdout)] +mod scanbench; use std::path::Path; use std::time::Duration; @@ -35,6 +37,7 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::App; use crate::datanode::builder::InstanceBuilder; use crate::datanode::objbench::ObjbenchCommand; +use crate::datanode::scanbench::ScanbenchCommand; use crate::error::{ LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, }; @@ -105,9 +108,9 @@ impl Command { pub fn load_options(&self, global_options: &GlobalOptions) -> Result { match &self.subcmd { SubCommand::Start(cmd) => cmd.load_options(global_options), - SubCommand::Objbench(_) => { - // For objbench command, we don't need to load DatanodeOptions - // It's a standalone utility command + SubCommand::Objbench(_) | SubCommand::Scanbench(_) => { + // For bench commands, we don't need to load DatanodeOptions + // They are standalone utility commands let mut opts = datanode::config::DatanodeOptions::default(); opts.sanitize(); Ok(DatanodeOptions { @@ -125,6 +128,8 @@ pub enum SubCommand { Start(StartCommand), /// Object storage benchmark tool Objbench(ObjbenchCommand), + /// Scan benchmark tool - benchmarks scanning a region directly from storage + Scanbench(ScanbenchCommand), } impl SubCommand { @@ -138,6 +143,10 @@ impl SubCommand { cmd.run().await?; std::process::exit(0); } + SubCommand::Scanbench(cmd) => { + cmd.run().await?; + std::process::exit(0); + } } } } @@ -157,6 +166,30 @@ pub struct StorageConfig { struct StorageConfigWrapper { storage: StorageConfig, region_engine: Vec, + #[serde(default, deserialize_with = "deserialize_wal_config")] + wal: DatanodeWalConfig, +} + +/// Deserializes [`DatanodeWalConfig`], defaulting `provider` to `"raft_engine"` when +/// 
the `[wal]` section is present but omits it. This mirrors the behavior of the +/// datanode's layered config loader which merges over a default that already contains +/// `provider`. +fn deserialize_wal_config<'de, D>( + deserializer: D, +) -> std::result::Result +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error as _; + + let mut table = ::deserialize(deserializer)?; + if !table.contains_key("provider") { + table.insert( + "provider".to_string(), + toml::Value::String("raft_engine".to_string()), + ); + } + DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom) } #[derive(Debug, Parser, Default)] diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 59e9a58d42..65e0a400e4 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -66,7 +66,13 @@ pub struct ObjbenchCommand { pub pprof_file: Option, } -fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> { +pub(super) fn parse_config( + config_path: &PathBuf, +) -> error::Result<( + StorageConfig, + MitoConfig, + common_wal::config::DatanodeWalConfig, +)> { let cfg_str = std::fs::read_to_string(config_path).map_err(|e| { error::IllegalConfigSnafu { msg: format!("failed to read config {}: {e}", config_path.display()), @@ -81,6 +87,7 @@ fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConf .build() })?; + let wal_config = store_cfg.wal; let storage_config = store_cfg.storage; let mito_engine_config = store_cfg .region_engine @@ -96,7 +103,7 @@ fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConf .with_context(|| error::IllegalConfigSnafu { msg: format!("Engine config not found in {:?}", config_path), })?; - Ok((storage_config, mito_engine_config)) + Ok((storage_config, mito_engine_config, wal_config)) } impl ObjbenchCommand { @@ -108,7 +115,7 @@ impl ObjbenchCommand { println!("{}", "Starting objbench with config:".cyan().bold()); 
// Build object store from config - let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?; + let (store_cfg, mut mito_engine_config, _wal_config) = parse_config(&self.config)?; let object_store = build_object_store(&store_cfg).await?; println!("{} Object store initialized", "✓".green()); @@ -490,7 +497,7 @@ fn extract_region_metadata( Ok(Arc::new(region)) } -async fn build_object_store(sc: &StorageConfig) -> error::Result { +pub(super) async fn build_object_store(sc: &StorageConfig) -> error::Result { store::new_object_store(sc.store.clone(), &sc.data_home) .await .map_err(|e| { @@ -685,7 +692,7 @@ mod tests { #[test] fn test_parse_config() { let path = "../../config/datanode.example.toml"; - let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap(); + let (storage, engine, _wal) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap(); assert_eq!(storage.data_home, "./greptimedb_data"); assert_eq!(engine.index.staging_size, ReadableSize::gb(2)); } diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs new file mode 100644 index 0000000000..657c97395a --- /dev/null +++ b/src/cmd/src/datanode/scanbench.rs @@ -0,0 +1,765 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use clap::Parser; +use colored::Colorize; +use common_base::Plugins; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_meta::cache::{new_schema_cache, new_table_schema_cache}; +use common_meta::key::SchemaMetadataManager; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_wal::config::DatanodeWalConfig; +use datafusion::execution::SessionStateBuilder; +use datafusion::logical_expr::Expr as DfExpr; +use datafusion_common::ToDFSchema; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use futures::StreamExt; +use futures::stream::FuturesUnordered; +use log_store::kafka::log_store::KafkaLogStore; +use log_store::noop::log_store::NoopLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use mito2::config::MitoConfig; +use mito2::engine::MitoEngine; +use mito2::sst::file_ref::FileReferenceManager; +use moka::future::CacheBuilder; +use object_store::manager::ObjectStoreManager; +use object_store::util::normalize_dir; +use query::optimizer::parallelize_scan::ParallelizeScan; +use serde::Deserialize; +use snafu::{OptionExt, ResultExt}; +use sqlparser::ast::ExprWithAlias as SqlExprWithAlias; +use sqlparser::dialect::GenericDialect; +use sqlparser::parser::Parser as SqlParser; +use store_api::metadata::RegionMetadata; +use store_api::path_utils::WAL_DIR; +use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine}; +use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; +use tokio::fs; + +use crate::datanode::objbench::{build_object_store, parse_config}; +use crate::error; + +/// Scan benchmark command - benchmarks scanning a region directly from storage. 
+#[derive(Debug, Parser)]
+pub struct ScanbenchCommand {
+    /// Path to config TOML file (same format as standalone/datanode config)
+    #[clap(long, value_name = "FILE")]
+    config: PathBuf,
+
+    /// Region ID: either numeric u64 (e.g. "4398046511104") or "table_id:region_num" (e.g. "1024:0")
+    #[clap(long)]
+    region_id: String,
+
+    /// Table directory relative to data home (e.g. "data/greptime/public/1024/")
+    #[clap(long)]
+    table_dir: String,
+
+    /// Scanner type: seq, unordered, series
+    #[clap(long, default_value = "seq")]
+    scanner: String,
+
+    /// Path to scan request JSON config file (optional)
+    #[clap(long, value_name = "FILE")]
+    scan_config: Option<PathBuf>,
+
+    /// Number of partitions for parallel scan (simulates parallelism)
+    #[clap(long, default_value = "1")]
+    parallelism: usize,
+
+    /// Number of iterations for benchmarking
+    #[clap(long, default_value = "1")]
+    iterations: usize,
+
+    /// Path type for the region: bare, data, metadata
+    #[clap(long, default_value = "bare")]
+    path_type: String,
+
+    /// Verbose output
+    #[clap(short, long, default_value_t = false)]
+    verbose: bool,
+
+    /// Output pprof flamegraph
+    #[clap(long, value_name = "FILE")]
+    pprof_file: Option<PathBuf>,
+
+    /// Force reading the region in flat format.
+    #[clap(long, default_value_t = false)]
+    force_flat_format: bool,
+
+    /// Enable WAL replay when opening the region.
+    #[clap(long, default_value_t = false)]
+    enable_wal: bool,
+}
+
+/// JSON config for scan request parameters.
+#[derive(Debug, Deserialize, Default)]
+struct ScanConfig {
+    // Column indexes to project; mutually exclusive with `projection_names`.
+    projection: Option<Vec<usize>>,
+    // Column names to project; resolved to indexes through the region metadata.
+    projection_names: Option<Vec<String>>,
+    // SQL expressions (not full statements) converted to scan filters.
+    filters: Option<Vec<String>>,
+    // Only "last_row" is accepted by `ScanbenchCommand::run`.
+    series_row_selector: Option<String>,
+}
+
+/// Resolves the projection of the scan request from the scan config.
+///
+/// Returns the indexes in `projection` as they are, or looks up every name in
+/// `projection_names` in `metadata`. Returns `Ok(None)` when neither field is set.
+///
+/// # Errors
+///
+/// Fails when both fields are set, when `projection_names` is set but `metadata`
+/// is `None`, or when a name does not match any column of the region.
+fn resolve_projection(
+    scan_config: &ScanConfig,
+    metadata: Option<&RegionMetadata>,
+) -> error::Result<Option<Vec<usize>>> {
+    if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
+        return Err(error::IllegalConfigSnafu {
+            msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
+        }
+        .build());
+    }
+
+    if let Some(projection) = &scan_config.projection {
+        return Ok(Some(projection.clone()));
+    }
+
+    if let Some(projection_names) = &scan_config.projection_names {
+        let metadata = metadata.context(error::IllegalConfigSnafu {
+            msg: "Missing region metadata while resolving 'projection_names'".to_string(),
+        })?;
+        // Only used to make the "unknown column" error actionable.
+        let available_columns = metadata
+            .column_metadatas
+            .iter()
+            .map(|column| column.column_schema.name.as_str())
+            .collect::<Vec<_>>()
+            .join(", ");
+        let projection = projection_names
+            .iter()
+            .map(|name| {
+                metadata
+                    .column_index_by_name(name)
+                    .with_context(|| error::IllegalConfigSnafu {
+                        msg: format!(
+                            "Unknown column '{}' in projection_names, available columns: [{}]",
+                            name, available_columns
+                        ),
+                    })
+            })
+            .collect::<error::Result<Vec<_>>>()?;
+        return Ok(Some(projection));
+    }
+
+    Ok(None)
+}
+
+/// Parses a region id given either as `table_id:region_num` (two `u32`s) or as a
+/// plain `u64`.
+fn parse_region_id(s: &str) -> error::Result<RegionId> {
+    // Split on the first ':' only, so anything after it is parsed (and rejected)
+    // as the region number.
+    if let Some((table_id, region_num)) = s.split_once(':') {
+        let table_id: u32 = table_id.parse().map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("invalid table_id in region_id '{}': {}", s, e),
+            }
+            .build()
+        })?;
+        let region_num: u32 = region_num.parse().map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("invalid region_num in region_id '{}': {}", s, e),
+            }
+            .build()
+        })?;
+        Ok(RegionId::new(table_id, region_num))
+    } else {
+        let id: u64 = s.parse().map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("invalid region_id '{}': {}", s, e),
+            }
+            .build()
+        })?;
+        Ok(RegionId::from_u64(id))
+    }
+}
+
+/// Parses the `--path-type` argument (case-insensitive).
+fn parse_path_type(s: &str) -> error::Result<PathType> {
+    match s.to_lowercase().as_str() {
+        "bare" => Ok(PathType::Bare),
+        "data" => Ok(PathType::Data),
+        "metadata" => Ok(PathType::Metadata),
+        _ => Err(error::IllegalConfigSnafu {
+            msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
+        }
+        .build()),
+    }
+}
+
+/// Converts the SQL expressions in `scan_config.filters` to DataFusion logical
+/// expressions against the region schema.
+///
+/// Returns an empty list when no filters are configured.
+///
+/// # Errors
+///
+/// Fails when the region schema cannot be converted, when a filter is not a valid
+/// SQL expression, when input remains after the expression, or when the expression
+/// cannot be planned against the schema.
+fn resolve_filters(
+    scan_config: &ScanConfig,
+    metadata: &RegionMetadata,
+) -> error::Result<Vec<DfExpr>> {
+    let Some(filters) = &scan_config.filters else {
+        return Ok(Vec::new());
+    };
+
+    let df_schema = metadata
+        .schema
+        .arrow_schema()
+        .clone()
+        .to_dfschema()
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
+            }
+            .build()
+        })?;
+
+    let state = SessionStateBuilder::new()
+        .with_config(Default::default())
+        .with_runtime_env(Default::default())
+        .with_default_features()
+        .build();
+
+    filters
+        .iter()
+        .enumerate()
+        .map(|(idx, filter)| {
+            let mut parser = SqlParser::new(&GenericDialect {})
+                .try_with_sql(filter)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
+                    }
+                    .build()
+                })?;
+
+            let sql_expr = parser.parse_expr().map_err(|e| {
+                error::IllegalConfigSnafu {
+                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
+                }
+                .build()
+            })?;
+
+            // `parse_expr` stops at the end of the first expression. Reject leftover
+            // tokens instead of silently benchmarking a different filter.
+            let trailing = parser.peek_token();
+            if trailing.token != sqlparser::tokenizer::Token::EOF {
+                return Err(error::IllegalConfigSnafu {
+                    msg: format!(
+                        "Invalid filter at index {idx} ('{filter}'): unexpected trailing input '{}'",
+                        trailing.token
+                    ),
+                }
+                .build());
+            }
+
+            state
+                .create_logical_expr_from_sql_expr(
+                    SqlExprWithAlias {
+                        expr: sql_expr,
+                        alias: None,
+                    },
+                    &df_schema,
+                )
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!(
+                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
+                        ),
+                    }
+                    .build()
+                })
+        })
+        .collect()
+}
+
+/// Returns a partition expr fetcher that never finds an expression.
+fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
+    struct NoopPartitionExprFetcher;
+
+    #[async_trait::async_trait]
+    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
+        // NOTE(review): return type restored as `Option<String>`; confirm against the
+        // trait definition in mito2.
+        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
+            None
+        }
+    }
+
+    Arc::new(NoopPartitionExprFetcher)
+}
+
+/// Everything needed to build a [`MitoEngine`] except the log store.
+struct EngineComponents {
+    data_home: String,
+    mito_config: MitoConfig,
+    object_store_manager: Arc<ObjectStoreManager>,
+    schema_metadata_manager: Arc<SchemaMetadataManager>,
+    file_ref_manager: Arc<FileReferenceManager>,
+    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
+}
+
+impl EngineComponents {
+    /// Builds the engine on top of the given log store.
+    // NOTE(review): the bound was restored as `store_api::logstore::LogStore`; confirm
+    // it matches the bound required by `MitoEngine::new`.
+    async fn build<S: store_api::logstore::LogStore>(
+        self,
+        log_store: Arc<S>,
+    ) -> error::Result<MitoEngine> {
+        MitoEngine::new(
+            &self.data_home,
+            self.mito_config,
+            log_store,
+            self.object_store_manager,
+            self.schema_metadata_manager,
+            self.file_ref_manager,
+            self.partition_expr_fetcher,
+            Plugins::default(),
+        )
+        .await
+        .map_err(BoxedError::new)
+        .context(error::BuildCliSnafu)
+    }
+}
+
+/// Creates a [`SchemaMetadataManager`] backed by an empty in-memory kv backend.
+fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
+    let kv_backend = Arc::new(MemoryKvBackend::new());
+    let table_schema_cache = Arc::new(new_table_schema_cache(
+        "table_schema_name_cache".to_string(),
+        CacheBuilder::default().build(),
+        kv_backend.clone(),
+    ));
+    let schema_cache = Arc::new(new_schema_cache(
+        "schema_cache".to_string(),
+        CacheBuilder::default().build(),
+        kv_backend.clone(),
+    ));
+    Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
+}
+
+impl ScanbenchCommand {
+    /// Opens the region and scans it `iterations` times, printing the row count and
+    /// elapsed time of every iteration and, for more than one iteration, the average.
+    ///
+    /// The timed section of an iteration covers creating the scanner, redistributing
+    /// the partition ranges and draining every partition stream.
+    pub async fn run(&self) -> error::Result<()> {
+        if self.verbose {
+            common_telemetry::init_default_ut_logging();
+        }
+
+        println!("{}", "Starting scanbench...".cyan().bold());
+
+        let region_id = parse_region_id(&self.region_id)?;
+        let path_type = parse_path_type(&self.path_type)?;
+        println!(
+            "{} Region ID: {} (u64: {})",
+            "✓".green(),
+            self.region_id,
+            region_id.as_u64()
+        );
+
+        // Parse config and build object store
+        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
+        println!("{} Config parsed", "✓".green());
+
+        let object_store = build_object_store(&store_cfg).await?;
+        println!("{} Object store initialized", "✓".green());
+
+        let object_store_manager =
+            Arc::new(ObjectStoreManager::new("default", object_store.clone()));
+
+        // Create mock dependencies
+        let schema_metadata_manager = mock_schema_metadata_manager();
+        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
+        let partition_expr_fetcher = noop_partition_expr_fetcher();
+
+        // Create MitoEngine with appropriate log store
+        let components = EngineComponents {
+            data_home: store_cfg.data_home.clone(),
+            mito_config,
+            object_store_manager,
+            schema_metadata_manager,
+            file_ref_manager,
+            partition_expr_fetcher,
+        };
+
+        // A real log store is only used when `--enable-wal` is set and the config
+        // selects one; every other combination falls back to `NoopLogStore`.
+        let engine = match &wal_config {
+            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
+                let data_home = normalize_dir(&store_cfg.data_home);
+                let wal_dir = match &raft_engine_config.dir {
+                    Some(dir) => dir.clone(),
+                    None => format!("{}{WAL_DIR}", data_home),
+                };
+                fs::create_dir_all(&wal_dir).await.map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
+                    }
+                    .build()
+                })?;
+                let log_store = Arc::new(
+                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
+                        .await
+                        .map_err(BoxedError::new)
+                        .context(error::BuildCliSnafu)?,
+                );
+                println!("{} Using RaftEngine WAL", "✓".green());
+                components.build(log_store).await?
+            }
+            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
+                let log_store = Arc::new(
+                    KafkaLogStore::try_new(kafka_config, None)
+                        .await
+                        .map_err(BoxedError::new)
+                        .context(error::BuildCliSnafu)?,
+                );
+                println!("{} Using Kafka WAL", "✓".green());
+                components.build(log_store).await?
+            }
+            _ => {
+                let log_store = Arc::new(NoopLogStore);
+                println!(
+                    "{} Using NoopLogStore (enable_wal={})",
+                    "✓".green(),
+                    self.enable_wal
+                );
+                components.build(log_store).await?
+            }
+        };
+
+        // Open region
+        let open_request = RegionOpenRequest {
+            engine: "mito".to_string(),
+            table_dir: self.table_dir.clone(),
+            path_type,
+            options: HashMap::default(),
+            skip_wal_replay: !self.enable_wal,
+            checkpoint: None,
+        };
+
+        engine
+            .handle_request(region_id, RegionRequest::Open(open_request))
+            .await
+            .map_err(BoxedError::new)
+            .context(error::BuildCliSnafu)?;
+        println!("{} Region opened", "✓".green());
+
+        // Load scan config
+        let scan_config = if let Some(path) = &self.scan_config {
+            let content = tokio::fs::read_to_string(path)
+                .await
+                .context(error::FileIoSnafu)?;
+            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
+        } else {
+            ScanConfig::default()
+        };
+        let metadata = engine
+            .get_metadata(region_id)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::BuildCliSnafu)?;
+        let projection = resolve_projection(&scan_config, Some(&metadata))?;
+        let filters = resolve_filters(&scan_config, &metadata)?;
+
+        // Build scan request
+        let distribution = match self.scanner.as_str() {
+            "seq" => None,
+            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
+            "series" => Some(TimeSeriesDistribution::PerSeries),
+            other => {
+                return Err(error::IllegalConfigSnafu {
+                    msg: format!(
+                        "Unknown scanner type '{}', expected: seq, unordered, series",
+                        other
+                    ),
+                }
+                .build());
+            }
+        };
+
+        let series_row_selector = match scan_config.series_row_selector.as_deref() {
+            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
+            Some(other) => {
+                return Err(error::IllegalConfigSnafu {
+                    msg: format!("Unknown series_row_selector '{}'", other),
+                }
+                .build());
+            }
+            None => None,
+        };
+
+        println!(
+            "{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}",
+            "ℹ".blue(),
+            self.scanner,
+            self.parallelism,
+            self.iterations,
+            self.force_flat_format,
+        );
+
+        // Start profiling if pprof_file is specified
+        #[cfg(unix)]
+        let profiler_guard = if self.pprof_file.is_some() {
+            println!("{} Starting profiling...", "⚡".yellow());
+            Some(
+                pprof::ProfilerGuardBuilder::default()
+                    .frequency(99)
+                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
+                    .build()
+                    .map_err(|e| {
+                        BoxedError::new(PlainError::new(
+                            format!("Failed to start profiler: {e}"),
+                            StatusCode::Unexpected,
+                        ))
+                    })
+                    .context(error::BuildCliSnafu)?,
+            )
+        } else {
+            None
+        };
+
+        #[cfg(not(unix))]
+        if self.pprof_file.is_some() {
+            eprintln!(
+                "{}: Profiling is not supported on this platform",
+                "Warning".yellow()
+            );
+        }
+
+        let mut total_rows_all = 0u64;
+        let mut total_elapsed_all = std::time::Duration::ZERO;
+
+        for iteration in 0..self.iterations {
+            let request = ScanRequest {
+                projection: projection.clone(),
+                filters: filters.clone(),
+                series_row_selector,
+                distribution,
+                force_flat_format: self.force_flat_format,
+                ..Default::default()
+            };
+
+            let start = Instant::now();
+
+            // Get scanner
+            let mut scanner = engine
+                .handle_query(region_id, request)
+                .await
+                .map_err(BoxedError::new)
+                .context(error::BuildCliSnafu)?;
+
+            // Get partition ranges and apply parallelism
+            let original_partitions = scanner.properties().partitions.clone();
+            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();
+
+            if self.verbose {
+                println!(
+                    "  {} Original partitions: {}, total ranges: {}",
+                    "ℹ".blue(),
+                    original_partitions.len(),
+                    total_ranges
+                );
+            }
+
+            // With a parallelism of 0 or 1 the scanner keeps its original partitions.
+            if self.parallelism > 1 {
+                // Flatten all ranges
+                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();
+
+                // Distribute ranges across partitions
+                let mut partitions =
+                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);
+
+                // Sort ranges within each partition by start time ascending
+                for partition in &mut partitions {
+                    partition.sort_by(|a, b| a.start.cmp(&b.start));
+                }
+
+                scanner
+                    .prepare(
+                        PrepareRequest::default()
+                            .with_ranges(partitions)
+                            .with_target_partitions(self.parallelism),
+                    )
+                    .map_err(BoxedError::new)
+                    .context(error::BuildCliSnafu)?;
+            }
+
+            // Scan all partitions
+            let num_partitions = scanner.properties().partitions.len();
+            let ctx = QueryScanContext::default();
+            let metrics_set = ExecutionPlanMetricsSet::new();
+
+            let mut scan_futures = FuturesUnordered::new();
+
+            for partition_idx in 0..num_partitions {
+                let mut stream = scanner
+                    .scan_partition(&ctx, &metrics_set, partition_idx)
+                    .map_err(BoxedError::new)
+                    .context(error::BuildCliSnafu)?;
+
+                // One task per partition; each only counts the rows it receives.
+                scan_futures.push(tokio::spawn(async move {
+                    let mut rows = 0u64;
+                    while let Some(batch_result) = stream.next().await {
+                        match batch_result {
+                            Ok(batch) => {
+                                rows += batch.num_rows() as u64;
+                            }
+                            Err(e) => {
+                                return Err(BoxedError::new(e));
+                            }
+                        }
+                    }
+                    Ok::<u64, BoxedError>(rows)
+                }));
+            }
+
+            let mut total_rows = 0u64;
+            while let Some(task) = scan_futures.next().await {
+                let result = task
+                    .map_err(|e| {
+                        BoxedError::new(PlainError::new(
+                            format!("scan task failed: {e}"),
+                            StatusCode::Unexpected,
+                        ))
+                    })
+                    .context(error::BuildCliSnafu)?;
+                let rows = result.context(error::BuildCliSnafu)?;
+                total_rows += rows;
+            }
+
+            let elapsed = start.elapsed();
+            total_rows_all += total_rows;
+            total_elapsed_all += elapsed;
+
+            println!(
+                "  [iter {}] {} rows in {:?} ({} partitions)",
+                iteration + 1,
+                total_rows.to_string().cyan(),
+                elapsed,
+                num_partitions,
+            );
+        }
+
+        // Stop profiling and generate flamegraph if enabled
+        #[cfg(unix)]
+        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
+            println!("{} Generating flamegraph...", "🔥".yellow());
+            match guard.report().build() {
+                Ok(report) => {
+                    let mut flamegraph_data = Vec::new();
+                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
+                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
+                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
+                        println!(
+                            "{}: Failed to write flamegraph to {}: {}",
+                            "Error".red(),
+                            pprof_file.display(),
+                            e
+                        );
+                    } else {
+                        println!(
+                            "{} Flamegraph saved to {}",
+                            "✓".green(),
+                            pprof_file.display().to_string().cyan()
+                        );
+                    }
+                }
+                Err(e) => {
+                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
+                }
+            }
+        }
+
+        // Summary
+        if self.iterations > 1 {
+            let avg_elapsed = total_elapsed_all / self.iterations as u32;
+            let avg_rows = total_rows_all / self.iterations as u64;
+            println!(
+                "\n{} Average: {} rows in {:?} over {} iterations",
+                "Summary".green().bold(),
+                avg_rows.to_string().cyan(),
+                avg_elapsed,
+                self.iterations,
+            );
+        }
+
+        println!("\n{}", "Benchmark completed!".green().bold());
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use sqlparser::ast::{BinaryOperator, Expr};
+    use sqlparser::dialect::GenericDialect;
+    use sqlparser::parser::Parser;
+
+    use super::{ScanConfig, resolve_projection};
+    use crate::error;
+
+    #[test]
+    fn test_parse_scan_config_projection_names() {
+        let json = r#"{"projection_names":["host","ts"]}"#;
+        let config: ScanConfig = serde_json::from_str(json).unwrap();
+
+        assert_eq!(
+            config.projection_names,
+            Some(vec!["host".to_string(), "ts".to_string()])
+        );
+        assert_eq!(config.projection, None);
+    }
+
+    #[test]
+    fn test_resolve_projection_by_indexes() -> error::Result<()> {
+        let config = ScanConfig {
+            projection: Some(vec![0, 2]),
+            projection_names: None,
+            filters: None,
+            series_row_selector: None,
+        };
+
+        let projection = resolve_projection(&config, None)?;
+        assert_eq!(projection, Some(vec![0, 2]));
+        Ok(())
+    }
+
+    #[test]
+    fn test_resolve_projection_by_names_without_metadata() {
+        let config = ScanConfig {
+            projection: None,
+            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
+            filters: None,
+            series_row_selector: None,
+        };
+
+        let err = resolve_projection(&config, None).unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Missing region metadata while resolving 'projection_names'")
+        );
+    }
+
+    #[test]
+    fn test_resolve_projection_conflict_fields() {
+        let config = ScanConfig {
+            projection: Some(vec![0]),
+            projection_names: Some(vec!["host".to_string()]),
+            filters: None,
+            series_row_selector: None,
+        };
+
+        let err = resolve_projection(&config, None).unwrap_err();
+        let msg = err.to_string();
+        assert!(msg.contains("projection"));
+        assert!(msg.contains("projection_names"));
+    }
+
+    #[test]
+    fn test_sqlparser_parse_expr_string() {
+        let dialect = GenericDialect {};
+        let mut parser = Parser::new(&dialect)
+            .try_with_sql("host = 'web-1' AND cpu > 80")
+            .unwrap();
+
+        let expr = parser.parse_expr().unwrap();
+
+        match expr {
+            Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
+            other => panic!("expected BinaryOp, got: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_parse_scan_config_filters() {
+        let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;
+        let config: ScanConfig = serde_json::from_str(json).unwrap();
+
+        assert_eq!(
+            config.filters,
+            Some(vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()])
+        );
+    }
+}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index bc8bb987d7..4f4ec28d8d 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -399,7 +399,8 @@ impl ScanRegion {
 
     /// Returns true if the region use flat format.
     fn use_flat_format(&self) -> bool {
-        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
+        self.request.force_flat_format
+            || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
     }
 
     /// Creates a scan input.
@@ -1720,7 +1721,9 @@ mod tests { use super::*; use crate::memtable::time_partition::TimePartitions; + use crate::region::options::RegionOptions; use crate::region::version::VersionBuilder; + use crate::sst::FormatType; use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key}; use crate::test_util::scheduler_util::SchedulerEnv; @@ -1734,6 +1737,27 @@ mod tests { Arc::new(VersionBuilder::new(metadata, mutable).build()) } + fn new_version_with_sst_format( + metadata: RegionMetadataRef, + sst_format: Option, + ) -> VersionRef { + let mutable = Arc::new(TimePartitions::new( + metadata.clone(), + Arc::new(EmptyMemtableBuilder::default()), + 0, + None, + )); + let options = RegionOptions { + sst_format, + ..Default::default() + }; + Arc::new( + VersionBuilder::new(metadata, mutable) + .options(options) + .build(), + ) + } + #[tokio::test] async fn test_build_read_column_ids_includes_filters() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); @@ -1813,4 +1837,43 @@ mod tests { // Projection order preserved, extra columns appended in schema order. 
assert_eq!(vec![4, 1, 3], read_ids); } + + #[tokio::test] + async fn test_use_flat_format_honors_request_override() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let env = SchedulerEnv::new().await; + + let primary_key_version = + new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey)); + let request = ScanRequest::default(); + let scan_region = ScanRegion::new( + primary_key_version.clone(), + env.access_layer.clone(), + request, + CacheStrategy::Disabled, + ); + assert!(!scan_region.use_flat_format()); + + let request = ScanRequest { + force_flat_format: true, + ..Default::default() + }; + let scan_region = ScanRegion::new( + primary_key_version, + env.access_layer.clone(), + request, + CacheStrategy::Disabled, + ); + assert!(scan_region.use_flat_format()); + + let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat)); + let request = ScanRequest::default(); + let scan_region = ScanRegion::new( + flat_version, + env.access_layer.clone(), + request, + CacheStrategy::Disabled, + ); + assert!(scan_region.use_flat_format()); + } } diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 26573df758..171fc6e919 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -123,7 +123,7 @@ impl ParallelizeScan { /// /// Currently we assign ranges to partitions according to their rows so each partition /// has similar number of rows. This method always return `expected_partition_num` partitions. 
- fn assign_partition_range( + pub fn assign_partition_range( mut ranges: Vec, expected_partition_num: usize, ) -> Vec> { diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index e538127e73..6725de92e3 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -124,6 +124,8 @@ pub struct ScanRequest { /// Optional hint for KNN vector search. When set, the scan should use /// vector index to find the k nearest neighbors. pub vector_search: Option, + /// Whether to force reading region data in flat format. + pub force_flat_format: bool, } impl Display for ScanRequest { @@ -206,6 +208,14 @@ impl Display for ScanRequest { vector_search.metric )?; } + if self.force_flat_format { + write!( + f, + "{}force_flat_format: {}", + delimiter.as_str(), + self.force_flat_format + )?; + } write!(f, " }}") } } @@ -259,5 +269,14 @@ mod tests { request.to_string(), "ScanRequest { projection: [1, 2], limit: 10 }" ); + + let request = ScanRequest { + force_flat_format: true, + ..Default::default() + }; + assert_eq!( + request.to_string(), + "ScanRequest { force_flat_format: true }" + ); } }