feat: add a subcommand to bench scan (#7722)

* feat: support scan bench

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support projection by name

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support force flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: spawn tasks to poll streams

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support filter config

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: scan bench support wal

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: support not providing provider in wal

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip wal replay

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: wrap EngineComponents

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: add scanbench doc

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: change --skip-wal-replay to --enable-wal

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove limit from config

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-02-26 14:37:40 +08:00
committed by GitHub
parent 46683f908a
commit 0c30bf1a10
10 changed files with 1035 additions and 10 deletions

5
Cargo.lock generated
View File

@@ -2169,6 +2169,9 @@ dependencies = [
"common-time",
"common-version",
"common-wal",
"datafusion",
"datafusion-common",
"datafusion-physical-plan",
"datanode",
"datatypes",
"either",
@@ -2180,6 +2183,7 @@ dependencies = [
"human-panic",
"humantime",
"lazy_static",
"log-store",
"meta-client",
"meta-srv",
"metric-engine",
@@ -2202,6 +2206,7 @@ dependencies = [
"session",
"similar-asserts",
"snafu 0.8.6",
"sqlparser",
"standalone",
"store-api",
"table",

127
docs/scanbench.md Normal file
View File

@@ -0,0 +1,127 @@
# Scanbench Usage
`scanbench` benchmarks region scans directly from storage through:
```bash
greptime datanode scanbench ...
```
## Build
```bash
cargo build -p cmd --bin greptime
```
## Command
```bash
./target/debug/greptime datanode scanbench \
--config <CONFIG_TOML> \
--region-id <REGION_ID> \
--table-dir <TABLE_DIR> \
[--scanner <seq|unordered|series>] \
[--scan-config <SCAN_CONFIG_JSON>] \
[--parallelism <N>] \
[--iterations <N>] \
[--path-type <bare|data|metadata>] \
[--force-flat-format] \
[--enable-wal] \
[--pprof-file <FLAMEGRAPH_SVG>] \
[--verbose]
```
## Required Arguments
- `--config`: Datanode/standalone TOML config.
- `--region-id`: Region ID in one of:
- `<u64>` (example: `4398046511104`)
- `<table_id>:<region_number>` (example: `1024:0`)
- `--table-dir`: Table directory used in open request (example: `greptime/public/1024`).
## Optional Arguments
- `--scanner`: Scan strategy. Default: `seq`.
- `seq`: default scan
- `unordered`: time-windowed distribution
- `series`: per-series distribution
- `--scan-config`: JSON file to tune scan request.
- `--parallelism`: Simulated scan parallelism. Default: `1`.
- `--iterations`: Benchmark iterations. Default: `1`.
- `--path-type`: Region path type (`bare`, `data`, `metadata`). Default: `bare`.
- `--force-flat-format`: Force reading the region in flat format. Default: disabled.
- `--enable-wal`: Enable WAL replay when opening the region. Default: disabled. When enabled, scanbench uses the log store configured in the `[wal]` section of the config TOML (raft-engine or Kafka). When disabled or when no WAL is configured, a `NoopLogStore` is used.
- `--pprof-file`: Output flamegraph path (Unix only).
- `--verbose` / `-v`: Enable verbose output.
## Scan Config JSON
```json
{
"projection": [0, 1, 2],
"projection_names": ["host", "cpu"],
"filters": ["host = 'web-1'", "cpu > 80"],
"series_row_selector": "last_row"
}
```
Notes:
- All fields are optional.
- Use either `projection` (indexes) or `projection_names` (column names), not both.
- `projection_names` uses exact (case-sensitive) column name matching.
- `filters` is a list of SQL expressions (not full SQL statements), e.g. `"host = 'web-1'"`.
- `series_row_selector` currently supports only `"last_row"`.
## Examples
Default sequential scan:
```bash
./target/debug/greptime datanode scanbench \
--config /path/to/config.toml \
--region-id 1024:0 \
--table-dir greptime/public/1024
```
Unordered scan with parallelism:
```bash
./target/debug/greptime datanode scanbench \
--config /path/to/config.toml \
--region-id 1024:0 \
--table-dir greptime/public/1024 \
--scanner unordered \
--parallelism 8 \
--iterations 5
```
Series scan with scan config and flamegraph:
```bash
./target/debug/greptime datanode scanbench \
--config /path/to/config.toml \
--region-id 1024:0 \
--table-dir greptime/public/1024 \
--scanner series \
--scan-config /path/to/scan-config.json \
--pprof-file /tmp/scanbench.svg
```
Force flat-format read:
```bash
./target/debug/greptime datanode scanbench \
--config /path/to/config.toml \
--region-id 1024:0 \
--table-dir greptime/public/1024 \
--force-flat-format
```
Scan with WAL replay enabled (uses `[wal]` config from TOML):
```bash
./target/debug/greptime datanode scanbench \
--config /path/to/config.toml \
--region-id 1024:0 \
--table-dir greptime/public/1024 \
--enable-wal
```

View File

@@ -30,6 +30,9 @@ base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
datafusion-physical-plan.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
either = "1.15"
clap.workspace = true
cli.workspace = true
@@ -63,6 +66,7 @@ futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
log-store.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true
@@ -91,6 +95,7 @@ tokio.workspace = true
toml.workspace = true
tonic.workspace = true
tracing-appender.workspace = true
sqlparser.workspace = true
[target.'cfg(unix)'.dependencies]
pprof = { version = "0.14", features = [

View File

@@ -111,6 +111,7 @@ async fn start(cli: Command) -> Result<()> {
cmd.build_with(builder).await?.run().await
}
datanode::SubCommand::Objbench(ref bench) => bench.run().await,
datanode::SubCommand::Scanbench(ref bench) => bench.run().await,
},
SubCommand::Flownode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)

View File

@@ -15,6 +15,8 @@
pub mod builder;
#[allow(clippy::print_stdout)]
mod objbench;
#[allow(clippy::print_stdout)]
mod scanbench;
use std::path::Path;
use std::time::Duration;
@@ -35,6 +37,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::App;
use crate::datanode::builder::InstanceBuilder;
use crate::datanode::objbench::ObjbenchCommand;
use crate::datanode::scanbench::ScanbenchCommand;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
@@ -105,9 +108,9 @@ impl Command {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
match &self.subcmd {
SubCommand::Start(cmd) => cmd.load_options(global_options),
SubCommand::Objbench(_) => {
// For objbench command, we don't need to load DatanodeOptions
// It's a standalone utility command
SubCommand::Objbench(_) | SubCommand::Scanbench(_) => {
// For bench commands, we don't need to load DatanodeOptions
// They are standalone utility commands
let mut opts = datanode::config::DatanodeOptions::default();
opts.sanitize();
Ok(DatanodeOptions {
@@ -125,6 +128,8 @@ pub enum SubCommand {
Start(StartCommand),
/// Object storage benchmark tool
Objbench(ObjbenchCommand),
/// Scan benchmark tool - benchmarks scanning a region directly from storage
Scanbench(ScanbenchCommand),
}
impl SubCommand {
@@ -138,6 +143,10 @@ impl SubCommand {
cmd.run().await?;
std::process::exit(0);
}
SubCommand::Scanbench(cmd) => {
cmd.run().await?;
std::process::exit(0);
}
}
}
}
@@ -157,6 +166,30 @@ pub struct StorageConfig {
struct StorageConfigWrapper {
storage: StorageConfig,
region_engine: Vec<RegionEngineConfig>,
#[serde(default, deserialize_with = "deserialize_wal_config")]
wal: DatanodeWalConfig,
}
/// Deserializes [`DatanodeWalConfig`], defaulting `provider` to `"raft_engine"` when
/// the `[wal]` section is present but omits it. This mirrors the behavior of the
/// datanode's layered config loader which merges over a default that already contains
/// `provider`.
fn deserialize_wal_config<'de, D>(
deserializer: D,
) -> std::result::Result<DatanodeWalConfig, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error as _;
let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
if !table.contains_key("provider") {
table.insert(
"provider".to_string(),
toml::Value::String("raft_engine".to_string()),
);
}
DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
}
#[derive(Debug, Parser, Default)]

View File

@@ -66,7 +66,13 @@ pub struct ObjbenchCommand {
pub pprof_file: Option<PathBuf>,
}
fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
pub(super) fn parse_config(
config_path: &PathBuf,
) -> error::Result<(
StorageConfig,
MitoConfig,
common_wal::config::DatanodeWalConfig,
)> {
let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", config_path.display()),
@@ -81,6 +87,7 @@ fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConf
.build()
})?;
let wal_config = store_cfg.wal;
let storage_config = store_cfg.storage;
let mito_engine_config = store_cfg
.region_engine
@@ -96,7 +103,7 @@ fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConf
.with_context(|| error::IllegalConfigSnafu {
msg: format!("Engine config not found in {:?}", config_path),
})?;
Ok((storage_config, mito_engine_config))
Ok((storage_config, mito_engine_config, wal_config))
}
impl ObjbenchCommand {
@@ -108,7 +115,7 @@ impl ObjbenchCommand {
println!("{}", "Starting objbench with config:".cyan().bold());
// Build object store from config
let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;
let (store_cfg, mut mito_engine_config, _wal_config) = parse_config(&self.config)?;
let object_store = build_object_store(&store_cfg).await?;
println!("{} Object store initialized", "".green());
@@ -490,7 +497,7 @@ fn extract_region_metadata(
Ok(Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
pub(super) async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
store::new_object_store(sc.store.clone(), &sc.data_home)
.await
.map_err(|e| {
@@ -685,7 +692,7 @@ mod tests {
#[test]
fn test_parse_config() {
let path = "../../config/datanode.example.toml";
let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
let (storage, engine, _wal) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
assert_eq!(storage.data_home, "./greptimedb_data");
assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
}

View File

@@ -0,0 +1,765 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use clap::Parser;
use colored::Colorize;
use common_base::Plugins;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_wal::config::DatanodeWalConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::Expr as DfExpr;
use datafusion_common::ToDFSchema;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::sst::file_ref::FileReferenceManager;
use moka::future::CacheBuilder;
use object_store::manager::ObjectStoreManager;
use object_store::util::normalize_dir;
use query::optimizer::parallelize_scan::ParallelizeScan;
use serde::Deserialize;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser as SqlParser;
use store_api::metadata::RegionMetadata;
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::fs;
use crate::datanode::objbench::{build_object_store, parse_config};
use crate::error;
/// Scan benchmark command - benchmarks scanning a region directly from storage.
#[derive(Debug, Parser)]
pub struct ScanbenchCommand {
/// Path to config TOML file (same format as standalone/datanode config)
#[clap(long, value_name = "FILE")]
config: PathBuf,
/// Region ID: either numeric u64 (e.g. "4398046511104") or "table_id:region_num" (e.g. "1024:0")
#[clap(long)]
region_id: String,
/// Table directory relative to data home (e.g. "data/greptime/public/1024/")
#[clap(long)]
table_dir: String,
/// Scanner type: seq, unordered, series
#[clap(long, default_value = "seq")]
scanner: String,
/// Path to scan request JSON config file (optional)
#[clap(long, value_name = "FILE")]
scan_config: Option<PathBuf>,
/// Number of partitions for parallel scan (simulates parallelism)
#[clap(long, default_value = "1")]
parallelism: usize,
/// Number of iterations for benchmarking
#[clap(long, default_value = "1")]
iterations: usize,
/// Path type for the region: bare, data, metadata
#[clap(long, default_value = "bare")]
path_type: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
verbose: bool,
/// Output pprof flamegraph
#[clap(long, value_name = "FILE")]
pprof_file: Option<PathBuf>,
/// Force reading the region in flat format.
#[clap(long, default_value_t = false)]
force_flat_format: bool,
/// Enable WAL replay when opening the region.
#[clap(long, default_value_t = false)]
enable_wal: bool,
}
/// JSON config for scan request parameters.
#[derive(Debug, Deserialize, Default)]
struct ScanConfig {
projection: Option<Vec<usize>>,
projection_names: Option<Vec<String>>,
filters: Option<Vec<String>>,
series_row_selector: Option<String>,
}
fn resolve_projection(
scan_config: &ScanConfig,
metadata: Option<&RegionMetadata>,
) -> error::Result<Option<Vec<usize>>> {
if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
return Err(error::IllegalConfigSnafu {
msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
}
.build());
}
if let Some(projection) = &scan_config.projection {
return Ok(Some(projection.clone()));
}
if let Some(projection_names) = &scan_config.projection_names {
let metadata = metadata.context(error::IllegalConfigSnafu {
msg: "Missing region metadata while resolving 'projection_names'".to_string(),
})?;
let available_columns = metadata
.column_metadatas
.iter()
.map(|column| column.column_schema.name.as_str())
.collect::<Vec<_>>()
.join(", ");
let projection = projection_names
.iter()
.map(|name| {
metadata
.column_index_by_name(name)
.with_context(|| error::IllegalConfigSnafu {
msg: format!(
"Unknown column '{}' in projection_names, available columns: [{}]",
name, available_columns
),
})
})
.collect::<error::Result<Vec<_>>>()?;
return Ok(Some(projection));
}
Ok(None)
}
fn parse_region_id(s: &str) -> error::Result<RegionId> {
if s.contains(':') {
let parts: Vec<&str> = s.splitn(2, ':').collect();
let table_id: u32 = parts[0].parse().map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid table_id in region_id '{}': {}", s, e),
}
.build()
})?;
let region_num: u32 = parts[1].parse().map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region_num in region_id '{}': {}", s, e),
}
.build()
})?;
Ok(RegionId::new(table_id, region_num))
} else {
let id: u64 = s.parse().map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region_id '{}': {}", s, e),
}
.build()
})?;
Ok(RegionId::from_u64(id))
}
}
fn parse_path_type(s: &str) -> error::Result<PathType> {
match s.to_lowercase().as_str() {
"bare" => Ok(PathType::Bare),
"data" => Ok(PathType::Data),
"metadata" => Ok(PathType::Metadata),
_ => Err(error::IllegalConfigSnafu {
msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
}
.build()),
}
}
fn resolve_filters(
scan_config: &ScanConfig,
metadata: &RegionMetadata,
) -> error::Result<Vec<DfExpr>> {
let Some(filters) = &scan_config.filters else {
return Ok(Vec::new());
};
let df_schema = metadata
.schema
.arrow_schema()
.clone()
.to_dfschema()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
}
.build()
})?;
let state = SessionStateBuilder::new()
.with_config(Default::default())
.with_runtime_env(Default::default())
.with_default_features()
.build();
filters
.iter()
.enumerate()
.map(|(idx, filter)| {
let mut parser = SqlParser::new(&GenericDialect {})
.try_with_sql(filter)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
}
.build()
})?;
let sql_expr = parser.parse_expr().map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
}
.build()
})?;
state
.create_logical_expr_from_sql_expr(
SqlExprWithAlias {
expr: sql_expr,
alias: None,
},
&df_schema,
)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!(
"Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
),
}
.build()
})
})
.collect()
}
fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
struct NoopPartitionExprFetcher;
#[async_trait::async_trait]
impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
None
}
}
Arc::new(NoopPartitionExprFetcher)
}
struct EngineComponents {
data_home: String,
mito_config: MitoConfig,
object_store_manager: Arc<ObjectStoreManager>,
schema_metadata_manager: Arc<SchemaMetadataManager>,
file_ref_manager: Arc<FileReferenceManager>,
partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
}
impl EngineComponents {
async fn build<S: store_api::logstore::LogStore>(
self,
log_store: Arc<S>,
) -> error::Result<MitoEngine> {
MitoEngine::new(
&self.data_home,
self.mito_config,
log_store,
self.object_store_manager,
self.schema_metadata_manager,
self.file_ref_manager,
self.partition_expr_fetcher,
Plugins::default(),
)
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)
}
}
fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_schema_cache = Arc::new(new_table_schema_cache(
"table_schema_name_cache".to_string(),
CacheBuilder::default().build(),
kv_backend.clone(),
));
let schema_cache = Arc::new(new_schema_cache(
"schema_cache".to_string(),
CacheBuilder::default().build(),
kv_backend.clone(),
));
Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
}
impl ScanbenchCommand {
pub async fn run(&self) -> error::Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting scanbench...".cyan().bold());
let region_id = parse_region_id(&self.region_id)?;
let path_type = parse_path_type(&self.path_type)?;
println!(
"{} Region ID: {} (u64: {})",
"".green(),
self.region_id,
region_id.as_u64()
);
// Parse config and build object store
let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
println!("{} Config parsed", "".green());
let object_store = build_object_store(&store_cfg).await?;
println!("{} Object store initialized", "".green());
let object_store_manager =
Arc::new(ObjectStoreManager::new("default", object_store.clone()));
// Create mock dependencies
let schema_metadata_manager = mock_schema_metadata_manager();
let file_ref_manager = Arc::new(FileReferenceManager::new(None));
let partition_expr_fetcher = noop_partition_expr_fetcher();
// Create MitoEngine with appropriate log store
let components = EngineComponents {
data_home: store_cfg.data_home.clone(),
mito_config,
object_store_manager,
schema_metadata_manager,
file_ref_manager,
partition_expr_fetcher,
};
let engine = match &wal_config {
DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
let data_home = normalize_dir(&store_cfg.data_home);
let wal_dir = match &raft_engine_config.dir {
Some(dir) => dir.clone(),
None => format!("{}{WAL_DIR}", data_home),
};
fs::create_dir_all(&wal_dir).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to create WAL directory {}: {e}", wal_dir),
}
.build()
})?;
let log_store = Arc::new(
RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?,
);
println!("{} Using RaftEngine WAL", "".green());
components.build(log_store).await?
}
DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
let log_store = Arc::new(
KafkaLogStore::try_new(kafka_config, None)
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?,
);
println!("{} Using Kafka WAL", "".green());
components.build(log_store).await?
}
_ => {
let log_store = Arc::new(NoopLogStore);
println!(
"{} Using NoopLogStore (enable_wal={})",
"".green(),
self.enable_wal
);
components.build(log_store).await?
}
};
// Open region
let open_request = RegionOpenRequest {
engine: "mito".to_string(),
table_dir: self.table_dir.clone(),
path_type,
options: HashMap::default(),
skip_wal_replay: !self.enable_wal,
checkpoint: None,
};
engine
.handle_request(region_id, RegionRequest::Open(open_request))
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
println!("{} Region opened", "".green());
// Load scan config
let scan_config = if let Some(path) = &self.scan_config {
let content = tokio::fs::read_to_string(path)
.await
.context(error::FileIoSnafu)?;
serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
} else {
ScanConfig::default()
};
let metadata = engine
.get_metadata(region_id)
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
let projection = resolve_projection(&scan_config, Some(&metadata))?;
let filters = resolve_filters(&scan_config, &metadata)?;
// Build scan request
let distribution = match self.scanner.as_str() {
"seq" => None,
"unordered" => Some(TimeSeriesDistribution::TimeWindowed),
"series" => Some(TimeSeriesDistribution::PerSeries),
other => {
return Err(error::IllegalConfigSnafu {
msg: format!(
"Unknown scanner type '{}', expected: seq, unordered, series",
other
),
}
.build());
}
};
let series_row_selector = match scan_config.series_row_selector.as_deref() {
Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
Some(other) => {
return Err(error::IllegalConfigSnafu {
msg: format!("Unknown series_row_selector '{}'", other),
}
.build());
}
None => None,
};
println!(
"{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}",
"".blue(),
self.scanner,
self.parallelism,
self.iterations,
self.force_flat_format,
);
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Failed to start profiler: {e}"),
StatusCode::Unexpected,
))
})
.context(error::BuildCliSnafu)?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let mut total_rows_all = 0u64;
let mut total_elapsed_all = std::time::Duration::ZERO;
for iteration in 0..self.iterations {
let request = ScanRequest {
projection: projection.clone(),
filters: filters.clone(),
series_row_selector,
distribution,
force_flat_format: self.force_flat_format,
..Default::default()
};
let start = Instant::now();
// Get scanner
let mut scanner = engine
.handle_query(region_id, request)
.await
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
// Get partition ranges and apply parallelism
let original_partitions = scanner.properties().partitions.clone();
let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();
if self.verbose {
println!(
" {} Original partitions: {}, total ranges: {}",
"".blue(),
original_partitions.len(),
total_ranges
);
}
if self.parallelism > 1 {
// Flatten all ranges
let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();
// Distribute ranges across partitions
let mut partitions =
ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);
// Sort ranges within each partition by start time ascending
for partition in &mut partitions {
partition.sort_by(|a, b| a.start.cmp(&b.start));
}
scanner
.prepare(
PrepareRequest::default()
.with_ranges(partitions)
.with_target_partitions(self.parallelism),
)
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
}
// Scan all partitions
let num_partitions = scanner.properties().partitions.len();
let ctx = QueryScanContext::default();
let metrics_set = ExecutionPlanMetricsSet::new();
let mut scan_futures = FuturesUnordered::new();
for partition_idx in 0..num_partitions {
let mut stream = scanner
.scan_partition(&ctx, &metrics_set, partition_idx)
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
scan_futures.push(tokio::spawn(async move {
let mut rows = 0u64;
while let Some(batch_result) = stream.next().await {
match batch_result {
Ok(batch) => {
rows += batch.num_rows() as u64;
}
Err(e) => {
return Err(BoxedError::new(e));
}
}
}
Ok::<u64, BoxedError>(rows)
}));
}
let mut total_rows = 0u64;
while let Some(task) = scan_futures.next().await {
let result = task
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("scan task failed: {e}"),
StatusCode::Unexpected,
))
})
.context(error::BuildCliSnafu)?;
let rows = result.context(error::BuildCliSnafu)?;
total_rows += rows;
}
let elapsed = start.elapsed();
total_rows_all += total_rows;
total_elapsed_all += elapsed;
println!(
" [iter {}] {} rows in {:?} ({} partitions)",
iteration + 1,
total_rows.to_string().cyan(),
elapsed,
num_partitions,
);
}
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
println!(
"{}: Failed to write flamegraph to {}: {}",
"Error".red(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
}
}
}
// Summary
if self.iterations > 1 {
let avg_elapsed = total_elapsed_all / self.iterations as u32;
let avg_rows = total_rows_all / self.iterations as u64;
println!(
"\n{} Average: {} rows in {:?} over {} iterations",
"Summary".green().bold(),
avg_rows.to_string().cyan(),
avg_elapsed,
self.iterations,
);
}
println!("\n{}", "Benchmark completed!".green().bold());
Ok(())
}
}
#[cfg(test)]
mod tests {
use sqlparser::ast::{BinaryOperator, Expr};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use super::{ScanConfig, resolve_projection};
use crate::error;
#[test]
fn test_parse_scan_config_projection_names() {
let json = r#"{"projection_names":["host","ts"]}"#;
let config: ScanConfig = serde_json::from_str(json).unwrap();
assert_eq!(
config.projection_names,
Some(vec!["host".to_string(), "ts".to_string()])
);
assert_eq!(config.projection, None);
}
#[test]
fn test_resolve_projection_by_indexes() -> error::Result<()> {
let config = ScanConfig {
projection: Some(vec![0, 2]),
projection_names: None,
filters: None,
series_row_selector: None,
};
let projection = resolve_projection(&config, None)?;
assert_eq!(projection, Some(vec![0, 2]));
Ok(())
}
#[test]
fn test_resolve_projection_by_names_without_metadata() {
let config = ScanConfig {
projection: None,
projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
filters: None,
series_row_selector: None,
};
let err = resolve_projection(&config, None).unwrap_err();
assert!(
err.to_string()
.contains("Missing region metadata while resolving 'projection_names'")
);
}
#[test]
fn test_resolve_projection_conflict_fields() {
let config = ScanConfig {
projection: Some(vec![0]),
projection_names: Some(vec!["host".to_string()]),
filters: None,
series_row_selector: None,
};
let err = resolve_projection(&config, None).unwrap_err();
let msg = err.to_string();
assert!(msg.contains("projection"));
assert!(msg.contains("projection_names"));
}
#[test]
fn test_sqlparser_parse_expr_string() {
let dialect = GenericDialect {};
let mut parser = Parser::new(&dialect)
.try_with_sql("host = 'web-1' AND cpu > 80")
.unwrap();
let expr = parser.parse_expr().unwrap();
match expr {
Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
other => panic!("expected BinaryOp, got: {other:?}"),
}
}
#[test]
fn test_parse_scan_config_filters() {
let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;
let config: ScanConfig = serde_json::from_str(json).unwrap();
assert_eq!(
config.filters,
Some(vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()])
);
}
}

View File

@@ -399,7 +399,8 @@ impl ScanRegion {
/// Returns true if the region use flat format.
fn use_flat_format(&self) -> bool {
self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
self.request.force_flat_format
|| self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
}
/// Creates a scan input.
@@ -1720,7 +1721,9 @@ mod tests {
use super::*;
use crate::memtable::time_partition::TimePartitions;
use crate::region::options::RegionOptions;
use crate::region::version::VersionBuilder;
use crate::sst::FormatType;
use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
use crate::test_util::scheduler_util::SchedulerEnv;
@@ -1734,6 +1737,27 @@ mod tests {
Arc::new(VersionBuilder::new(metadata, mutable).build())
}
fn new_version_with_sst_format(
metadata: RegionMetadataRef,
sst_format: Option<FormatType>,
) -> VersionRef {
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(EmptyMemtableBuilder::default()),
0,
None,
));
let options = RegionOptions {
sst_format,
..Default::default()
};
Arc::new(
VersionBuilder::new(metadata, mutable)
.options(options)
.build(),
)
}
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
@@ -1813,4 +1837,43 @@ mod tests {
// Projection order preserved, extra columns appended in schema order.
assert_eq!(vec![4, 1, 3], read_ids);
}
#[tokio::test]
async fn test_use_flat_format_honors_request_override() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let env = SchedulerEnv::new().await;
let primary_key_version =
new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
let request = ScanRequest::default();
let scan_region = ScanRegion::new(
primary_key_version.clone(),
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(!scan_region.use_flat_format());
let request = ScanRequest {
force_flat_format: true,
..Default::default()
};
let scan_region = ScanRegion::new(
primary_key_version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(scan_region.use_flat_format());
let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
let request = ScanRequest::default();
let scan_region = ScanRegion::new(
flat_version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
assert!(scan_region.use_flat_format());
}
}

View File

@@ -123,7 +123,7 @@ impl ParallelizeScan {
///
/// Currently we assign ranges to partitions according to their rows so each partition
/// has similar number of rows. This method always return `expected_partition_num` partitions.
fn assign_partition_range(
pub fn assign_partition_range(
mut ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {

View File

@@ -124,6 +124,8 @@ pub struct ScanRequest {
/// Optional hint for KNN vector search. When set, the scan should use
/// vector index to find the k nearest neighbors.
pub vector_search: Option<VectorSearchRequest>,
/// Whether to force reading region data in flat format.
pub force_flat_format: bool,
}
impl Display for ScanRequest {
@@ -206,6 +208,14 @@ impl Display for ScanRequest {
vector_search.metric
)?;
}
if self.force_flat_format {
write!(
f,
"{}force_flat_format: {}",
delimiter.as_str(),
self.force_flat_format
)?;
}
write!(f, " }}")
}
}
@@ -259,5 +269,14 @@ mod tests {
request.to_string(),
"ScanRequest { projection: [1, 2], limit: 10 }"
);
let request = ScanRequest {
force_flat_format: true,
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { force_flat_format: true }"
);
}
}