From b007f8598633e2ad6a5ec9b226e1dbe02a6b083d Mon Sep 17 00:00:00 2001 From: maximk777 Date: Mon, 16 Mar 2026 12:10:33 +0500 Subject: [PATCH 1/7] feat(http): improve error logging with client IP (#7503) * feat(http): improve error logging with client IP - Add logging to ErrorResponse::from_error_message() - Add middleware to log HTTP errors with client IP Closes #7328 Signed-off-by: maximk777 * fix(http): address review comments for error logging Restore rich Debug logging in from_error(), add URI/method/matched path to client IP middleware, and only log when client address is available. Signed-off-by: evenyag --------- Signed-off-by: maximk777 Signed-off-by: evenyag Co-authored-by: evenyag --- src/servers/src/http.rs | 8 +- src/servers/src/http/client_ip.rs | 109 ++++++++++++++++++++ src/servers/src/http/result/error_result.rs | 13 ++- 3 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 src/servers/src/http/client_ip.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index ffd0745041..506a240cac 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -112,8 +112,8 @@ pub mod utils; use result::HttpOutputWriter; pub(crate) use timeout::DynamicTimeoutLayer; +mod client_ip; use crate::prom_remote_write::validation::PromValidationMode; - mod hints; mod read_preference; #[cfg(any(test, feature = "testing"))] @@ -883,6 +883,7 @@ impl HttpServer { authorize::check_http_auth, )) .layer(middleware::from_fn(hints::extract_hints)) + .layer(middleware::from_fn(client_ip::log_error_with_client_ip)) .layer(middleware::from_fn( read_preference::extract_read_preference, )), @@ -1247,7 +1248,10 @@ impl Server for HttpServer { error!(e; "Failed to set TCP_NODELAY on incoming connection"); } }); - let serve = axum::serve(listener, app.into_make_service()); + let serve = axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ); // FIXME(yingwen): Support keepalive. 
// See: diff --git a/src/servers/src/http/client_ip.rs b/src/servers/src/http/client_ip.rs new file mode 100644 index 0000000000..70df554ebb --- /dev/null +++ b/src/servers/src/http/client_ip.rs @@ -0,0 +1,109 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::net::SocketAddr; + +use axum::body::Body; +use axum::extract::{ConnectInfo, MatchedPath}; +use axum::http::Request; +use axum::middleware::Next; +use axum::response::Response; +use common_telemetry::warn; + +/// Middleware that logs HTTP error responses (4xx/5xx) with client IP address. +/// +/// Extracts client address from [`ConnectInfo`] if available. 
+pub async fn log_error_with_client_ip(req: Request, next: Next) -> Response { + let request_info = req + .extensions() + .get::>() + .map(|c| c.0) + .map(|addr| { + let method = req.method().clone(); + let uri = req.uri().clone(); + let matched_path = req.extensions().get::().cloned(); + (addr, method, uri, matched_path) + }); + + let response = next.run(req).await; + + if (response.status().is_client_error() || response.status().is_server_error()) + && let Some((addr, method, uri, matched_path)) = request_info + { + warn!( + "HTTP error response {} for {} {} (matched: {}) from client {}", + response.status(), + method, + uri, + matched_path + .as_ref() + .map(|p| p.as_str()) + .unwrap_or(""), + addr + ); + } + + response +} + +#[cfg(test)] +mod tests { + use axum::Router; + use axum::routing::get; + use http::StatusCode; + use tower::ServiceExt; + + use super::*; + + #[tokio::test] + async fn test_middleware_passes_error_response() { + async fn not_found_handler() -> StatusCode { + StatusCode::NOT_FOUND + } + + let app = Router::new() + .route("/not-found", get(not_found_handler)) + .layer(axum::middleware::from_fn(log_error_with_client_ip)); + + let response = app + .oneshot( + Request::builder() + .uri("/not-found") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn test_middleware_passes_success_response() { + async fn ok_handler() -> StatusCode { + StatusCode::OK + } + + let app = Router::new() + .route("/ok", get(ok_handler)) + .layer(axum::middleware::from_fn(log_error_with_client_ip)); + + let response = app + .oneshot(Request::builder().uri("/ok").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/src/servers/src/http/result/error_result.rs b/src/servers/src/http/result/error_result.rs index 7b70066b68..9bd6e1a7a3 100644 --- a/src/servers/src/http/result/error_result.rs +++ 
b/src/servers/src/http/result/error_result.rs @@ -32,17 +32,24 @@ pub struct ErrorResponse { impl ErrorResponse { pub fn from_error(error: impl ErrorExt) -> Self { let code = error.status_code(); - if code.should_log_error() { error!(error; "Failed to handle HTTP request"); } else { debug!("Failed to handle HTTP request, err: {:?}", error); } - - Self::from_error_message(code, error.output_msg()) + ErrorResponse { + code: code as u32, + error: error.output_msg(), + execution_time_ms: 0, + } } pub fn from_error_message(code: StatusCode, msg: String) -> Self { + if code.should_log_error() { + error!("Failed to handle HTTP request: {}", msg); + } else { + debug!("Failed to handle HTTP request: {}", msg); + } ErrorResponse { code: code as u32, error: msg, From be4a7a6d371f29377bcc8acfa2c1f1a24b31d7e1 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 16 Mar 2026 15:49:31 +0800 Subject: [PATCH 2/7] refactor: remove Memtable::iter (#7809) * refactor: remove Memtable::iter Signed-off-by: Lei, HUANG * fix: review comments Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/benches/memtable_bench.rs | 20 ++++- src/mito2/benches/simple_bulk_memtable.rs | 8 +- src/mito2/src/memtable.rs | 30 +++---- src/mito2/src/memtable/bulk.rs | 10 --- src/mito2/src/memtable/partition_tree.rs | 85 +++++++----------- .../src/memtable/simple_bulk_memtable.rs | 68 +++++++++----- .../simple_bulk_memtable/test_only.rs | 88 +------------------ src/mito2/src/memtable/time_partition.rs | 62 +++++++++++-- src/mito2/src/memtable/time_series.rs | 55 ++++-------- src/mito2/src/test_util/memtable_util.rs | 10 --- 10 files changed, 184 insertions(+), 252 deletions(-) diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index ebe994f861..df991f6f92 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -28,7 +28,7 @@ use 
mito2::memtable::bulk::part_reader::BulkPartBatchIter; use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig}; use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable}; use mito2::memtable::time_series::TimeSeriesMemtable; -use mito2::memtable::{KeyValues, Memtable, RangesOptions}; +use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions}; use mito2::read::flat_merge::FlatMergeIterator; use mito2::read::scan_region::PredicateGroup; use mito2::region::options::MergeMode; @@ -105,7 +105,11 @@ fn full_scan(c: &mut Criterion) { } b.iter(|| { - let iter = memtable.iter(None, None, None).unwrap(); + let iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); for batch in iter { let _batch = batch.unwrap(); } @@ -145,7 +149,17 @@ fn filter_1_host(c: &mut Criterion) { let predicate = generator.random_host_filter(); b.iter(|| { - let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap(); + let iter = memtable + .ranges( + None, + RangesOptions { + predicate: PredicateGroup::new(&metadata, predicate.exprs()).unwrap(), + ..Default::default() + }, + ) + .unwrap() + .build(None) + .unwrap(); for batch in iter { let _batch = batch.unwrap(); } diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index 0277397768..05035734de 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -21,7 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable; -use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions}; +use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions}; use mito2::read; use mito2::read::Source; use mito2::read::dedup::DedupReader; @@ -156,7 +156,11 @@ async fn flush(mem: 
&SimpleBulkMemtable) { } async fn flush_original(mem: &SimpleBulkMemtable) { - let iter = mem.iter(None, None, None).unwrap(); + let iter = mem + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); for b in iter { black_box(b.unwrap()); } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index c39bbfa346..7494ec68ed 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -28,6 +28,7 @@ use mito_codec::key_values::KeyValue; pub use mito_codec::key_values::KeyValues; use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec}; use serde::{Deserialize, Serialize}; +use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber, SequenceRange}; @@ -231,10 +232,17 @@ impl MemtableRanges { impl IterBuilder for MemtableRanges { fn build(&self, _metrics: Option) -> Result { - UnsupportedOperationSnafu { - err_msg: "MemtableRanges does not support build iterator", - } - .fail() + ensure!( + self.ranges.len() == 1, + UnsupportedOperationSnafu { + err_msg: format!( + "Building an iterator from MemtableRanges expects 1 range, but got {}", + self.ranges.len() + ), + } + ); + + self.ranges.values().next().unwrap().build_iter() } fn is_record_batch(&self) -> bool { @@ -256,20 +264,6 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Writes an encoded batch of into memtable. fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>; - /// Scans the memtable. - /// `projection` selects columns to read, `None` means reading all columns. - /// `filters` are the predicates to be pushed down to memtable. - /// - /// # Note - /// This method should only be used for tests. - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - projection: Option<&[ColumnId]>, - predicate: Option, - sequence: Option, - ) -> Result; - /// Returns the ranges in the memtable. 
/// /// The returned map contains the range id and the range after applying the predicate. diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 6056a42013..4dad4fb885 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -462,16 +462,6 @@ impl Memtable for BulkMemtable { Ok(()) } - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - _projection: Option<&[ColumnId]>, - _predicate: Option, - _sequence: Option, - ) -> Result { - todo!() - } - fn ranges( &self, projection: Option<&[ColumnId]>, diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index febae46784..662bfd99f6 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -177,16 +177,6 @@ impl Memtable for PartitionTreeMemtable { .fail() } - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - projection: Option<&[ColumnId]>, - predicate: Option, - sequence: Option, - ) -> Result { - self.tree.read(projection, predicate, sequence, None) - } - fn ranges( &self, projection: Option<&[ColumnId]>, @@ -396,8 +386,6 @@ mod tests { use api::v1::{Mutation, OpType, Rows, SemanticType}; use common_query::prelude::{greptime_timestamp, greptime_value}; use common_time::Timestamp; - use datafusion_common::Column; - use datafusion_expr::{BinaryExpr, Expr, Literal, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::Vector; use datatypes::scalars::ScalarVector; @@ -548,7 +536,10 @@ mod tests { let expect = (0..100).collect::>(); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), None, None).unwrap(); + let ranges = memtable + .ranges(Some(&[3]), RangesOptions::default()) + .unwrap(); + let iter = ranges.build(None).unwrap(); let mut v0_all = vec![]; for res in iter { @@ -625,41 +616,6 @@ mod tests { assert_eq!(expect, read); } - 
#[test] - fn test_memtable_filter() { - let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false)); - // Try to build a memtable via the builder. - let memtable = PartitionTreeMemtableBuilder::new( - PartitionTreeConfig { - index_max_keys_per_shard: 40, - ..Default::default() - }, - None, - ) - .build(1, &metadata); - - for i in 0..100 { - let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect(); - let kvs = - memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1); - memtable.write(&kvs).unwrap(); - } - - for i in 0..100 { - let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect(); - let expr = Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column::from_name("k1"))), - op: Operator::Eq, - right: Box::new((i as u32).lit()), - }); - let iter = memtable - .iter(None, Some(Predicate::new(vec![expr])), None) - .unwrap(); - let read = collect_iter_timestamps(iter); - assert_eq!(timestamps, read); - } - } - #[test] fn test_deserialize_config() { let config = PartitionTreeConfig { @@ -811,7 +767,11 @@ mod tests { )) .unwrap(); - let mut reader = new_memtable.iter(None, None, None).unwrap(); + let mut reader = new_memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = reader.next().unwrap().unwrap(); let pk = codec.decode(batch.primary_key()).unwrap().into_dense(); if let Value::String(s) = &pk[2] { @@ -916,7 +876,14 @@ mod tests { .unwrap(); memtable.freeze().unwrap(); assert_eq!( - collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata), + collect_kvs( + memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(), + &metadata + ), ('a'..'h').map(|c| (c.to_string(), c.to_string())).collect() ); let forked = memtable.fork(2, &metadata); @@ -925,7 +892,14 @@ mod tests { forked.write(&key_values(&metadata, keys.iter())).unwrap(); forked.freeze().unwrap(); assert_eq!( - collect_kvs(forked.iter(None, 
None, None).unwrap(), &metadata), + collect_kvs( + forked + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(), + &metadata + ), keys.iter() .map(|c| (c.to_string(), c.to_string())) .collect() @@ -936,7 +910,14 @@ mod tests { let keys = ["g", "e", "a", "f", "b", "c", "h"]; forked2.write(&key_values(&metadata, keys.iter())).unwrap(); - let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata); + let kvs = collect_kvs( + forked2 + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(), + &metadata, + ); let expected = keys .iter() .map(|c| (c.to_string(), c.to_string())) diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 4dcaa2bac0..6d91f00361 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -213,22 +213,6 @@ impl Memtable for SimpleBulkMemtable { Ok(()) } - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - projection: Option<&[ColumnId]>, - _predicate: Option, - sequence: Option, - ) -> error::Result { - let iter = self.create_iter(projection, sequence)?.build(None)?; - if self.merge_mode == MergeMode::LastNonNull { - let iter = LastNonNullIter::new(iter); - Ok(Box::new(iter)) - } else { - Ok(Box::new(iter)) - } - } - fn ranges( &self, projection: Option<&[ColumnId]>, @@ -526,7 +510,11 @@ mod tests { )) .unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(2, batch.num_rows()); assert_eq!(2, batch.fields().len()); @@ -551,7 +539,11 @@ mod tests { )) .unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, 
batch.num_rows()); assert_eq!(2, batch.fields().len()); @@ -565,7 +557,11 @@ mod tests { // Only project column 2 (f1) let projection = vec![2]; - let mut iter = memtable.iter(Some(&projection), None, None).unwrap(); + let mut iter = memtable + .ranges(Some(&projection), RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, batch.num_rows()); @@ -592,7 +588,11 @@ mod tests { OpType::Put, )) .unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, batch.num_rows()); // deduped to 1 row @@ -611,7 +611,11 @@ mod tests { let kv = kvs.iter().next().unwrap(); memtable.write_one(kv).unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, batch.num_rows()); } @@ -745,7 +749,11 @@ mod tests { }; memtable.write_bulk(part).unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(2, batch.num_rows()); @@ -764,7 +772,11 @@ mod tests { OpType::Put, ); memtable.write(&kvs).unwrap(); - let mut iter = memtable.iter(None, None, None).unwrap(); + let mut iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(3, batch.num_rows()); assert_eq!( @@ -854,7 +866,15 @@ mod tests { // Filter with sequence 0 should only return first write let mut iter = memtable - .iter(None, None, Some(SequenceRange::LtEq { max: 0 })) + .ranges( + None, + RangesOptions { + sequence: Some(SequenceRange::LtEq { max: 0 }), + 
..Default::default() + }, + ) + .unwrap() + .build(None) .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, batch.num_rows()); diff --git a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs index b71a86c554..08edebdbb2 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs @@ -12,98 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; -use std::time::Instant; - use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceRange}; -use crate::error; -use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable}; -use crate::memtable::time_series::Values; -use crate::memtable::{BoxedBatchIterator, IterBuilder, MemScanMetrics}; -use crate::read::dedup::LastNonNullIter; -use crate::region::options::MergeMode; +use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable; impl SimpleBulkMemtable { pub fn region_metadata(&self) -> RegionMetadataRef { self.region_metadata.clone() } - - pub(crate) fn create_iter( - &self, - projection: Option<&[ColumnId]>, - sequence: Option, - ) -> error::Result { - let mut series = self.series.write().unwrap(); - - let values = if series.is_empty() { - None - } else { - Some(series.compact(&self.region_metadata)?.clone()) - }; - let projection = self.build_projection(projection); - Ok(BatchIterBuilderDeprecated { - region_metadata: self.region_metadata.clone(), - values, - projection, - dedup: self.dedup, - sequence, - merge_mode: self.merge_mode, - }) - } -} - -#[derive(Clone)] -pub(crate) struct BatchIterBuilderDeprecated { - region_metadata: RegionMetadataRef, - values: Option, - projection: HashSet, - sequence: Option, - dedup: bool, - merge_mode: MergeMode, -} - -impl IterBuilder for BatchIterBuilderDeprecated { - fn build(&self, metrics: Option) 
-> error::Result { - let start_time = Instant::now(); - let Some(values) = self.values.clone() else { - return Ok(Box::new(Iter { batch: None })); - }; - - let maybe_batch = values - .to_batch( - &[], - &self.region_metadata, - &self.projection, - self.sequence, - self.dedup, - self.merge_mode, - ) - .map(Some) - .transpose(); - - // Collect metrics from the batch - if let Some(metrics) = metrics { - let (num_rows, num_batches) = match &maybe_batch { - Some(Ok(batch)) => (batch.num_rows(), 1), - _ => (0, 0), - }; - let inner = crate::memtable::MemScanMetricsData { - total_series: 1, - num_rows, - num_batches, - scan_cost: start_time.elapsed(), - }; - metrics.merge_inner(&inner); - } - - let iter = Iter { batch: maybe_batch }; - - if self.merge_mode == MergeMode::LastNonNull { - Ok(Box::new(LastNonNullIter::new(iter))) - } else { - Ok(Box::new(iter)) - } - } } diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 6f11c813cb..ee695aceb8 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -827,6 +827,7 @@ mod tests { use super::*; use crate::memtable::partition_tree::PartitionTreeMemtableBuilder; use crate::memtable::time_series::TimeSeriesMemtableBuilder; + use crate::memtable::{IterBuilder, RangesOptions}; use crate::test_util::memtable_util::{self, collect_iter_timestamps}; #[test] @@ -852,7 +853,11 @@ mod tests { partitions.list_memtables(&mut memtables); assert_eq!(0, memtables[0].id()); - let iter = memtables[0].iter(None, None, None).unwrap(); + let iter = memtables[0] + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]); } @@ -890,7 +895,11 @@ mod tests { let mut memtables = Vec::new(); partitions.list_memtables(&mut memtables); - let iter = memtables[0].iter(None, None, None).unwrap(); + let iter = memtables[0] + .ranges(None, 
RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]); let parts = partitions.list_partitions(); @@ -943,7 +952,12 @@ mod tests { let partitions = new_multi_partitions(&metadata); let parts = partitions.list_partitions(); - let iter = parts[0].memtable.iter(None, None, None).unwrap(); + let iter = parts[0] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(0, parts[0].memtable.id()); assert_eq!( @@ -955,7 +969,12 @@ mod tests { parts[0].time_range.max_timestamp ); assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]); - let iter = parts[1].memtable.iter(None, None, None).unwrap(); + let iter = parts[1] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); assert_eq!(1, parts[1].memtable.id()); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[5000, 7000], ×tamps[..]); @@ -1273,7 +1292,12 @@ mod tests { let parts = partitions.list_partitions(); assert_eq!(1, parts.len()); - let iter = parts[0].memtable.iter(None, None, None).unwrap(); + let iter = parts[0] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[1000, 2000, 3000], ×tamps[..]); @@ -1284,11 +1308,21 @@ mod tests { let parts = partitions.list_partitions(); assert_eq!(2, parts.len()); // Check first partition [0, 5000) - let iter = parts[0].memtable.iter(None, None, None).unwrap(); + let iter = parts[0] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[1000, 2000, 3000, 4000], ×tamps[..]); // Check second partition [5000, 10000) - let iter = parts[1].memtable.iter(None, None, None).unwrap(); + let iter = parts[1] + .memtable + .ranges(None, 
RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[5000, 6000], ×tamps[..]); @@ -1301,7 +1335,12 @@ mod tests { assert_eq!(3, parts.len()); // Check new partition [10000, 15000) - let iter = parts[2].memtable.iter(None, None, None).unwrap(); + let iter = parts[2] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[11000, 12000], ×tamps[..]); @@ -1314,7 +1353,12 @@ mod tests { let parts = partitions.list_partitions(); assert_eq!(1, parts.len()); - let iter = parts[0].memtable.iter(None, None, None).unwrap(); + let iter = parts[0] + .memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[1000, 5000, 9000], ×tamps[..]); } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 271a9343eb..97f5f3c9ce 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -267,39 +267,6 @@ impl Memtable for TimeSeriesMemtable { Ok(()) } - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - projection: Option<&[ColumnId]>, - filters: Option, - sequence: Option, - ) -> Result { - let projection = if let Some(projection) = projection { - projection.iter().copied().collect() - } else { - self.region_metadata - .field_columns() - .map(|c| c.column_id) - .collect() - }; - - let iter = self.series_set.iter_series( - projection, - filters, - self.dedup, - self.merge_mode, - sequence, - None, - )?; - - if self.merge_mode == MergeMode::LastNonNull { - let iter = LastNonNullIter::new(iter); - Ok(Box::new(iter)) - } else { - Ok(Box::new(iter)) - } - } - fn ranges( &self, projection: Option<&[ColumnId]>, @@ -1798,7 +1765,9 @@ mod tests { *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 }; } - let iter = memtable.iter(None, 
None, None).unwrap(); + let ranges = memtable.ranges(None, RangesOptions::default()).unwrap(); + let range = ranges.ranges.into_values().next().unwrap(); + let iter = range.build_iter().unwrap(); let mut read = HashMap::new(); for ts in iter @@ -1838,7 +1807,11 @@ mod tests { let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow); memtable.write(&kvs).unwrap(); - let iter = memtable.iter(Some(&[3]), None, None).unwrap(); + let iter = memtable + .ranges(Some(&[3]), RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let mut v0_all = vec![]; @@ -1917,7 +1890,11 @@ mod tests { barrier.wait(); for _ in 0..10 { - let iter = memtable.iter(None, None, None).unwrap(); + let iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); for batch_result in iter { let _ = batch_result.unwrap(); } @@ -1936,7 +1913,11 @@ mod tests { handle.join().unwrap(); } - let iter = memtable.iter(None, None, None).unwrap(); + let iter = memtable + .ranges(None, RangesOptions::default()) + .unwrap() + .build(None) + .unwrap(); let mut series_count = 0; let mut row_count = 0; diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 7ddac4ee0d..58ea49fa41 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -83,16 +83,6 @@ impl Memtable for EmptyMemtable { Ok(()) } - #[cfg(any(test, feature = "test"))] - fn iter( - &self, - _projection: Option<&[ColumnId]>, - _filters: Option, - _sequence: Option, - ) -> Result { - Ok(Box::new(std::iter::empty())) - } - fn ranges( &self, _projection: Option<&[ColumnId]>, From dd82fcac00856a6dc3317fa4920b490bba959b84 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 16 Mar 2026 17:56:34 +0800 Subject: [PATCH 3/7] chore: update visibility of BatchToRecordBatchAdapter::new (#7817) --- src/mito2/src/read/batch_adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs index 461dbeba69..4698229c5b 100644 --- a/src/mito2/src/read/batch_adapter.rs +++ b/src/mito2/src/read/batch_adapter.rs @@ -59,7 +59,7 @@ impl BatchToRecordBatchAdapter { /// - `metadata`: region metadata describing the schema. /// - `codec`: codec for decoding the encoded primary key bytes. /// - `read_column_ids`: projected column ids to read. - pub(crate) fn new( + pub fn new( iter: BoxedBatchIterator, metadata: RegionMetadataRef, codec: Arc, From 5a37e58b4f4c4475e251d15e57436bb78acfe167 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 17 Mar 2026 11:53:20 +0800 Subject: [PATCH 4/7] feat(mito2): add partition range cache infrastructure (#7798) * feat: add partition range cache infra Signed-off-by: evenyag * refactor: optimize scan request fingerprint cloning Signed-off-by: evenyag * refactor: merge loops Signed-off-by: evenyag * chore: more docs Signed-off-by: evenyag * chore: update estimated size method and comment Signed-off-by: evenyag * chore: fix clippy Signed-off-by: evenyag * feat: only cache when we scan files Signed-off-by: evenyag * fix: address PR review comments for partition range cache - Remove TimeSeriesDistribution from fingerprint as it only affects yield order - Disable range cache when dyn filters are present since they change at runtime Signed-off-by: evenyag * chore: fmt code Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/cache.rs | 134 ++++++++++++++++ src/mito2/src/read.rs | 1 + src/mito2/src/read/range_cache.rs | 252 ++++++++++++++++++++++++++++++ src/mito2/src/read/scan_region.rs | 243 +++++++++++++++++++++++++++- src/mito2/src/region/options.rs | 2 +- 5 files changed, 629 insertions(+), 3 deletions(-) create mode 100644 src/mito2/src/read/range_cache.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 3ad71d2a61..e232489768 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -49,6 +49,7 @@ use 
crate::cache::write_cache::WriteCacheRef; use crate::memtable::record_batch_estimated_size; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; +use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue}; use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::parquet::reader::MetadataCacheMetrics; @@ -64,6 +65,8 @@ const FILE_TYPE: &str = "file"; const INDEX_TYPE: &str = "index"; /// Metrics type key for selector result cache. const SELECTOR_RESULT_TYPE: &str = "selector_result"; +/// Metrics type key for range scan result cache. +const RANGE_RESULT_TYPE: &str = "range_result"; /// Cache strategies that may only enable a subset of caches. #[derive(Clone)] @@ -223,6 +226,32 @@ impl CacheStrategy { } } + /// Calls [CacheManager::get_range_result()]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn get_range_result( + &self, + key: &RangeScanCacheKey, + ) -> Option> { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } + + /// Calls [CacheManager::put_range_result()]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn put_range_result( + &self, + key: RangeScanCacheKey, + result: Arc, + ) { + if let CacheStrategy::EnableAll(cache_manager) = self { + cache_manager.put_range_result(key, result); + } + } + /// Calls [CacheManager::write_cache()]. /// It returns None if the strategy is [CacheStrategy::Disabled]. pub fn write_cache(&self) -> Option<&WriteCacheRef> { @@ -324,6 +353,9 @@ pub struct CacheManager { puffin_metadata_cache: Option, /// Cache for time series selectors. selector_result_cache: Option, + /// Cache for range scan outputs in flat format. 
+ #[cfg_attr(not(test), allow(dead_code))] + range_result_cache: Option, /// Cache for index result. index_result_cache: Option, } @@ -512,6 +544,32 @@ impl CacheManager { } } + /// Gets cached result for range scan. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn get_range_result( + &self, + key: &RangeScanCacheKey, + ) -> Option> { + self.range_result_cache + .as_ref() + .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE)) + } + + /// Puts range scan result into the cache. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn put_range_result( + &self, + key: RangeScanCacheKey, + result: Arc, + ) { + if let Some(cache) = &self.range_result_cache { + CACHE_BYTES + .with_label_values(&[RANGE_RESULT_TYPE]) + .add(range_result_cache_weight(&key, &result).into()); + cache.insert(key, result); + } + } + /// Gets the write cache. pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { self.write_cache.as_ref() @@ -562,6 +620,7 @@ pub struct CacheManagerBuilder { puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, + range_result_cache_size: u64, } impl CacheManagerBuilder { @@ -625,6 +684,12 @@ impl CacheManagerBuilder { self } + /// Sets range result cache size. + pub fn range_result_cache_size(mut self, bytes: u64) -> Self { + self.range_result_cache_size = bytes; + self + } + /// Builds the [CacheManager]. 
pub fn build(self) -> CacheManager { fn to_str(cause: RemovalCause) -> &'static str { @@ -712,6 +777,21 @@ impl CacheManagerBuilder { }) .build() }); + let range_result_cache = (self.range_result_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.range_result_cache_size) + .weigher(range_result_cache_weight) + .eviction_listener(|k, v, cause| { + let size = range_result_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[RANGE_RESULT_TYPE]) + .sub(size.into()); + CACHE_EVICTION + .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)]) + .inc(); + }) + .build() + }); CacheManager { sst_meta_cache, vector_cache, @@ -723,6 +803,7 @@ impl CacheManagerBuilder { vector_index_cache, puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, + range_result_cache, index_result_cache, } } @@ -746,6 +827,10 @@ fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc) -> u32 { + (k.estimated_size() + v.estimated_size()) as u32 +} + /// Updates cache hit/miss metrics. fn update_hit_miss(value: Option, cache_type: &str) -> Option { if value.is_some() { @@ -902,6 +987,8 @@ type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>; type PageCache = Cache>; /// Maps (file id, row group id, time series row selector) to [SelectorResultValue]. type SelectorResultCache = Cache>; +/// Maps partition-range scan key to cached flat batches. 
+type RangeResultCache = Cache>; #[cfg(test)] mod tests { @@ -916,6 +1003,9 @@ mod tests { use crate::cache::index::bloom_filter_index::Tag; use crate::cache::index::result_cache::PredicateKey; use crate::cache::test_util::parquet_meta; + use crate::read::range_cache::{ + RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder, + }; use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] @@ -1028,6 +1118,50 @@ mod tests { assert!(cache.get_selector_result(&key).is_some()); } + #[test] + fn test_range_result_cache() { + let cache = Arc::new( + CacheManager::builder() + .range_result_cache_size(1024 * 1024) + .build(), + ); + + let key = RangeScanCacheKey { + region_id: RegionId::new(1, 1), + row_groups: vec![(FileId::random(), 0)], + scan: ScanRequestFingerprintBuilder { + read_column_ids: vec![], + read_column_types: vec![], + filters: vec!["tag_0 = 1".to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: crate::region::options::MergeMode::LastRow, + partition_expr_version: 0, + } + .build(), + }; + let value = Arc::new(RangeScanCacheValue::new(Vec::new())); + + assert!(cache.get_range_result(&key).is_none()); + cache.put_range_result(key.clone(), value.clone()); + assert!(cache.get_range_result(&key).is_some()); + + let enable_all = CacheStrategy::EnableAll(cache.clone()); + assert!(enable_all.get_range_result(&key).is_some()); + + let compaction = CacheStrategy::Compaction(cache.clone()); + assert!(compaction.get_range_result(&key).is_none()); + compaction.put_range_result(key.clone(), value.clone()); + assert!(cache.get_range_result(&key).is_some()); + + let disabled = CacheStrategy::Disabled; + assert!(disabled.get_range_result(&key).is_none()); + disabled.put_range_result(key.clone(), value); + assert!(cache.get_range_result(&key).is_some()); + } + #[tokio::test] async fn test_evict_puffin_cache_clears_all_entries() { use std::collections::{BTreeMap, 
HashMap}; diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 5fbd63ce8b..240a99c247 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -27,6 +27,7 @@ pub mod projection; pub(crate) mod prune; pub(crate) mod pruner; pub mod range; +pub(crate) mod range_cache; pub mod scan_region; pub mod scan_util; pub(crate) mod seq_scan; diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs new file mode 100644 index 0000000000..5b90e68bae --- /dev/null +++ b/src/mito2/src/read/range_cache.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for the partition range scan result cache. + +use std::mem; +use std::sync::Arc; + +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::prelude::ConcreteDataType; +use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector}; + +use crate::memtable::record_batch_estimated_size; +use crate::region::options::MergeMode; + +/// Fingerprint of the scan request fields that affect partition range cache reuse. +/// +/// It records a normalized view of the projected columns and filters, plus +/// scan options that can change the returned rows. Schema-dependent metadata +/// and the partition expression version are included so cached results are not +/// reused across incompatible schema or partitioning changes. 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct ScanRequestFingerprint { + /// Projection and filters without the time index and partition exprs. + inner: Arc, + /// Filters with the time index column. + time_filters: Option>>, + series_row_selector: Option, + append_mode: bool, + filter_deleted: bool, + merge_mode: MergeMode, + /// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr. + /// We store the version instead of the whole partition expr or partition expr filters. + partition_expr_version: u64, +} + +#[derive(Debug)] +pub(crate) struct ScanRequestFingerprintBuilder { + pub(crate) read_column_ids: Vec, + pub(crate) read_column_types: Vec>, + pub(crate) filters: Vec, + pub(crate) time_filters: Vec, + pub(crate) series_row_selector: Option, + pub(crate) append_mode: bool, + pub(crate) filter_deleted: bool, + pub(crate) merge_mode: MergeMode, + pub(crate) partition_expr_version: u64, +} + +impl ScanRequestFingerprintBuilder { + pub(crate) fn build(self) -> ScanRequestFingerprint { + let Self { + read_column_ids, + read_column_types, + filters, + time_filters, + series_row_selector, + append_mode, + filter_deleted, + merge_mode, + partition_expr_version, + } = self; + + ScanRequestFingerprint { + inner: Arc::new(SharedScanRequestFingerprint { + read_column_ids, + read_column_types, + filters, + }), + time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)), + series_row_selector, + append_mode, + filter_deleted, + merge_mode, + partition_expr_version, + } + } +} + +/// Non-copiable struct of the fingerprint. +#[derive(Debug, PartialEq, Eq, Hash)] +struct SharedScanRequestFingerprint { + /// Column ids of the projection. + read_column_ids: Vec, + /// Column types of the projection. + /// We keep this to ensure we won't reuse the fingerprint after a schema change. + read_column_types: Vec>, + /// Filters without the time index column and region partition exprs. 
+ filters: Vec, +} + +impl ScanRequestFingerprint { + #[cfg(test)] + pub(crate) fn read_column_ids(&self) -> &[ColumnId] { + &self.inner.read_column_ids + } + + #[cfg(test)] + pub(crate) fn read_column_types(&self) -> &[Option] { + &self.inner.read_column_types + } + + #[cfg(test)] + pub(crate) fn filters(&self) -> &[String] { + &self.inner.filters + } + + #[cfg(test)] + pub(crate) fn time_filters(&self) -> &[String] { + self.time_filters + .as_deref() + .map(Vec::as_slice) + .unwrap_or(&[]) + } + + #[cfg(test)] + pub(crate) fn without_time_filters(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + time_filters: None, + series_row_selector: self.series_row_selector, + append_mode: self.append_mode, + filter_deleted: self.filter_deleted, + merge_mode: self.merge_mode, + partition_expr_version: self.partition_expr_version, + } + } + + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.inner.read_column_ids.capacity() * mem::size_of::() + + self.inner.read_column_types.capacity() * mem::size_of::>() + + self.inner.filters.capacity() * mem::size_of::() + + self + .inner + .filters + .iter() + .map(|filter| filter.capacity()) + .sum::() + + self.time_filters.as_ref().map_or(0, |filters| { + mem::size_of::>() + + filters.capacity() * mem::size_of::() + + filters + .iter() + .map(|filter| filter.capacity()) + .sum::() + }) + } +} + +/// Cache key for range scan outputs. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct RangeScanCacheKey { + pub(crate) region_id: RegionId, + /// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data. + pub(crate) row_groups: Vec<(FileId, i64)>, + pub(crate) scan: ScanRequestFingerprint, +} + +impl RangeScanCacheKey { + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>() + + self.scan.estimated_size() + } +} + +/// Cached result for one range scan. 
+pub(crate) struct RangeScanCacheValue { + pub(crate) batches: Vec, +} + +impl RangeScanCacheValue { + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn new(batches: Vec) -> Self { + Self { batches } + } + + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.batches.capacity() * mem::size_of::() + + self + .batches + .iter() + .map(record_batch_estimated_size) + .sum::() + } +} + +#[cfg(test)] +mod tests { + use store_api::storage::TimeSeriesRowSelector; + + use super::*; + + #[test] + fn normalizes_and_clears_time_filters() { + let normalized = ScanRequestFingerprintBuilder { + read_column_ids: vec![1, 2], + read_column_types: vec![None, None], + filters: vec!["k0 = 'foo'".to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: 0, + } + .build(); + + assert!(normalized.time_filters().is_empty()); + + let fingerprint = ScanRequestFingerprintBuilder { + read_column_ids: vec![1, 2], + read_column_types: vec![None, None], + filters: vec!["k0 = 'foo'".to_string()], + time_filters: vec!["ts >= 1000".to_string()], + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: 7, + } + .build(); + + let reset = fingerprint.without_time_filters(); + + assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids()); + assert_eq!(reset.read_column_types(), fingerprint.read_column_types()); + assert_eq!(reset.filters(), fingerprint.filters()); + assert!(reset.time_filters().is_empty()); + assert_eq!(reset.series_row_selector, fingerprint.series_row_selector); + assert_eq!(reset.append_mode, fingerprint.append_mode); + assert_eq!(reset.filter_deleted, fingerprint.filter_deleted); + assert_eq!(reset.merge_mode, fingerprint.merge_mode); + assert_eq!( + reset.partition_expr_version, + fingerprint.partition_expr_version + ); + } 
+} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5d934afd2d..5cb2d75e25 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -55,6 +55,7 @@ use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch}; use crate::read::projection::ProjectionMapper; use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; +use crate::read::range_cache::ScanRequestFingerprint; use crate::read::seq_scan::SeqScan; use crate::read::series_scan::SeriesScan; use crate::read::stream::ScanBatchStream; @@ -815,7 +816,7 @@ pub struct ScanInput { /// But this read columns might also include non-projected columns needed for filtering. pub(crate) read_column_ids: Vec, /// Time range filter for time index. - time_range: Option, + pub(crate) time_range: Option, /// Predicate to push down. pub(crate) predicate: PredicateGroup, /// Region partition expr applied at read time. @@ -1417,6 +1418,92 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { } } +/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible +/// for partition range caching. 
+#[cfg_attr(not(test), allow(dead_code))] +pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { + let eligible = input.flat_format + && !input.compaction + && !input.files.is_empty() + && matches!(input.cache_strategy, CacheStrategy::EnableAll(_)); + + if !eligible { + return None; + } + + let metadata = input.region_metadata(); + let tag_names: HashSet<&str> = metadata + .column_metadatas + .iter() + .filter(|col| col.semantic_type == SemanticType::Tag) + .map(|col| col.column_schema.name.as_str()) + .collect(); + + let time_index_name = metadata.time_index_column().column_schema.name.clone(); + + let exprs = input + .predicate_group() + .predicate_without_region() + .map(|predicate| predicate.exprs()) + .unwrap_or_default(); + + let mut filters = Vec::new(); + let mut time_filters = Vec::new(); + let mut has_tag_filter = false; + let mut columns = HashSet::new(); + + for expr in exprs { + columns.clear(); + let is_time_only = match expr_to_columns(expr, &mut columns) { + Ok(()) if !columns.is_empty() => { + has_tag_filter |= columns + .iter() + .any(|col| tag_names.contains(col.name.as_str())); + columns.iter().all(|col| col.name == time_index_name) + } + _ => false, + }; + + if is_time_only { + time_filters.push(expr.to_string()); + } else { + filters.push(expr.to_string()); + } + } + + if !has_tag_filter { + // We only cache requests that have tag filters to avoid caching all series. + return None; + } + + // Ensure the filters are sorted for consistent fingerprinting. 
+ filters.sort_unstable(); + time_filters.sort_unstable(); + + Some( + crate::read::range_cache::ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: input + .read_column_ids + .iter() + .map(|id| { + metadata + .column_by_id(*id) + .map(|col| col.column_schema.data_type.clone()) + }) + .collect(), + filters, + time_filters, + series_row_selector: input.series_row_selector, + append_mode: input.append_mode, + filter_deleted: input.filter_deleted, + merge_mode: input.merge_mode, + partition_expr_version: metadata.partition_expr_version, + } + .build(), + ) +} + /// Context shared by different streams from a scanner. /// It contains the input and ranges to scan. pub struct StreamContext { @@ -1763,10 +1850,15 @@ mod tests { use datafusion::physical_plan::expressions::lit as physical_lit; use datafusion_expr::{col, lit}; - use store_api::storage::ScanRequest; + use datatypes::value::Value; + use partition::expr::col as partition_col; + use store_api::metadata::RegionMetadataBuilder; + use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; use super::*; + use crate::cache::CacheManager; use crate::memtable::time_partition::TimePartitions; + use crate::read::range_cache::ScanRequestFingerprintBuilder; use crate::region::options::RegionOptions; use crate::region::version::VersionBuilder; use crate::sst::FormatType; @@ -1804,6 +1896,26 @@ mod tests { ) } + async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec) -> ScanInput { + let env = SchedulerEnv::new().await; + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(); + let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); + let file = FileHandle::new( + crate::sst::file::FileMeta::default(), + Arc::new(crate::sst::file_purger::NoopFilePurger), + ); + + ScanInput::new(env.access_layer.clone(), mapper) + .with_predicate(predicate) + .with_cache(CacheStrategy::EnableAll(Arc::new( + 
CacheManager::builder() + .range_result_cache_size(1024) + .build(), + ))) + .with_flat_format(true) + .with_files(vec![file]) + } + #[tokio::test] async fn test_build_read_column_ids_includes_filters() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); @@ -1923,6 +2035,133 @@ mod tests { assert!(scan_region.use_flat_format()); } + #[tokio::test] + async fn test_build_scan_fingerprint_for_eligible_scan() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let input = new_scan_input( + metadata.clone(), + vec![ + col("ts").gt_eq(lit(1000)), + col("k0").eq(lit("foo")), + col("v0").gt(lit(1)), + ], + ) + .await + .with_distribution(Some(TimeSeriesDistribution::PerSeries)) + .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow)) + .with_merge_mode(MergeMode::LastNonNull) + .with_filter_deleted(false); + + let fingerprint = build_scan_fingerprint(&input).unwrap(); + + let expected = ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: vec![ + metadata + .column_by_id(0) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(2) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(3) + .map(|col| col.column_schema.data_type.clone()), + ], + filters: vec![ + col("k0").eq(lit("foo")).to_string(), + col("v0").gt(lit(1)).to_string(), + ], + time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()], + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + append_mode: false, + filter_deleted: false, + merge_mode: MergeMode::LastNonNull, + partition_expr_version: 0, + } + .build(); + assert_eq!(expected, fingerprint); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_requires_tag_filter() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let input = new_scan_input( + metadata, + vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))], + ) + .await; + + 
assert!(build_scan_fingerprint(&input).is_none()); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_respects_scan_eligibility() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let filters = vec![col("k0").eq(lit("foo"))]; + + let disabled = ScanInput::new( + SchedulerEnv::new().await.access_layer.clone(), + ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(), + ) + .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap()) + .with_flat_format(true); + assert!(build_scan_fingerprint(&disabled).is_none()); + + let non_flat = new_scan_input(metadata.clone(), filters.clone()) + .await + .with_flat_format(false); + assert!(build_scan_fingerprint(&non_flat).is_none()); + + let compaction = new_scan_input(metadata.clone(), filters.clone()) + .await + .with_compaction(true); + assert!(build_scan_fingerprint(&compaction).is_none()); + + // No files to read. + let no_files = new_scan_input(metadata, filters).await.with_files(vec![]); + assert!(build_scan_fingerprint(&no_files).is_none()); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() { + let base = metadata_with_primary_key(vec![0, 1], false); + let mut builder = RegionMetadataBuilder::from_existing(base); + let partition_expr = partition_col("k0") + .gt_eq(Value::String("foo".into())) + .as_json_str() + .unwrap(); + builder.partition_expr_json(Some(partition_expr)); + let metadata = Arc::new(builder.build_without_validation().unwrap()); + + let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await; + let fingerprint = build_scan_fingerprint(&input).unwrap(); + + let expected = ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: vec![ + metadata + .column_by_id(0) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(2) + .map(|col| col.column_schema.data_type.clone()), + metadata + 
.column_by_id(3) + .map(|col| col.column_schema.data_type.clone()), + ], + filters: vec![col("k0").eq(lit("foo")).to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: metadata.partition_expr_version, + } + .build(); + assert_eq!(expected, fingerprint); + assert_ne!(0, metadata.partition_expr_version); + } + #[test] fn test_update_dyn_filters_with_empty_base_predicates() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 0fe0a8f12a..fcf68a9216 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -50,7 +50,7 @@ pub(crate) fn parse_wal_options( } /// Mode to handle duplicate rows while merging. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum MergeMode { From e0aadffb911cece5988bf981a126b2a744337490 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 17 Mar 2026 15:55:48 +0800 Subject: [PATCH 5/7] feat: add flat last row reader to the final stream (#7818) Signed-off-by: evenyag --- src/mito2/src/engine/row_selector_test.rs | 25 +++++++++++---- src/mito2/src/read/last_row.rs | 38 ++++++++++++++++++++++- src/mito2/src/read/seq_scan.rs | 9 +++++- 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs index 317ede5a97..d79152e57f 100644 --- a/src/mito2/src/engine/row_selector_test.rs +++ b/src/mito2/src/engine/row_selector_test.rs @@ -24,7 +24,7 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema, }; -async fn test_last_row(append_mode: bool) { 
+async fn test_last_row(append_mode: bool, flat_format: bool) { let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -39,9 +39,12 @@ async fn test_last_row(append_mode: bool) { env.get_kv_backend(), ) .await; - let request = CreateRequestBuilder::new() - .insert_option("append_mode", &append_mode.to_string()) - .build(); + let mut request_builder = + CreateRequestBuilder::new().insert_option("append_mode", &append_mode.to_string()); + if flat_format { + request_builder = request_builder.insert_option("sst_format", "flat"); + } + let request = request_builder.build(); let column_schemas = rows_schema(&request); engine .handle_request(region_id, RegionRequest::Create(request)) @@ -106,10 +109,20 @@ async fn test_last_row(append_mode: bool) { #[tokio::test] async fn test_last_row_append_mode_disabled() { - test_last_row(false).await; + test_last_row(false, false).await; } #[tokio::test] async fn test_last_row_append_mode_enabled() { - test_last_row(true).await; + test_last_row(true, false).await; +} + +#[tokio::test] +async fn test_last_row_flat_format_append_mode_disabled() { + test_last_row(false, true).await; +} + +#[tokio::test] +async fn test_last_row_flat_format_append_mode_enabled() { + test_last_row(true, true).await; } diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index c2336f218d..0c13c120a0 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -21,6 +21,7 @@ use datatypes::arrow::array::{Array, BinaryArray}; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::vectors::UInt32Vector; +use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; @@ -30,7 +31,7 @@ use crate::cache::{ }; use crate::error::{ComputeArrowSnafu, Result}; use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice; 
-use crate::read::{Batch, BatchReader, BoxedBatchReader}; +use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream}; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index}; use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets}; @@ -610,6 +611,41 @@ impl FlatLastTimestampSelector { } } +/// Reader that keeps only the last row of each time series from a flat RecordBatch stream. +/// Assumes input is sorted, deduped, and contains no delete operations. +pub(crate) struct FlatLastRowReader { + stream: BoxedRecordBatchStream, + selector: FlatLastTimestampSelector, + pending: BatchBuffer, +} + +impl FlatLastRowReader { + /// Creates a new `FlatLastRowReader`. + pub(crate) fn new(stream: BoxedRecordBatchStream) -> Self { + Self { + stream, + selector: FlatLastTimestampSelector::default(), + pending: BatchBuffer::new(), + } + } + + /// Converts the reader into a stream of RecordBatches. + pub(crate) fn into_stream(mut self) -> impl Stream> { + async_stream::try_stream! { + while let Some(batch) = self.stream.try_next().await? { + self.selector.on_next(batch, &mut self.pending)?; + if self.pending.is_full() { + yield self.pending.concat()?; + } + } + self.selector.finish(&mut self.pending)?; + if !self.pending.is_empty() { + yield self.pending.concat()?; + } + } + } +} + /// Gets the primary key bytes at `index` from the primary key dictionary column. 
fn primary_key_bytes_at(batch: &RecordBatch, pk_col_idx: usize, index: usize) -> &[u8] { let pk_dict = batch diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d2be17cc83..a1b3b8f350 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -39,7 +39,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, Un use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeReader; -use crate::read::last_row::LastRowReader; +use crate::read::last_row::{FlatLastRowReader, LastRowReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::range::RangeMeta; @@ -289,6 +289,13 @@ impl SeqScan { Box::pin(reader.into_stream()) as _ }; + let reader = match &stream_ctx.input.series_row_selector { + Some(TimeSeriesRowSelector::LastRow) => { + Box::pin(FlatLastRowReader::new(reader).into_stream()) as _ + } + None => reader, + }; + Ok(reader) } From dc98e0215bd19312f136dfecd5f3d64fc26023b7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 17 Mar 2026 19:28:06 +0800 Subject: [PATCH 6/7] feat(metric-engine): support bulk inserts with put fallback (#7792) * feat(metric-engine): support bulk inserts Implement `RegionRequest::BulkInserts` to support efficient columnar data ingestion in the metric engine. Key changes: - Implement `bulk_insert_region` to handle logical-to-physical region mapping and dispatch writes. - Add `batch_modifier` for `RecordBatch` transformations, specifically for `__tsid` generation and sparse primary key encoding. - Integrate `BulkInserts` into the `MetricEngine` request handling logic. - Provide a row-based fallback mechanism if the underlying storage doesn't support bulk writes. 
Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Update `bulk_insert.rs` to Support Partition Expression Version - **Enhancements**: - Added support for `partition_expr_version` in `RegionBulkInsertsRequest` and `RegionPutRequest`. - Modified the handling of `partition_expr_version` to be dynamically set from the `request` object. Files affected: - `src/metric-engine/src/engine/bulk_insert.rs` Signed-off-by: Lei, HUANG * fix: cargo lock revert Signed-off-by: Lei, HUANG * add doc for conversions Signed-off-by: Lei, HUANG * chore: simplify test Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Refactor `bulk_insert.rs` in `metric-engine` - **Refactor Functionality**: - Replaced `resolve_tag_columns` with `resolve_tag_columns_from_metadata` to streamline tag column resolution. - Moved logic for resolving tag columns directly into `resolve_tag_columns_from_metadata`, removing the need for an external function call. - **Enhancements**: - Improved error handling and context provision for missing physical regions and columns. - Optimized tag column sorting and index management within the batch processing logic. Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Refactor `record_batch_to_rows` Function in `bulk_insert.rs` - Simplified the `record_batch_to_rows` function by removing the `logical_metadata` parameter and directly validating column types within the function. - Enhanced error handling for timestamp, value, and tag columns by checking their data types and providing detailed error messages. - Replaced the use of `Helper::try_into_vector` with direct downcasting to `TimestampMillisecondArray`, `Float64Array`, and `StringArray` for improved type safety and clarity. - Updated the construction of `api::v1::Rows` to directly handle null values and construct `api::v1::Value` objects accordingly. 
Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Refactor `bulk_insert.rs` to optimize state access - Moved the state read operation inside a new block to limit its scope and improve code clarity. - Adjusted logic for processing `tag_columns` and `non_tag_indices` to work within the new block structure. Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Refactor `compute_tsid_array` Function - **Refactored `compute_tsid_array` function**: Modified the function signature to accept `tag_arrays` as a parameter instead of building it internally. This change affects the following files: - `src/metric-engine/src/batch_modifier.rs` - **Updated test cases**: Adjusted test cases to accommodate the new `compute_tsid_array` function signature by passing `tag_arrays` explicitly. Signed-off-by: Lei, HUANG * docs: add doc for bulk_insert_region Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: ### Refactor `bulk_insert.rs` in `metric-engine`: - Removed error handling for unsupported status codes in the `write_data` method. - Eliminated `record_batch_to_rows` function, simplifying the data insertion process. - Streamlined the `write_data` method by removing fallback logic for unsupported operations. Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: - **Optimize Primary Key Construction**: Refactored `modify_batch_sparse` in `batch_modifier.rs` to use `BinaryBuilder` for more efficient primary key construction. - **Add Fallback for Unsupported Bulk Inserts**: Updated `bulk_insert.rs` to handle unsupported bulk inserts by converting record batches to rows and using `RegionPutRequest`. - **Implement Record Batch to Rows Conversion**: Added `record_batch_to_rows` function in `bulk_insert.rs` to convert `RecordBatch` to `api::v1::Rows` for fallback operations.
Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: Add test for handling null values in `record_batch_to_rows` - Added a new test `test_record_batch_to_rows_with_null_values` in `bulk_insert.rs` to verify the handling of null values in the `record_batch_to_rows` function. - The test checks the conversion of a `RecordBatch` with null values in various fields to ensure correct row creation and schema handling. Signed-off-by: Lei, HUANG * feat/metric-engine-bulk-insert: Add fallback path for unsupported status and improve error context handling - **`bulk_insert.rs`**: - Added a fallback path for `PartitionTreeMemtable` in case of unsupported status code. - Enhanced error handling by using `with_context` for better error messages when timestamp and value columns are not found in `RecordBatch`. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- Cargo.lock | 1 + src/metric-engine/Cargo.toml | 1 + src/metric-engine/src/batch_modifier.rs | 426 +++++++++++ src/metric-engine/src/engine.rs | 6 +- src/metric-engine/src/engine/bulk_insert.rs | 783 ++++++++++++++++++++ src/metric-engine/src/engine/put.rs | 2 +- src/metric-engine/src/lib.rs | 1 + 7 files changed, 1216 insertions(+), 4 deletions(-) create mode 100644 src/metric-engine/src/batch_modifier.rs create mode 100644 src/metric-engine/src/engine/bulk_insert.rs diff --git a/Cargo.lock b/Cargo.lock index 1f65f1289c..605b037fc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7886,6 +7886,7 @@ dependencies = [ "common-base", "common-error", "common-function", + "common-grpc", "common-macro", "common-meta", "common-query", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 567210b952..5b561997ab 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -17,6 +17,7 @@ bytes.workspace = true fxhash = "0.2" common-base.workspace = true common-error.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-query.workspace = true 
common-recordbatch.workspace = true diff --git a/src/metric-engine/src/batch_modifier.rs b/src/metric-engine/src/batch_modifier.rs new file mode 100644 index 0000000000..8a5774889b --- /dev/null +++ b/src/metric-engine/src/batch_modifier.rs @@ -0,0 +1,426 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::Hasher; +use std::sync::Arc; + +use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array}; +use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::value::ValueRef; +use fxhash::FxHasher; +use mito_codec::row_converter::SparsePrimaryKeyCodec; +use snafu::ResultExt; +use store_api::storage::ColumnId; +use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId}; + +use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu}; + +/// Info about a tag column for TSID computation and sparse primary key encoding. +#[allow(dead_code)] +pub(crate) struct TagColumnInfo { + /// Column name (used for label-name hash). + pub name: String, + /// Column index in the RecordBatch. + pub index: usize, + /// Column ID in the physical region. + pub column_id: ColumnId, +} + +/// Computes `__tsid` values for each row. 
+#[allow(dead_code)] +pub(crate) fn compute_tsid_array( + batch: &RecordBatch, + sorted_tag_columns: &[TagColumnInfo], + tag_arrays: &[&StringArray], +) -> UInt64Array { + let num_rows = batch.num_rows(); + + let label_name_hash = { + let mut hasher = FxHasher::default(); + for tag_col in sorted_tag_columns { + hasher.write(tag_col.name.as_bytes()); + hasher.write_u8(0xff); + } + hasher.finish() + }; + + let mut tsid_values = Vec::with_capacity(num_rows); + for row in 0..num_rows { + let has_null = tag_arrays.iter().any(|arr| arr.is_null(row)); + + let tsid = if !has_null { + let mut hasher = FxHasher::default(); + hasher.write_u64(label_name_hash); + for arr in tag_arrays { + hasher.write(arr.value(row).as_bytes()); + hasher.write_u8(0xff); + } + hasher.finish() + } else { + let mut name_hasher = FxHasher::default(); + for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) { + if !arr.is_null(row) { + name_hasher.write(tc.name.as_bytes()); + name_hasher.write_u8(0xff); + } + } + let row_label_hash = name_hasher.finish(); + + let mut val_hasher = FxHasher::default(); + val_hasher.write_u64(row_label_hash); + for arr in tag_arrays { + if !arr.is_null(row) { + val_hasher.write(arr.value(row).as_bytes()); + val_hasher.write_u8(0xff); + } + } + val_hasher.finish() + }; + + tsid_values.push(tsid); + } + + UInt64Array::from(tsid_values) +} + +fn build_tag_arrays<'a>( + batch: &'a RecordBatch, + sorted_tag_columns: &[TagColumnInfo], +) -> Vec<&'a StringArray> { + sorted_tag_columns + .iter() + .map(|tc| { + batch + .column(tc.index) + .as_any() + .downcast_ref::() + .expect("tag column must be utf8") + }) + .collect() +} + +/// Modifies a RecordBatch for sparse primary key encoding. 
+#[allow(dead_code)] +pub(crate) fn modify_batch_sparse( + batch: RecordBatch, + table_id: u32, + sorted_tag_columns: &[TagColumnInfo], + non_tag_column_indices: &[usize], +) -> Result { + let num_rows = batch.num_rows(); + let codec = SparsePrimaryKeyCodec::schemaless(); + let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns); + let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays); + + let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0); + let mut buffer = Vec::new(); + for row in 0..num_rows { + buffer.clear(); + let internal = [ + (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)), + ( + ReservedColumnId::tsid(), + ValueRef::UInt64(tsid_array.value(row)), + ), + ]; + codec + .encode_to_vec(internal.into_iter(), &mut buffer) + .context(EncodePrimaryKeySnafu)?; + + let tags = sorted_tag_columns + .iter() + .zip(tag_arrays.iter()) + .filter(|(_, arr)| !arr.is_null(row)) + .map(|(tc, arr)| (tc.column_id, ValueRef::String(arr.value(row)))); + codec + .encode_to_vec(tags, &mut buffer) + .context(EncodePrimaryKeySnafu)?; + + pk_builder.append_value(&buffer); + } + + let pk_array = pk_builder.finish(); + + let mut fields = vec![Arc::new(Field::new( + PRIMARY_KEY_COLUMN_NAME, + DataType::Binary, + false, + ))]; + let mut columns: Vec> = vec![Arc::new(pk_array)]; + + for &idx in non_tag_column_indices { + fields.push(batch.schema().fields()[idx].clone()); + columns.push(batch.column(idx).clone()); + } + + let new_schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(new_schema, columns).map_err(|e| { + UnexpectedRequestSnafu { + reason: format!("Failed to build modified sparse RecordBatch: {e}"), + } + .build() + }) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; + use datatypes::arrow::array::{BinaryArray, Int64Array, StringArray}; + use 
datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datatypes::arrow::record_batch::RecordBatch; + use store_api::codec::PrimaryKeyEncoding; + use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; + + use super::*; + use crate::row_modifier::{RowModifier, RowsIter, TableIdInput}; + + fn build_sparse_test_batch() -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("greptime_timestamp", DataType::Int64, false), + Field::new("greptime_value", DataType::Float64, true), + Field::new("namespace", DataType::Utf8, true), + Field::new("host", DataType::Utf8, true), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(vec![1000])), + Arc::new(datatypes::arrow::array::Float64Array::from(vec![42.0])), + Arc::new(StringArray::from(vec!["greptimedb"])), + Arc::new(StringArray::from(vec!["127.0.0.1"])), + ], + ) + .unwrap() + } + + fn sparse_tag_columns() -> Vec { + vec![ + TagColumnInfo { + name: "host".to_string(), + index: 3, + column_id: 3, + }, + TagColumnInfo { + name: "namespace".to_string(), + index: 2, + column_id: 2, + }, + ] + } + + #[test] + fn test_compute_tsid_basic() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("namespace", DataType::Utf8, true), + Field::new("host", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["greptimedb"])), + Arc::new(StringArray::from(vec!["127.0.0.1"])), + ], + ) + .unwrap(); + + let tag_columns: Vec = vec![ + TagColumnInfo { + name: "host".to_string(), + index: 1, + column_id: 2, + }, + TagColumnInfo { + name: "namespace".to_string(), + index: 0, + column_id: 1, + }, + ]; + let tag_arrays = build_tag_arrays(&batch, &tag_columns); + let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays); + + assert_eq!(tsid_array.value(0), 2721566936019240841); + } + + #[test] + fn test_compute_tsid_with_nulls() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("a", 
DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ])); + let batch_no_null = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["A"])), + Arc::new(StringArray::from(vec!["B"])), + ], + ) + .unwrap(); + let tag_cols_2: Vec = vec![ + TagColumnInfo { + name: "a".to_string(), + index: 0, + column_id: 1, + }, + TagColumnInfo { + name: "b".to_string(), + index: 1, + column_id: 2, + }, + ]; + let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2); + let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2); + + let schema3 = Arc::new(ArrowSchema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Utf8, true), + ])); + let batch_with_null = RecordBatch::try_new( + schema3, + vec![ + Arc::new(StringArray::from(vec!["A"])), + Arc::new(StringArray::from(vec!["B"])), + Arc::new(StringArray::from(vec![None as Option<&str>])), + ], + ) + .unwrap(); + let tag_cols_3: Vec = vec![ + TagColumnInfo { + name: "a".to_string(), + index: 0, + column_id: 1, + }, + TagColumnInfo { + name: "b".to_string(), + index: 1, + column_id: 2, + }, + TagColumnInfo { + name: "c".to_string(), + index: 2, + column_id: 3, + }, + ]; + let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3); + let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3); + + assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0)); + } + + #[test] + fn test_modify_batch_sparse() { + let batch = build_sparse_test_batch(); + let tag_columns = sparse_tag_columns(); + let non_tag_indices = vec![0, 1]; + let table_id: u32 = 1025; + + let modified = + modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); + + assert_eq!(modified.num_columns(), 3); + assert_eq!(modified.schema().field(0).name(), PRIMARY_KEY_COLUMN_NAME); + assert_eq!(modified.schema().field(1).name(), "greptime_timestamp"); + 
assert_eq!(modified.schema().field(2).name(), "greptime_value"); + } + + #[test] + fn test_modify_batch_sparse_matches_row_modifier() { + let batch = build_sparse_test_batch(); + let tag_columns = sparse_tag_columns(); + let non_tag_indices = vec![0, 1]; + let table_id: u32 = 1025; + let modified = + modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); + + let name_to_column_id: HashMap = [ + ("greptime_timestamp".to_string(), 0), + ("greptime_value".to_string(), 1), + ("namespace".to_string(), 2), + ("host".to_string(), 3), + ] + .into_iter() + .collect(); + + let rows = Rows { + schema: vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "namespace".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ], + rows: vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(1000)), + }, + Value { + value_data: Some(ValueData::F64Value(42.0)), + }, + Value { + value_data: Some(ValueData::StringValue("greptimedb".to_string())), + }, + Value { + value_data: Some(ValueData::StringValue("127.0.0.1".to_string())), + }, + ], + }], + }; + + let row_iter = RowsIter::new(rows, &name_to_column_id); + let rows = RowModifier::default() + .modify_rows( + row_iter, + TableIdInput::Single(table_id), + PrimaryKeyEncoding::Sparse, + ) + .unwrap(); + let ValueData::BinaryValue(expected_pk) = + 
rows.rows[0].values[0].value_data.clone().unwrap() + else { + panic!("expected binary primary key"); + }; + + let actual_array = modified + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(actual_array.value(0), expected_pk.as_slice()); + } +} diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 7a1efedac4..ba90ca960d 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -13,6 +13,7 @@ // limitations under the License. mod alter; +mod bulk_insert; mod catchup; mod close; mod create; @@ -288,9 +289,8 @@ impl RegionEngine for MetricEngine { debug_assert_eq!(region_id, resp_region_id); return response; } - RegionRequest::BulkInserts(_) => { - // todo(hl): find a way to support bulk inserts in metric engine. - UnsupportedRegionRequestSnafu { request }.fail() + RegionRequest::BulkInserts(bulk) => { + self.inner.bulk_insert_region(region_id, bulk).await } }; diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs new file mode 100644 index 0000000000..2a3c26c80c --- /dev/null +++ b/src/metric-engine/src/engine/bulk_insert.rs @@ -0,0 +1,783 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashSet; + +use api::v1::{ArrowIpc, ColumnDataType, SemanticType}; +use bytes::Bytes; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_grpc::flight::{FlightEncoder, FlightMessage}; +use common_query::prelude::{greptime_timestamp, greptime_value}; +use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray}; +use datatypes::arrow::record_batch::RecordBatch; +use snafu::{OptionExt, ensure}; +use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::RegionMetadataRef; +use store_api::region_request::{ + AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest, +}; +use store_api::storage::RegionId; + +use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse}; +use crate::engine::MetricEngineInner; +use crate::error; +use crate::error::Result; + +impl MetricEngineInner { + /// Bulk-inserts logical rows into a metric region. + /// + /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical + /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`. + /// + /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts` + /// request to the data region, along with the original `partition_expr_version`. + /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request + /// is transparently retried as a `Put` by converting the original logical batch into + /// `api::v1::Rows`, so callers observe the same semantics as `put_region`. + /// + /// Returns the number of affected rows, or `0` if the input batch is empty. 
+ pub async fn bulk_insert_region( + &self, + region_id: RegionId, + request: RegionBulkInsertsRequest, + ) -> Result { + ensure!( + !self.is_physical_region(region_id), + error::UnsupportedRegionRequestSnafu { + request: RegionRequest::BulkInserts(request), + } + ); + + let (physical_region_id, data_region_id, primary_key_encoding) = + self.find_data_region_meta(region_id)?; + + if primary_key_encoding != PrimaryKeyEncoding::Sparse { + return error::UnsupportedRegionRequestSnafu { + request: RegionRequest::BulkInserts(request), + } + .fail(); + } + + let batch = request.payload; + if batch.num_rows() == 0 { + return Ok(0); + } + + let logical_metadata = self + .logical_region_metadata(physical_region_id, region_id) + .await?; + let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata( + region_id, + data_region_id, + &batch, + &logical_metadata, + )?; + let modified_batch = modify_batch_sparse( + batch.clone(), + region_id.table_id(), + &tag_columns, + &non_tag_indices, + )?; + let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?; + + let partition_expr_version = request.partition_expr_version; + let request = RegionBulkInsertsRequest { + region_id: data_region_id, + payload: modified_batch, + raw_data: ArrowIpc { + schema, + data_header, + payload, + }, + partition_expr_version, + }; + match self + .data_region + .write_data(data_region_id, RegionRequest::BulkInserts(request)) + .await + { + Ok(affected_rows) => Ok(affected_rows), + Err(err) if err.status_code() == StatusCode::Unsupported => { + // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it + let rows = record_batch_to_rows(&batch, region_id)?; + self.put_region( + region_id, + RegionPutRequest { + rows, + hint: None, + partition_expr_version, + }, + ) + .await + } + Err(err) => Err(err), + } + } + + fn resolve_tag_columns_from_metadata( + &self, + logical_region_id: RegionId, + data_region_id: RegionId, + batch: &RecordBatch, + 
logical_metadata: &RegionMetadataRef, + ) -> Result<(Vec, Vec)> { + let tag_names: HashSet<&str> = logical_metadata + .column_metadatas + .iter() + .filter_map(|column| { + if column.semantic_type == SemanticType::Tag { + Some(column.column_schema.name.as_str()) + } else { + None + } + }) + .collect(); + + let mut tag_columns = Vec::new(); + let mut non_tag_indices = Vec::new(); + { + let state = self.state.read().unwrap(); + let physical_columns = state + .physical_region_states() + .get(&data_region_id) + .context(error::PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })? + .physical_columns(); + + for (index, field) in batch.schema().fields().iter().enumerate() { + let name = field.name(); + let column_id = + *physical_columns + .get(name) + .with_context(|| error::ColumnNotFoundSnafu { + name: name.clone(), + region_id: logical_region_id, + })?; + if tag_names.contains(name.as_str()) { + tag_columns.push(TagColumnInfo { + name: name.clone(), + index, + column_id, + }); + } else { + non_tag_indices.push(index); + } + } + } + + tag_columns.sort_by(|a, b| a.name.cmp(&b.name)); + Ok((tag_columns, non_tag_indices)) + } +} + +fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result { + let schema_ref = batch.schema(); + let fields = schema_ref.fields(); + + let mut ts_idx = None; + let mut val_idx = None; + let mut tag_indices = Vec::new(); + + for (idx, field) in fields.iter().enumerate() { + if field.name() == greptime_timestamp() { + ts_idx = Some(idx); + if !matches!( + field.data_type(), + datatypes::arrow::datatypes::DataType::Timestamp( + datatypes::arrow::datatypes::TimeUnit::Millisecond, + _ + ) + ) { + return error::UnexpectedRequestSnafu { + reason: format!( + "Timestamp column '{}' in region {:?} has incompatible type: {:?}", + field.name(), + logical_region_id, + field.data_type() + ), + } + .fail(); + } + } else if field.name() == greptime_value() { + val_idx = Some(idx); + if !matches!( + field.data_type(), + 
datatypes::arrow::datatypes::DataType::Float64 + ) { + return error::UnexpectedRequestSnafu { + reason: format!( + "Value column '{}' in region {:?} has incompatible type: {:?}", + field.name(), + logical_region_id, + field.data_type() + ), + } + .fail(); + } + } else { + if !matches!( + field.data_type(), + datatypes::arrow::datatypes::DataType::Utf8 + ) { + return error::UnexpectedRequestSnafu { + reason: format!( + "Tag column '{}' in region {:?} must be Utf8, found: {:?}", + field.name(), + logical_region_id, + field.data_type() + ), + } + .fail(); + } + tag_indices.push(idx); + } + } + + let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu { + reason: format!( + "Timestamp column '{}' not found in RecordBatch for region {:?}", + greptime_timestamp(), + logical_region_id + ), + })?; + let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu { + reason: format!( + "Value column '{}' not found in RecordBatch for region {:?}", + greptime_value(), + logical_region_id + ), + })?; + + let mut schema = Vec::with_capacity(2 + tag_indices.len()); + schema.push(api::v1::ColumnSchema { + column_name: greptime_timestamp().to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + datatype_extension: None, + options: None, + }); + schema.push(api::v1::ColumnSchema { + column_name: greptime_value().to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + options: None, + }); + for &idx in &tag_indices { + let field = &fields[idx]; + schema.push(api::v1::ColumnSchema { + column_name: field.name().clone(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + datatype_extension: None, + options: None, + }); + } + + let ts_array = batch + .column(ts_idx) + .as_any() + .downcast_ref::() + .expect("validated as TimestampMillisecond"); + let val_array = batch + .column(val_idx) + .as_any() 
+ .downcast_ref::() + .expect("validated as Float64"); + let tag_arrays: Vec<&StringArray> = tag_indices + .iter() + .map(|&idx| { + batch + .column(idx) + .as_any() + .downcast_ref::() + .expect("validated as Utf8") + }) + .collect(); + + let num_rows = batch.num_rows(); + let mut rows = Vec::with_capacity(num_rows); + for row_idx in 0..num_rows { + let mut values = Vec::with_capacity(2 + tag_arrays.len()); + + if ts_array.is_null(row_idx) { + values.push(api::v1::Value { value_data: None }); + } else { + values.push(api::v1::Value { + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + ts_array.value(row_idx), + )), + }); + } + + if val_array.is_null(row_idx) { + values.push(api::v1::Value { value_data: None }); + } else { + values.push(api::v1::Value { + value_data: Some(api::v1::value::ValueData::F64Value( + val_array.value(row_idx), + )), + }); + } + + for arr in &tag_arrays { + if arr.is_null(row_idx) { + values.push(api::v1::Value { value_data: None }); + } else { + values.push(api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue( + arr.value(row_idx).to_string(), + )), + }); + } + } + + rows.push(api::v1::Row { values }); + } + + Ok(api::v1::Rows { schema, rows }) +} + +fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> { + let mut encoder = FlightEncoder::default(); + let schema = encoder.encode_schema(record_batch.schema().as_ref()); + let mut iter = encoder + .encode(FlightMessage::RecordBatch(record_batch.clone())) + .into_iter(); + + let Some(flight_data) = iter.next() else { + return error::UnexpectedRequestSnafu { + reason: "Failed to encode empty flight data", + } + .fail(); + }; + ensure!( + iter.next().is_none(), + error::UnexpectedRequestSnafu { + reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(), + } + ); + + Ok(( + schema.data_header, + flight_data.data_header, + flight_data.data_body, + )) +} + +#[cfg(test)] +mod tests { + use 
std::assert_matches::assert_matches; + use std::sync::Arc; + + use api::v1::ArrowIpc; + use common_error::ext::ErrorExt; + use common_query::prelude::{greptime_timestamp, greptime_value}; + use common_recordbatch::RecordBatches; + use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; + use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + use datatypes::arrow::record_batch::RecordBatch; + use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING; + use store_api::path_utils::table_dir; + use store_api::region_engine::RegionEngine; + use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest}; + use store_api::storage::{RegionId, ScanRequest}; + + use super::record_batch_to_ipc; + use crate::error::Error; + use crate::test_util::{self, TestEnv}; + + fn build_logical_batch(start: usize, rows: usize) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + greptime_timestamp(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new(greptime_value(), DataType::Float64, true), + Field::new("job", DataType::Utf8, true), + ])); + + let mut ts = Vec::with_capacity(rows); + let mut values = Vec::with_capacity(rows); + let mut tags = Vec::with_capacity(rows); + for i in start..start + rows { + ts.push(i as i64); + values.push(i as f64); + tags.push("tag_0".to_string()); + } + + RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(ts)), + Arc::new(Float64Array::from(values)), + Arc::new(StringArray::from(tags)), + ], + ) + .unwrap() + } + + fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest { + let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap(); + RegionRequest::BulkInserts(RegionBulkInsertsRequest { + region_id: logical_region_id, + payload: batch, + raw_data: ArrowIpc { + schema, + data_header, + payload, + }, + 
partition_expr_version: None, + }) + } + + async fn init_dense_metric_region(env: &TestEnv) -> RegionId { + let physical_region_id = env.default_physical_region_id(); + env.create_physical_region( + physical_region_id, + &TestEnv::default_table_dir(), + vec![( + MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(), + "dense".to_string(), + )], + ) + .await; + + let logical_region_id = env.default_logical_region_id(); + let request = test_util::create_logical_region_request( + &["job"], + physical_region_id, + &table_dir("test", logical_region_id.table_id()), + ); + env.metric() + .handle_request(logical_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + logical_region_id + } + + #[tokio::test] + async fn test_bulk_insert_empty_batch_returns_zero() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let logical_region_id = env.default_logical_region_id(); + + let batch = build_logical_batch(0, 0); + let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest { + region_id: logical_region_id, + payload: batch, + raw_data: ArrowIpc::default(), + partition_expr_version: None, + }); + let response = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 0); + } + + #[tokio::test] + async fn test_bulk_insert_physical_region_rejected() { + let env = TestEnv::new().await; + env.init_metric_region().await; + + let physical_region_id = env.default_physical_region_id(); + let batch = build_logical_batch(0, 2); + let request = build_bulk_request(physical_region_id, batch); + + let err = env + .metric() + .handle_request(physical_region_id, request) + .await + .unwrap_err(); + let Some(err) = err.as_any().downcast_ref::() else { + panic!("unexpected error type"); + }; + assert_matches!(err, Error::UnsupportedRegionRequest { .. 
}); + } + + #[tokio::test] + async fn test_bulk_insert_unknown_column_errors() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let logical_region_id = env.default_logical_region_id(); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + greptime_timestamp(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new(greptime_value(), DataType::Float64, true), + Field::new("nonexistent_column", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0i64])), + Arc::new(Float64Array::from(vec![1.0])), + Arc::new(StringArray::from(vec!["val"])), + ], + ) + .unwrap(); + + let request = build_bulk_request(logical_region_id, batch); + let err = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap_err(); + let Some(err) = err.as_any().downcast_ref::() else { + panic!("unexpected error type"); + }; + assert_matches!(err, Error::ColumnNotFound { .. 
}); + } + + #[tokio::test] + async fn test_bulk_insert_multiple_tag_columns() { + let env = TestEnv::new().await; + let physical_region_id = env.default_physical_region_id(); + env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![]) + .await; + let logical_region_id = env.default_logical_region_id(); + let request = test_util::create_logical_region_request( + &["host", "region"], + physical_region_id, + &table_dir("test", logical_region_id.table_id()), + ); + env.metric() + .handle_request(logical_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + greptime_timestamp(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new(greptime_value(), DataType::Float64, true), + Field::new("host", DataType::Utf8, true), + Field::new("region", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])), + Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), + Arc::new(StringArray::from(vec!["h1", "h2", "h1"])), + Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])), + ], + ) + .unwrap(); + + let request = build_bulk_request(logical_region_id, batch); + let response = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 3); + + let stream = env + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 3); + } + + #[tokio::test] + async fn test_bulk_insert_accumulates_rows() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let logical_region_id = env.default_logical_region_id(); + + let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3)); + let response = env + .metric() + 
.handle_request(logical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 3); + + let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5)); + let response = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 5); + + let stream = env + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 8); + } + + #[tokio::test] + async fn test_bulk_insert_sparse_encoding() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let logical_region_id = env.default_logical_region_id(); + + let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4)); + let response = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 4); + + let stream = env + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 4); + } + + #[tokio::test] + async fn test_bulk_insert_dense_encoding_rejected() { + let env = TestEnv::new().await; + let logical_region_id = init_dense_metric_region(&env).await; + + let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2)); + let err = env + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap_err(); + let Some(err) = err.as_any().downcast_ref::() else { + panic!("unexpected error type"); + }; + assert_matches!(err, Error::UnsupportedRegionRequest { .. 
}); + } + + #[tokio::test] + async fn test_bulk_insert_matches_put() { + let env_put = TestEnv::new().await; + env_put.init_metric_region().await; + let logical_region_id = env_put.default_logical_region_id(); + let schema = test_util::row_schema_with_tags(&["job"]); + let rows = test_util::build_rows(1, 5); + env_put + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: api::v1::Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap(); + let put_stream = env_put + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let put_batches = RecordBatches::try_collect(put_stream).await.unwrap(); + let put_output = put_batches.pretty_print().unwrap(); + + let env_bulk = TestEnv::new().await; + env_bulk.init_metric_region().await; + let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5)); + env_bulk + .metric() + .handle_request(logical_region_id, request) + .await + .unwrap(); + let bulk_stream = env_bulk + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap(); + let bulk_output = bulk_batches.pretty_print().unwrap(); + + assert_eq!(put_output, bulk_output); + } + + #[test] + fn test_record_batch_to_rows_with_null_values() { + use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; + use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + use datatypes::arrow::record_batch::RecordBatch; + use store_api::storage::RegionId; + + use crate::engine::bulk_insert::record_batch_to_rows; + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + greptime_timestamp(), + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new(greptime_value(), DataType::Float64, true), + Field::new("job", DataType::Utf8, true), + Field::new("host", 
DataType::Utf8, true), + ])); + + let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]); + let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]); + let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]); + let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(ts_array), + Arc::new(val_array), + Arc::new(job_array), + Arc::new(host_array), + ], + ) + .unwrap(); + + let region_id = RegionId::new(1, 1); + let rows = record_batch_to_rows(&batch, region_id).unwrap(); + + assert_eq!(rows.rows.len(), 3); + assert_eq!(rows.schema.len(), 4); + + // Row 0: all non-null except host + assert!(rows.rows[0].values[0].value_data.is_some()); + assert!(rows.rows[0].values[1].value_data.is_some()); + assert!(rows.rows[0].values[2].value_data.is_some()); + assert!(rows.rows[0].values[3].value_data.is_none()); + + // Row 1: null timestamp, null job + assert!(rows.rows[1].values[0].value_data.is_none()); + assert!(rows.rows[1].values[1].value_data.is_some()); + assert!(rows.rows[1].values[2].value_data.is_none()); + assert!(rows.rows[1].values[3].value_data.is_some()); + + // Row 2: null value + assert!(rows.rows[2].values[0].value_data.is_some()); + assert!(rows.rows[2].values[1].value_data.is_none()); + assert!(rows.rows[2].values[2].value_data.is_some()); + assert!(rows.rows[2].values[3].value_data.is_some()); + } +} diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 9251605aea..edae0d2bb4 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -460,7 +460,7 @@ impl MetricEngineInner { .await } - fn find_data_region_meta( + pub(crate) fn find_data_region_meta( &self, logical_region_id: RegionId, ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> { diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 30daa80b91..b93029f2f4 
100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -52,6 +52,7 @@ #![feature(assert_matches)] +mod batch_modifier; pub mod config; mod data_region; pub mod engine; From cc441b564238562b25767be31c5d93d86c3fdc00 Mon Sep 17 00:00:00 2001 From: ZonaHe Date: Wed, 18 Mar 2026 02:25:14 +0800 Subject: [PATCH 7/7] feat: update dashboard to v0.12.0 (#7823) Co-authored-by: sunchanglong --- src/servers/dashboard/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/servers/dashboard/VERSION b/src/servers/dashboard/VERSION index 03ee1a5314..87a1cf595a 100644 --- a/src/servers/dashboard/VERSION +++ b/src/servers/dashboard/VERSION @@ -1 +1 @@ -v0.11.13 +v0.12.0