diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 55bb41ee51..115636821b 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -178,3 +178,7 @@ harness = false [[bench]] name = "loki_labels" harness = false + +[[bench]] +name = "flush_batch_physical" +harness = false diff --git a/src/servers/benches/flush_batch_physical.rs b/src/servers/benches/flush_batch_physical.rs new file mode 100644 index 0000000000..a3d190adf2 --- /dev/null +++ b/src/servers/benches/flush_batch_physical.rs @@ -0,0 +1,289 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::region::RegionResponse; +use api::v1::meta::Peer; +use api::v1::region::RegionRequest; +use arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; +use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use catalog::error::Result as CatalogResult; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema}; +use partition::error::Result as PartitionResult; +use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask}; +use servers::error::{self, Result}; +use servers::pending_rows_batcher::{ + PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester, PhysicalFlushPartitionProvider, + PhysicalTableMetadata, TableBatch, flush_batch_physical, +}; +use store_api::storage::RegionId; +use table::test_util::table_info::test_table_info; +use tokio::runtime::Runtime; + +// --------------------------------------------------------------------------- +// Mock implementations (memory-backed, no I/O) +// --------------------------------------------------------------------------- + +struct BenchCatalogProvider { + table: PhysicalTableMetadata, +} + +#[async_trait] +impl PhysicalFlushCatalogProvider for BenchCatalogProvider { + async fn physical_table( + &self, + _catalog: &str, + _schema: &str, + _table_name: &str, + _query_ctx: &session::context::QueryContext, + ) -> CatalogResult> { + Ok(Some(self.table.clone())) + } +} + +struct BenchPartitionProvider; + +struct SingleRegionPartitionRule; + +impl PartitionRule for SingleRegionPartitionRule { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn partition_columns(&self) -> &[String] { + &[] + } + + fn find_region( + &self, + _values: &[datatypes::prelude::Value], + ) -> PartitionResult { + Ok(1) + } + + fn split_record_batch( + &self, + record_batch: &RecordBatch, + ) -> PartitionResult> { + let n = record_batch.num_rows(); + Ok(HashMap::from([( + 1, + RegionMask::new(arrow::array::BooleanArray::from(vec![true; n]), n), + )])) + } +} + +#[async_trait] +impl PhysicalFlushPartitionProvider for BenchPartitionProvider { + async fn find_table_partition_rule( + &self, + _table_info: &table::metadata::TableInfo, + ) -> PartitionResult { + Ok(Arc::new(SingleRegionPartitionRule)) + } + + async fn find_region_leader(&self, _region_id: RegionId) -> Result { + Ok(Peer { + id: 1, + addr: "bench-node".to_string(), + }) + } +} + +struct BenchNodeRequester; + +#[async_trait] +impl PhysicalFlushNodeRequester for BenchNodeRequester { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> error::Result { + Ok(RegionResponse::new(0)) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn make_physical_table_metadata(num_tags: usize) -> PhysicalTableMetadata { + let mut columns = vec![ + DtColumnSchema::new("__primary_key", ConcreteDataType::binary_datatype(), false), + DtColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + DtColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true), + ]; + + let mut name_to_ids = HashMap::new(); + let mut column_ids = vec![0u32, 1, 2]; + + for i in 0..num_tags { + let tag_name = format!("tag{}", i); + let col_id = (i + 3) as u32; + columns.push(DtColumnSchema::new( + &tag_name, + ConcreteDataType::string_datatype(), + true, + )); + name_to_ids.insert(tag_name, col_id); + column_ids.push(col_id); + } + + let schema = Arc::new(DtSchema::try_new(columns).unwrap()); + let mut table_info = test_table_info(1, "phy", "public", "greptime", schema); + table_info.meta.column_ids = column_ids; + + PhysicalTableMetadata { + table_info: Arc::new(table_info), + col_name_to_ids: Some(name_to_ids), + } +} + +fn make_tag_batch(tag_names: &[&str], num_rows: usize) -> RecordBatch { + let mut fields = vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + ]; + for tag in tag_names { + fields.push(Field::new(*tag, ArrowDataType::Utf8, true)); + } + + let schema = Arc::new(ArrowSchema::new(fields)); + + let ts: Vec = (0..num_rows as i64).collect(); + let vals: Vec = (0..num_rows).map(|i| i as f64).collect(); + + let mut arrays: Vec> = vec![ + Arc::new(TimestampMillisecondArray::from(ts)), + Arc::new(Float64Array::from(vals)), + ]; + + for (tag_idx, _tag) in tag_names.iter().enumerate() { + let values: Vec = (0..num_rows) + .map(|i| format!("val-{}-{}", tag_idx, i)) + .collect(); + arrays.push(Arc::new(StringArray::from(values))); + } + + RecordBatch::try_new(schema, arrays).unwrap() +} + +fn make_table_batches( + num_logical_tables: usize, + rows_per_table: usize, + tag_names: &[&str], +) -> Vec { + (0..num_logical_tables) + .map(|i| { + let batch = make_tag_batch(tag_names, rows_per_table); + let row_count = batch.num_rows(); + TableBatch { + table_name: format!("logical_{}", i), + table_id: (100 + i) as u32, + batches: vec![batch], + row_count, + } + }) + .collect() +} + +// --------------------------------------------------------------------------- +// Benchmarks +// --------------------------------------------------------------------------- + +fn bench_flush_batch_physical(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let ctx = session::context::QueryContext::arc(); + + let num_tags = 5; + let tag_names: Vec = (0..num_tags).map(|i| format!("tag{}", i)).collect(); + let tag_refs: Vec<&str> = tag_names.iter().map(|s| s.as_str()).collect(); + + let catalog_provider = BenchCatalogProvider { + table: make_physical_table_metadata(num_tags), + }; + let partition_provider = BenchPartitionProvider; + let node_requester = BenchNodeRequester; + + let mut group = c.benchmark_group("flush_batch_physical"); + + // Vary the number of logical tables + for num_tables in [1, 10, 50, 100] { + let rows_per_table = 100; + let table_batches = make_table_batches(num_tables, rows_per_table, &tag_refs); + + group.bench_with_input( + BenchmarkId::new("tables", num_tables), + &table_batches, + |b, batches| { + b.iter(|| { + rt.block_on(async { + flush_batch_physical( + batches, + "phy", + &ctx, + &partition_provider, + &node_requester, + &catalog_provider, + ) + .await + .unwrap(); + }); + }); + }, + ); + } + + // Vary the number of rows per table + for rows_per_table in [10, 100, 1000, 5000] { + let num_tables = 10; + let table_batches = make_table_batches(num_tables, rows_per_table, &tag_refs); + + group.bench_with_input( + BenchmarkId::new("rows_per_table", rows_per_table), + &table_batches, + |b, batches| { + b.iter(|| { + rt.block_on(async { + flush_batch_physical( + batches, + "phy", + &ctx, + &partition_provider, + &node_requester, + &catalog_provider, + ) + .await + .unwrap(); + }); + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_flush_batch_physical); +criterion_main!(benches); diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 682288b271..8a3c554058 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use std::net::SocketAddr; use std::string::FromUtf8Error; +use std::sync::Arc; use axum::http::StatusCode as HttpStatusCode; use axum::response::{IntoResponse, Response}; @@ -51,6 +52,8 @@ pub enum Error { Arrow { #[snafu(source)] error: arrow_schema::ArrowError, + #[snafu(implicit)] + location: Location, }, #[snafu(display("Internal error: {}", err_msg))] @@ -685,6 +688,23 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(transparent)] + Partition { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(transparent)] + MetricEngine { + source: metric_engine::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to submit batch: {}", source))] + SubmitBatch { source: Arc }, } pub type Result = std::result::Result; @@ -818,6 +838,9 @@ impl ErrorExt for Error { MemoryLimitExceeded { .. } => StatusCode::RateLimited, GreptimeProto { source, .. } => source.status_code(), + Partition { source, .. } => source.status_code(), + MetricEngine { source, .. } => source.status_code(), + SubmitBatch { source, .. } => source.status_code(), } } diff --git a/src/servers/src/pending_rows_batcher.rs b/src/servers/src/pending_rows_batcher.rs index b6e07d2a81..4cd8331636 100644 --- a/src/servers/src/pending_rows_batcher.rs +++ b/src/servers/src/pending_rows_batcher.rs @@ -31,15 +31,17 @@ use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_meta::node_manager::NodeManagerRef; use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value}; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{debug, error, warn}; +use common_telemetry::{debug, warn}; use dashmap::DashMap; use dashmap::mapref::entry::Entry; use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse}; use partition::manager::PartitionRuleManagerRef; +use partition::partition::PartitionRuleRef; use session::context::QueryContextRef; use smallvec::SmallVec; -use snafu::{OptionExt, ensure}; +use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::{RegionId, TableId}; +use table::metadata::{TableInfo, TableInfoRef}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot}; use crate::error; @@ -86,6 +88,116 @@ pub trait PendingRowsSchemaAlterer: Send + Sync { pub type PendingRowsSchemaAltererRef = Arc; +#[derive(Clone)] +pub struct PhysicalTableMetadata { + pub table_info: TableInfoRef, + /// Mapping from column name to column id + pub col_name_to_ids: Option>, +} + +#[async_trait] +pub trait PhysicalFlushCatalogProvider: Send + Sync { + async fn physical_table( + &self, + catalog: &str, + schema: &str, + table_name: &str, + query_ctx: &session::context::QueryContext, + ) -> catalog::error::Result>; +} + +#[async_trait] +pub trait PhysicalFlushPartitionProvider: Send + Sync { + async fn find_table_partition_rule( + &self, + table_info: &TableInfo, + ) -> partition::error::Result; + + async fn find_region_leader(&self, region_id: RegionId) -> Result; +} + +#[async_trait] +pub trait PhysicalFlushNodeRequester: Send + Sync { + async fn handle( + &self, + peer: &Peer, + request: RegionRequest, + ) -> Result; +} + +#[derive(Clone)] +struct CatalogManagerPhysicalFlushAdapter { + catalog_manager: CatalogManagerRef, +} + +#[async_trait] +impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter { + async fn physical_table( + &self, + catalog: &str, + schema: &str, + table_name: &str, + query_ctx: &session::context::QueryContext, + ) -> catalog::error::Result> { + self.catalog_manager + .table(catalog, schema, table_name, Some(query_ctx)) + .await + .map(|table| { + table.map(|table| { + let table_info = table.table_info(); + let name_to_ids = table_info.name_to_ids(); + PhysicalTableMetadata { + table_info, + col_name_to_ids: name_to_ids, + } + }) + }) + } +} + +#[derive(Clone)] +struct PartitionManagerPhysicalFlushAdapter { + partition_manager: PartitionRuleManagerRef, +} + +#[async_trait] +impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter { + async fn find_table_partition_rule( + &self, + table_info: &TableInfo, + ) -> partition::error::Result { + self.partition_manager + .find_table_partition_rule(table_info) + .await + .map(|(rule, _)| rule) + } + + async fn find_region_leader(&self, region_id: RegionId) -> Result { + let peer = self.partition_manager.find_region_leader(region_id).await?; + Ok(peer) + } +} + +#[derive(Clone)] +struct NodeManagerPhysicalFlushAdapter { + node_manager: NodeManagerRef, +} + +#[async_trait] +impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter { + async fn handle( + &self, + peer: &Peer, + request: RegionRequest, + ) -> error::Result { + let datanode = self.node_manager.datanode(peer).await; + datanode + .handle(request) + .await + .context(error::CommonMetaSnafu) + } +} + #[derive(Debug, Clone, Hash, Eq, PartialEq)] struct BatchKey { catalog: String, @@ -94,11 +206,11 @@ struct BatchKey { } #[derive(Debug)] -struct TableBatch { - table_name: String, - table_id: TableId, - batches: Vec, - row_count: usize, +pub struct TableBatch { + pub table_name: String, + pub table_id: TableId, + pub batches: Vec, + pub row_count: usize, } /// Intermediate planning state for resolving and preparing logical tables @@ -114,14 +226,14 @@ struct TableResolutionPlan { struct PendingBatch { tables: HashMap, - created_at: Option, + created_at: Instant, total_row_count: usize, - ctx: Option, + ctx: QueryContextRef, waiters: Vec, } struct FlushWaiter { - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>>, _permit: OwnedSemaphorePermit, } @@ -142,7 +254,7 @@ enum WorkerCommand { table_batches: Vec<(String, u32, RecordBatch)>, total_rows: usize, ctx: QueryContextRef, - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>>, _permit: OwnedSemaphorePermit, }, } @@ -301,7 +413,9 @@ impl PendingRowsBatcher { .await .map_err(|_| error::BatcherChannelClosedSnafu.build())? }; - result.map(|()| total_rows as u64) + result + .context(error::SubmitBatchSnafu) + .map(|()| total_rows as u64) } else { Ok(total_rows as u64) } @@ -706,12 +820,12 @@ impl Drop for PendingRowsBatcher { } impl PendingBatch { - fn new() -> Self { + fn new(ctx: QueryContextRef) -> Self { Self { tables: HashMap::new(), - created_at: None, + created_at: Instant::now(), total_row_count: 0, - ctx: None, + ctx, waiters: Vec::new(), } } @@ -733,7 +847,7 @@ fn start_worker( flush_semaphore: Arc, ) { tokio::spawn(async move { - let mut batch = PendingBatch::new(); + let mut batch = None; let mut interval = tokio::time::interval(flush_interval); let mut shutdown_rx = shutdown.subscribe(); let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout; @@ -747,16 +861,15 @@ fn start_worker( Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => { idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout); - if batch.total_row_count == 0 { - batch.created_at = Some(Instant::now()); - batch.ctx = Some(ctx); + let pending_batch = batch.get_or_insert_with(||{ PENDING_BATCHES.inc(); - } + PendingBatch::new(ctx) + }); - batch.waiters.push(FlushWaiter { response_tx, _permit }); + pending_batch.waiters.push(FlushWaiter { response_tx, _permit }); for (table_name, table_id, record_batch) in table_batches { - let entry = batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch { + let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch { table_name, table_id, batches: Vec::new(), @@ -766,10 +879,10 @@ fn start_worker( entry.batches.push(record_batch); } - batch.total_row_count += total_rows; + pending_batch.total_row_count += total_rows; PENDING_ROWS.add(total_rows as i64); - if batch.total_row_count >= max_batch_rows + if pending_batch.total_row_count >= max_batch_rows && let Some(flush) = drain_batch(&mut batch) { spawn_flush( flush, @@ -794,7 +907,10 @@ fn start_worker( } } _ = &mut idle_timer => { - if !should_close_worker_on_idle_timeout(batch.total_row_count, rx.len()) { + if !should_close_worker_on_idle_timeout( + batch.as_ref().map_or(0, |batch| batch.total_row_count), + rx.len(), + ) { idle_timer .as_mut() .reset(tokio::time::Instant::now() + worker_idle_timeout); @@ -810,9 +926,9 @@ fn start_worker( break; } _ = interval.tick() => { - if let Some(created_at) = batch.created_at - && batch.total_row_count > 0 - && created_at.elapsed() >= flush_interval + if batch + .as_ref() + .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval) && let Some(flush) = drain_batch(&mut batch) { spawn_flush( flush, @@ -862,24 +978,16 @@ fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: total_row_count == 0 && queued_requests == 0 } -fn drain_batch(batch: &mut PendingBatch) -> Option { - if batch.total_row_count == 0 { +fn drain_batch(batch: &mut Option) -> Option { + let batch = batch.take()?; + let total_row_count = batch.total_row_count; + + if total_row_count == 0 { return None; } - let ctx = match batch.ctx.take() { - Some(ctx) => ctx, - None => { - flush_with_error(batch, "Pending batch missing context"); - return None; - } - }; - - let total_row_count = batch.total_row_count; - let table_batches = std::mem::take(&mut batch.tables).into_values().collect(); - let waiters = std::mem::take(&mut batch.waiters); - batch.total_row_count = 0; - batch.created_at = None; + let table_batches = batch.tables.into_values().collect(); + let waiters = batch.waiters; PENDING_ROWS.sub(total_row_count as i64); PENDING_BATCHES.dec(); @@ -887,7 +995,7 @@ fn drain_batch(batch: &mut PendingBatch) -> Option { Some(FlushBatch { table_batches, total_row_count, - ctx, + ctx: batch.ctx, waiters, }) } @@ -914,15 +1022,25 @@ async fn spawn_flush( } struct FlushRegionWrite { - region_id: RegionId, - row_count: usize, datanode: Peer, request: RegionRequest, } -enum FlushWriteResult { - Success { row_count: usize }, - Failed { row_count: usize, message: String }, +struct PlannedRegionBatch { + region_id: RegionId, + batch: RecordBatch, +} + +#[cfg(test)] +impl PlannedRegionBatch { + fn num_rows(&self) -> usize { + self.batch.num_rows() + } +} + +struct ResolvedRegionBatch { + planned: PlannedRegionBatch, + datanode: Peer, } fn should_dispatch_concurrently(region_write_count: usize) -> bool { @@ -1045,65 +1163,35 @@ fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result } ); let essential_indices: Vec = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect(); - batch - .project(&essential_indices) - .map_err(|err| Error::Internal { - err_msg: format!("Failed to project essential columns from RecordBatch: {err}"), - }) + batch.project(&essential_indices).context(error::ArrowSnafu) } async fn flush_region_writes_concurrently( - node_manager: NodeManagerRef, + node_manager: &(impl PhysicalFlushNodeRequester + ?Sized), writes: Vec, -) -> Vec { +) -> Result<()> { if !should_dispatch_concurrently(writes.len()) { - let mut results = Vec::with_capacity(writes.len()); for write in writes { - let datanode = node_manager.datanode(&write.datanode).await; let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_write_region"]) .start_timer(); - match datanode.handle(write.request).await { - Ok(_) => results.push(FlushWriteResult::Success { - row_count: write.row_count, - }), - Err(err) => results.push(FlushWriteResult::Failed { - row_count: write.row_count, - message: format!( - "Bulk insert flush failed for region {}: {:?}", - write.region_id, err - ), - }), - } + node_manager.handle(&write.datanode, write.request).await?; } - return results; + return Ok(()); } - let write_futures = writes.into_iter().map(|write| { - let node_manager = node_manager.clone(); - async move { - let datanode = node_manager.datanode(&write.datanode).await; - let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED - .with_label_values(&["flush_write_region"]) - .start_timer(); + let write_futures = writes.into_iter().map(|write| async move { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_write_region"]) + .start_timer(); - match datanode.handle(write.request).await { - Ok(_) => FlushWriteResult::Success { - row_count: write.row_count, - }, - Err(err) => FlushWriteResult::Failed { - row_count: write.row_count, - message: format!( - "Bulk insert flush failed for region {}: {:?}", - write.region_id, err - ), - }, - } - } + node_manager.handle(&write.datanode, write.request).await?; + Ok::<_, Error>(()) }); // todo(hl): should be bounded. - futures::future::join_all(write_futures).await + futures::future::try_join_all(write_futures).await?; + Ok(()) } async fn flush_batch( @@ -1119,7 +1207,6 @@ async fn flush_batch( waiters, } = flush; let start = Instant::now(); - let mut first_error: Option = None; // Physical-table-level flush: transform all logical table batches // into physical format and write them together. @@ -1127,169 +1214,148 @@ async fn flush_batch( .extension(PHYSICAL_TABLE_KEY) .unwrap_or(GREPTIME_PHYSICAL_TABLE) .to_string(); - flush_batch_physical( + let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager }; + let node_requester = NodeManagerPhysicalFlushAdapter { node_manager }; + let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager }; + let result = flush_batch_physical( &table_batches, - total_row_count, &physical_table_name, &ctx, - &partition_manager, - &node_manager, - &catalog_manager, - &mut first_error, + &partition_provider, + &node_requester, + &catalog_provider, ) .await; let elapsed = start.elapsed().as_secs_f64(); FLUSH_ELAPSED.observe(elapsed); + + if result.is_err() { + FLUSH_FAILURES.inc(); + FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64); + } else { + FLUSH_TOTAL.inc(); + FLUSH_ROWS.observe(total_row_count as f64); + } + debug!( "Pending rows batch flushed, total rows: {}, elapsed time: {}s", total_row_count, elapsed ); - notify_waiters(waiters, &first_error); + notify_waiters(waiters, result); } -/// Attempts to flush all table batches by transforming them into the physical -/// table format (sparse primary key encoding) and writing directly to the -/// physical data regions. +/// Flushes a batch of logical table rows by transforming them into the physical table format +/// and writing them to the appropriate datanode regions. /// -/// This is the only flush path. Any failure in resolving or transforming the -/// physical flush inputs is recorded as flush failure and reported to waiters. -#[allow(clippy::too_many_arguments)] -async fn flush_batch_physical( +/// This function performs the end-to-end physical flush pipeline: +/// 1. Resolves the physical table metadata and column ID mapping. +/// 2. Fetches the physical table's partition rule. +/// 3. Transforms each logical table batch into the physical (sparse primary key) format. +/// 4. Concatenates all transformed batches into a single combined batch. +/// 5. Splits the combined batch by partition rule and sends region write requests +/// concurrently to the target datanodes. +pub async fn flush_batch_physical( table_batches: &[TableBatch], - total_row_count: usize, physical_table_name: &str, ctx: &QueryContextRef, - partition_manager: &PartitionRuleManagerRef, - node_manager: &NodeManagerRef, - catalog_manager: &CatalogManagerRef, - first_error: &mut Option, -) { - macro_rules! record_failure { - ($row_count:expr, $msg:expr) => {{ - let msg = $msg; - if first_error.is_none() { - *first_error = Some(msg.clone()); - } - mark_flush_failure($row_count, &msg); - }}; - } - + partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized), + node_manager: &(impl PhysicalFlushNodeRequester + ?Sized), + catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized), +) -> Result<()> { // 1. Resolve the physical table and get column ID mapping let physical_table = { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_physical_resolve_table"]) .start_timer(); - match catalog_manager - .table( + catalog_manager + .physical_table( ctx.current_catalog(), &ctx.current_schema(), physical_table_name, - Some(ctx.as_ref()), + ctx.as_ref(), ) - .await - { - Ok(Some(table)) => table, - Ok(None) => { - record_failure!( - total_row_count, - format!( - "Physical table '{}' not found during pending flush", - physical_table_name - ) - ); - return; - } - Err(err) => { - record_failure!( - total_row_count, - format!( - "Failed to resolve physical table '{}' for pending flush: {:?}", - physical_table_name, err - ) - ); - return; - } - } + .await? + .with_context(|| error::InternalSnafu { + err_msg: format!( + "Physical table '{}' not found during pending flush", + physical_table_name + ), + })? }; - let physical_table_info = physical_table.table_info(); - let name_to_ids = match physical_table_info.name_to_ids() { - Some(ids) => ids, - None => { - record_failure!( - total_row_count, - format!( - "Physical table '{}' has no column IDs for pending flush", - physical_table_name - ) - ); - return; - } - }; + let physical_table_info = physical_table.table_info; + let name_to_ids = physical_table + .col_name_to_ids + .with_context(|| error::InternalSnafu { + err_msg: format!( + "Physical table '{}' has no column IDs for pending flush", + physical_table_name + ), + })?; // 2. Get the physical table's partition rule (one lookup instead of N) let partition_rule = { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_physical_fetch_partition_rule"]) .start_timer(); - match partition_manager - .find_table_partition_rule(&physical_table_info) - .await - { - Ok(rule) => rule, - Err(err) => { - record_failure!( - total_row_count, - format!( - "Failed to fetch partition rule for physical table '{}': {:?}", - physical_table_name, err - ) - ); - return; - } - } + partition_manager + .find_table_partition_rule(physical_table_info.as_ref()) + .await? }; - let partition_columns = partition_rule.0.partition_columns(); + let partition_columns = partition_rule.partition_columns(); let partition_columns_set: HashSet<&str> = partition_columns.iter().map(String::as_str).collect(); // 3. Transform each logical table batch into physical format - let mut modified_batches: Vec = Vec::with_capacity(table_batches.len()); - let mut modified_row_count: usize = 0; + let modified_batches = + transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?; + + // 4. Concatenate all modified batches (all share the same physical schema) + let combined_batch = concat_modified_batches(&modified_batches)?; + + // 5. Split by physical partition rule and send to regions + let physical_table_id = physical_table_info.table_id(); + let planned_batches = plan_region_batches( + combined_batch, + physical_table_id, + partition_rule.as_ref(), + partition_columns, + )?; + + let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?; + let region_writes = encode_region_write_requests(resolved_batches)?; + flush_region_writes_concurrently(node_manager, region_writes).await +} + +/// Transforms logical table batches into physical format (sparse primary key encoding). +/// +/// It identifies tag columns and essential columns (timestamp, value) for each logical batch +/// and applies sparse primary key modification. +fn transform_logical_batches_to_physical( + table_batches: &[TableBatch], + name_to_ids: &HashMap, + partition_columns_set: &HashSet<&str>, +) -> Result> { + let mut modified_batches: Vec = + Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum()); let mut modify_elapsed = Duration::ZERO; let mut columns_taxonomy_elapsed = Duration::ZERO; - 'next_table: for table_batch in table_batches { + for table_batch in table_batches { let table_id = table_batch.table_id; - // Transform each chunk to physical format directly, avoiding an - // intermediate concat_batches per logical table. for batch in &table_batch.batches { - // Identify tag columns and non-tag columns from the logical batch schema. - // Chunks within a table_batch may have different schemas if new tag columns - // are added between submits. - // In prom batches, Float64 = value, Timestamp = timestamp, Utf8 = tags. let batch_schema = batch.schema(); let start = Instant::now(); - let (tag_columns, essential_col_indices) = match columns_taxonomy( + let (tag_columns, essential_col_indices) = columns_taxonomy( &batch_schema, &table_batch.table_name, - &name_to_ids, - &partition_columns_set, - ) { - Ok(columns) => columns, - Err(err) => { - warn!( - "Failed to resolve columns for logical table '{}': {:?}", - table_batch.table_name, err - ); - record_failure!(table_batch.row_count, err.to_string()); - continue 'next_table; - } - }; + name_to_ids, + partition_columns_set, + )?; columns_taxonomy_elapsed += start.elapsed(); if tag_columns.is_empty() && essential_col_indices.is_empty() { @@ -1298,30 +1364,16 @@ async fn flush_batch_physical( let modified = { let start = Instant::now(); - match modify_batch_sparse( + let batch = modify_batch_sparse( batch.clone(), table_id, &tag_columns, &essential_col_indices, - ) { - Ok(batch) => { - modify_elapsed += start.elapsed(); - batch - } - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to modify batch for logical table '{}': {:?}", - table_batch.table_name, err - ) - ); - continue 'next_table; - } - } + )?; + modify_elapsed += start.elapsed(); + batch }; - modified_row_count += modified.num_rows(); modified_batches.push(modified); } } @@ -1333,147 +1385,130 @@ async fn flush_batch_physical( .with_label_values(&["flush_physical_columns_taxonomy"]) .observe(columns_taxonomy_elapsed.as_secs_f64()); - if modified_batches.is_empty() { - if first_error.is_none() { - record_failure!( - total_row_count, - format!( - "No batches can be transformed for physical table '{}' during pending flush", - physical_table_name - ) - ); + ensure!( + !modified_batches.is_empty(), + error::InternalSnafu { + err_msg: "No batches can be transformed during pending flush", } - return; + ); + Ok(modified_batches) +} + +/// Concatenates all modified batches into a single large batch. +/// +/// All modified batches share the same physical schema. +fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result { + let combined_schema = modified_batches[0].schema(); + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_concat_all"]) + .start_timer(); + concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu) +} + +fn split_combined_batch_by_region( + combined_batch: &RecordBatch, + partition_rule: &dyn partition::partition::PartitionRule, +) -> Result> { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_split_record_batch"]) + .start_timer(); + let map = partition_rule.split_record_batch(combined_batch)?; + Ok(map) +} + +fn prepare_physical_region_routing_batch( + combined_batch: RecordBatch, + partition_columns: &[String], +) -> Result { + if partition_columns.is_empty() { + return Ok(combined_batch); + } + strip_partition_columns_from_batch(combined_batch) +} + +fn plan_region_batch( + stripped_batch: &RecordBatch, + physical_table_id: TableId, + region_number: u32, + mask: &partition::partition::RegionMask, +) -> Result> { + if mask.select_none() { + return Ok(None); } - // 4. Concatenate all modified batches (all share the same physical schema) - let combined_batch = { - let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED - .with_label_values(&["flush_physical_concat_all"]) - .start_timer(); - let combined_schema = modified_batches[0].schema(); - // todo(hl): maybe limit max rows to concat. - match concat_batches(&combined_schema, &modified_batches) { - Ok(batch) => batch, - Err(err) => { - record_failure!( - modified_row_count, - format!("Failed to concat modified batches: {:?}", err) - ); - return; - } - } - }; - - // 5. Split by physical partition rule and send to regions - let physical_table_id = physical_table_info.table_id(); - let region_masks = { - let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED - .with_label_values(&["flush_physical_split_record_batch"]) - .start_timer(); - match partition_rule.0.split_record_batch(&combined_batch) { - Ok(masks) => masks, - Err(err) => { - record_failure!( - total_row_count, - format!( - "Failed to split combined batch for physical table '{}': {:?}", - physical_table_name, err - ) - ); - return; - } - } - }; - - let stripped_batch = if partition_columns.is_empty() { - combined_batch + let region_batch = if mask.select_all() { + stripped_batch.clone() } else { - // Strip partition columns before encoding and sending requests. - match strip_partition_columns_from_batch(combined_batch) { - Ok(batch) => batch, - Err(err) => { - record_failure!( - total_row_count, - format!( - "Failed to strip partition columns for physical table '{}': {:?}", - physical_table_name, err - ) - ); - return; - } - } + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_filter_record_batch"]) + .start_timer(); + filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)? }; - let mut region_writes = Vec::new(); + let row_count = region_batch.num_rows(); + if row_count == 0 { + return Ok(None); + } + + Ok(Some(PlannedRegionBatch { + region_id: RegionId::new(physical_table_id, region_number), + batch: region_batch, + })) +} + +fn plan_region_batches( + combined_batch: RecordBatch, + physical_table_id: TableId, + partition_rule: &dyn partition::partition::PartitionRule, + partition_columns: &[String], +) -> Result> { + let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?; + let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?; + + let mut planned_batches = Vec::new(); for (region_number, mask) in region_masks { - if mask.select_none() { - continue; + if let Some(planned_batch) = + plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)? + { + planned_batches.push(planned_batch); } + } - let region_batch = if mask.select_all() { - stripped_batch.clone() - } else { - let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED - .with_label_values(&["flush_physical_filter_record_batch"]) - .start_timer(); - match filter_record_batch(&stripped_batch, mask.array()) { - Ok(batch) => batch, - Err(err) => { - record_failure!( - total_row_count, - format!( - "Failed to filter combined batch for physical table '{}': {:?}", - physical_table_name, err - ) - ); - continue; - } - } - }; + Ok(planned_batches) +} - let row_count = region_batch.num_rows(); - if row_count == 0 { - continue; - } - - let region_id = RegionId::new(physical_table_id, region_number); +async fn resolve_region_targets( + planned_batches: Vec, + partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized), +) -> Result> { + let mut resolved_batches = Vec::with_capacity(planned_batches.len()); + for planned in planned_batches { let datanode = { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_physical_resolve_region_leader"]) .start_timer(); - match partition_manager.find_region_leader(region_id).await { - Ok(peer) => peer, - Err(err) => { - record_failure!( - row_count, - format!( - "Failed to resolve region leader for physical region {}: {:?}", - region_id, err - ) - ); - continue; - } - } + partition_manager + .find_region_leader(planned.region_id) + .await? }; + resolved_batches.push(ResolvedRegionBatch { planned, datanode }); + } + + Ok(resolved_batches) +} + +fn encode_region_write_requests( + resolved_batches: Vec, +) -> Result> { + let mut region_writes = Vec::with_capacity(resolved_batches.len()); + for resolved in resolved_batches { + let region_id = resolved.planned.region_id; let (schema_bytes, data_header, payload) = { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_physical_encode_ipc"]) .start_timer(); - match record_batch_to_ipc(region_batch) { - Ok(encoded) => encoded, - Err(err) => { - record_failure!( - row_count, - format!( - "Failed to encode Arrow IPC for physical region {}: {:?}", - region_id, err - ) - ); - continue; - } - } + record_batch_to_ipc(resolved.planned.batch)? }; let request = RegionRequest { @@ -1493,65 +1528,25 @@ async fn flush_batch_physical( }; region_writes.push(FlushRegionWrite { - region_id, - row_count, - datanode, + datanode: resolved.datanode, request, }); } - for result in flush_region_writes_concurrently(node_manager.clone(), region_writes).await { - match result { - FlushWriteResult::Success { row_count } => { - FLUSH_TOTAL.inc(); - FLUSH_ROWS.observe(row_count as f64); - } - FlushWriteResult::Failed { row_count, message } => { - record_failure!(row_count, message); - } - } - } + Ok(region_writes) } -fn notify_waiters(waiters: Vec, first_error: &Option) { +fn notify_waiters(waiters: Vec, result: Result<()>) { + let shared_result = result.map_err(Arc::new); for waiter in waiters { - let result = match first_error { - Some(err_msg) => Err(Error::Internal { - err_msg: err_msg.clone(), - }), - None => Ok(()), - }; - let _ = waiter.response_tx.send(result); + let _ = waiter.response_tx.send(match &shared_result { + Ok(()) => Ok(()), + Err(error) => Err(Arc::clone(error)), + }); // waiter._permit is dropped here, releasing the inflight semaphore slot } } -fn mark_flush_failure(row_count: usize, message: &str) { - error!("Pending rows batch flush failed, message: {}", message); - FLUSH_FAILURES.inc(); - FLUSH_DROPPED_ROWS.inc_by(row_count as u64); -} - -fn flush_with_error(batch: &mut PendingBatch, message: &str) { - if batch.total_row_count == 0 { - return; - } - - let row_count = batch.total_row_count; - let waiters = std::mem::take(&mut batch.waiters); - batch.tables.clear(); - batch.total_row_count = 0; - batch.created_at = None; - batch.ctx = None; - - PENDING_ROWS.sub(row_count as i64); - PENDING_BATCHES.dec(); - - let err_msg = Some(message.to_string()); - notify_waiters(waiters, &err_msg); - mark_flush_failure(row_count, message); -} - fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> { let mut encoder = FlightEncoder::default(); let schema = encoder.encode_schema(record_batch.schema().as_ref()); @@ -1581,17 +1576,18 @@ mod tests { use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use std::time::Duration; + use std::time::{Duration, Instant}; use api::region::RegionResponse; use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse}; use api::v1::meta::Peer; - use api::v1::region::{InsertRequests, RegionRequest}; + use api::v1::region::{InsertRequests, RegionRequest, region_request}; use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows}; - use arrow::array::{BinaryArray, StringArray, TimestampMillisecondArray}; + use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray}; use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; + use catalog::error::Result as CatalogResult; use common_meta::error::Result as MetaResult; use common_meta::node_manager::{ Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, @@ -1599,17 +1595,28 @@ mod tests { use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use dashmap::DashMap; + use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema}; + use partition::error::Result as PartitionResult; + use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask}; use smallvec::SmallVec; + use snafu::ResultExt; use store_api::storage::RegionId; - use tokio::sync::mpsc; + use table::metadata::TableId; + use table::test_util::table_info::test_table_info; + use tokio::sync::{Semaphore, mpsc, oneshot}; use tokio::time::sleep; use super::{ - BatchKey, Error, FlushRegionWrite, FlushWriteResult, PendingRowsBatcher, PendingWorker, - WorkerCommand, columns_taxonomy, flush_region_writes_concurrently, - remove_worker_if_same_channel, should_close_worker_on_idle_timeout, + BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher, + PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester, + PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch, + ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch, + encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently, + plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout, should_dispatch_concurrently, strip_partition_columns_from_batch, + transform_logical_batches_to_physical, }; + use crate::error; fn mock_rows(row_count: usize, schema_name: &str) -> Rows { Rows { @@ -1621,6 +1628,190 @@ mod tests { } } + fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new(tag_name, ArrowDataType::Utf8, true), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampMillisecondArray::from(vec![ts])), + Arc::new(arrow::array::Float64Array::from(vec![val])), + Arc::new(StringArray::from(vec![tag_value])), + ], + ) + .unwrap() + } + + fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata { + let schema = Arc::new( + DtSchema::try_new(vec![ + DtColumnSchema::new( + "__primary_key", + datatypes::prelude::ConcreteDataType::binary_datatype(), + false, + ), + DtColumnSchema::new( + "greptime_timestamp", + datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + DtColumnSchema::new( + "greptime_value", + datatypes::prelude::ConcreteDataType::float64_datatype(), + true, + ), + DtColumnSchema::new( + "tag1", + datatypes::prelude::ConcreteDataType::string_datatype(), + true, + ), + ]) + .unwrap(), + ); + let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema); + table_info.meta.column_ids = vec![0, 1, 2, 3]; + + PhysicalTableMetadata { + table_info: Arc::new(table_info), + col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])), + } + } + + struct MockFlushCatalogProvider { + table: Option, + } + + #[async_trait] + impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider { + async fn physical_table( + &self, + _catalog: &str, + _schema: &str, + _table_name: &str, + _query_ctx: &session::context::QueryContext, + ) -> CatalogResult> { + Ok(self.table.clone()) + } + } + + struct SingleRegionPartitionRule; + + impl PartitionRule for SingleRegionPartitionRule { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn partition_columns(&self) -> &[String] { + &[] + } + + fn find_region( + &self, + _values: &[datatypes::prelude::Value], + ) -> partition::error::Result { + unimplemented!() + } + + fn split_record_batch( + &self, + record_batch: &RecordBatch, + ) -> partition::error::Result> + { + Ok(HashMap::from([( + 1, + RegionMask::new( + arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]), + record_batch.num_rows(), + ), + )])) + } + } + + struct TwoRegionPartitionRule { + partition_columns: Vec, + } + + impl PartitionRule for TwoRegionPartitionRule { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn partition_columns(&self) -> &[String] { + &self.partition_columns + } + + fn find_region( + &self, + _values: &[datatypes::prelude::Value], + ) -> partition::error::Result { + unimplemented!() + } + + fn split_record_batch( + &self, + _record_batch: &RecordBatch, + ) -> partition::error::Result> + { + Ok(HashMap::from([ + (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)), + (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)), + ( + 3, + RegionMask::new(BooleanArray::from(vec![false, false]), 0), + ), + ])) + } + } + + struct MockFlushPartitionProvider { + partition_rule_calls: Arc, + region_leader_calls: Arc, + } + + #[async_trait] + impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider { + async fn find_table_partition_rule( + &self, + _table_info: &table::metadata::TableInfo, + ) -> PartitionResult { + self.partition_rule_calls.fetch_add(1, Ordering::SeqCst); + Ok(Arc::new(SingleRegionPartitionRule)) + } + + async fn find_region_leader(&self, _region_id: RegionId) -> error::Result { + self.region_leader_calls.fetch_add(1, Ordering::SeqCst); + Ok(Peer { + id: 1, + addr: "node-1".to_string(), + }) + } + } + + #[derive(Default)] + struct MockFlushNodeRequester { + writes: Arc, + } + + #[async_trait] + impl PhysicalFlushNodeRequester for MockFlushNodeRequester { + async fn handle( + &self, + _peer: &Peer, + _request: RegionRequest, + ) -> error::Result { + self.writes.fetch_add(1, Ordering::SeqCst); + Ok(RegionResponse::new(0)) + } + } + #[test] fn test_collect_non_empty_table_rows_filters_empty_payloads() { let requests = RowInsertRequests { @@ -1648,6 +1839,38 @@ mod tests { assert_eq!(2, table_rows[0].1.rows.len()); } + #[test] + fn test_drain_batch_takes_initialized_pending_batch_from_option() { + let ctx = session::context::QueryContext::arc(); + let (response_tx, _response_rx) = oneshot::channel(); + let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap(); + let mut batch = Some(PendingBatch { + tables: HashMap::from([( + "cpu".to_string(), + TableBatch { + table_name: "cpu".to_string(), + table_id: 42, + batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)], + row_count: 1, + }, + )]), + created_at: Instant::now(), + total_row_count: 1, + ctx: ctx.clone(), + waiters: vec![FlushWaiter { + response_tx, + _permit: permit, + }], + }); + + let flush = drain_batch(&mut batch).unwrap(); + + assert!(batch.is_none()); + assert_eq!(1, flush.total_row_count); + assert_eq!(1, flush.table_batches.len()); + assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog()); + } + #[derive(Clone)] struct ConcurrentMockDatanode { delay: Duration, @@ -1728,6 +1951,21 @@ mod tests { } } + #[async_trait] + impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager { + async fn handle( + &self, + peer: &Peer, + request: RegionRequest, + ) -> error::Result { + let datanode = self.datanode(peer).await; + datanode + .handle(request) + .await + .context(error::CommonMetaSnafu) + } + } + #[test] fn test_remove_worker_if_same_channel_removes_matching_entry() { let workers = DashMap::new(); @@ -1798,8 +2036,6 @@ mod tests { let writes = vec![ FlushRegionWrite { - region_id: RegionId::new(1024, 1), - row_count: 10, datanode: Peer { id: 1, addr: "node1".to_string(), @@ -1807,8 +2043,6 @@ mod tests { request: RegionRequest::default(), }, FlushRegionWrite { - region_id: RegionId::new(1024, 2), - row_count: 12, datanode: Peer { id: 2, addr: "node2".to_string(), @@ -1817,13 +2051,9 @@ mod tests { }, ]; - let results = flush_region_writes_concurrently(node_manager, writes).await; - assert_eq!(2, results.len()); - assert!( - results - .iter() - .all(|result| matches!(result, FlushWriteResult::Success { .. })) - ); + flush_region_writes_concurrently(node_manager.as_ref(), writes) + .await + .unwrap(); assert!(max_inflight.load(Ordering::SeqCst) >= 2); } @@ -2108,4 +2338,304 @@ mod tests { "PK should be different because batch2 has tag2!" ); } + + #[test] + fn test_transform_logical_batches_to_physical_success() { + let batch = mock_tag_batch("tag1", "v1", 1000, 1.0); + + let table_batches = vec![TableBatch { + table_name: "t1".to_string(), + table_id: 1, + batches: vec![batch], + row_count: 1, + }]; + + let name_to_ids = HashMap::from([("tag1".to_string(), 1)]); + let partition_columns = HashSet::new(); + let modified = + transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns) + .unwrap(); + + assert_eq!(1, modified.len()); + assert_eq!(3, modified[0].num_columns()); + assert_eq!("__primary_key", modified[0].schema().field(0).name()); + assert_eq!("greptime_timestamp", modified[0].schema().field(1).name()); + assert_eq!("greptime_value", modified[0].schema().field(2).name()); + } + + #[test] + fn test_transform_logical_batches_to_physical_taxonomy_failure() { + let batch = mock_tag_batch("tag1", "v1", 1000, 1.0); + + let table_batches = vec![TableBatch { + table_name: "t1".to_string(), + table_id: 1, + batches: vec![batch], + row_count: 1, + }]; + + // tag1 is missing from name_to_ids, causing columns_taxonomy to fail. + let name_to_ids = HashMap::new(); + let partition_columns = HashSet::new(); + let err = + transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns) + .unwrap_err(); + + assert!( + err.to_string() + .contains("not found in physical table column IDs") + ); + } + + #[test] + fn test_transform_logical_batches_to_physical_multiple_batches() { + let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0); + let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0); + + let table_batches = vec![ + TableBatch { + table_name: "t1".to_string(), + table_id: 1, + batches: vec![batch1], + row_count: 1, + }, + TableBatch { + table_name: "t2".to_string(), + table_id: 2, + batches: vec![batch2], + row_count: 1, + }, + ]; + + let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]); + let partition_columns = HashSet::new(); + let modified = + transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns) + .unwrap(); + + assert_eq!(2, modified.len()); + } + + #[test] + fn test_transform_logical_batches_to_physical_mixed_success_failure() { + let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0); + let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0); + + let table_batches = vec![ + TableBatch { + table_name: "t1".to_string(), + table_id: 1, + batches: vec![batch1], + row_count: 1, + }, + TableBatch { + table_name: "t2".to_string(), + table_id: 2, + batches: vec![batch2], + row_count: 1, + }, + ]; + + // tag1 is missing from name_to_ids, causing batch1 to fail. + let name_to_ids = HashMap::from([("tag2".to_string(), 2)]); + let partition_columns = HashSet::new(); + let err = + transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns) + .unwrap_err(); + + assert!(err.to_string().contains("tag1")); + } + + #[tokio::test] + async fn test_flush_batch_physical_uses_mockable_trait_dependencies() { + let table_batches = vec![TableBatch { + table_name: "t1".to_string(), + table_id: 11, + batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)], + row_count: 1, + }]; + let partition_calls = Arc::new(AtomicUsize::new(0)); + let leader_calls = Arc::new(AtomicUsize::new(0)); + let node = MockFlushNodeRequester::default(); + let ctx = session::context::QueryContext::arc(); + + flush_batch_physical( + &table_batches, + "phy", + &ctx, + &MockFlushPartitionProvider { + partition_rule_calls: partition_calls.clone(), + region_leader_calls: leader_calls.clone(), + }, + &node, + &MockFlushCatalogProvider { + table: Some(mock_physical_table_metadata(1024)), + }, + ) + .await + .unwrap(); + + assert_eq!(1, partition_calls.load(Ordering::SeqCst)); + assert_eq!(1, leader_calls.load(Ordering::SeqCst)); + assert_eq!(1, node.writes.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() { + let table_batches = vec![TableBatch { + table_name: "t1".to_string(), + table_id: 11, + batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)], + row_count: 1, + }]; + let partition_calls = Arc::new(AtomicUsize::new(0)); + let leader_calls = Arc::new(AtomicUsize::new(0)); + let node = MockFlushNodeRequester::default(); + let ctx = session::context::QueryContext::arc(); + + let err = flush_batch_physical( + &table_batches, + "missing_phy", + &ctx, + &MockFlushPartitionProvider { + partition_rule_calls: partition_calls.clone(), + region_leader_calls: leader_calls.clone(), + }, + &node, + &MockFlushCatalogProvider { table: None }, + ) + .await + .unwrap_err(); + + assert!( + err.to_string() + .contains("Physical table 'missing_phy' not found") + ); + assert_eq!(0, partition_calls.load(Ordering::SeqCst)); + assert_eq!(0, leader_calls.load(Ordering::SeqCst)); + assert_eq!(0, node.writes.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_flush_batch_physical_aborts_immediately_on_transform_error() { + let table_batches = vec![ + TableBatch { + table_name: "broken".to_string(), + table_id: 11, + batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)], + row_count: 1, + }, + TableBatch { + table_name: "healthy".to_string(), + table_id: 12, + batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)], + row_count: 1, + }, + ]; + let partition_calls = Arc::new(AtomicUsize::new(0)); + let leader_calls = Arc::new(AtomicUsize::new(0)); + let node = MockFlushNodeRequester::default(); + let ctx = session::context::QueryContext::arc(); + + let err = flush_batch_physical( + &table_batches, + "phy", + &ctx, + &MockFlushPartitionProvider { + partition_rule_calls: partition_calls.clone(), + region_leader_calls: leader_calls.clone(), + }, + &node, + &MockFlushCatalogProvider { + table: Some(mock_physical_table_metadata(1024)), + }, + ) + .await + .unwrap_err(); + + assert!(err.to_string().contains("unknown_tag")); + assert_eq!(1, partition_calls.load(Ordering::SeqCst)); + assert_eq!(0, leader_calls.load(Ordering::SeqCst)); + assert_eq!(0, node.writes.load(Ordering::SeqCst)); + } + + #[test] + fn test_plan_region_batches_splits_and_strips_partition_columns() { + let combined_batch = RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("__primary_key", ArrowDataType::Binary, false), + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])), + vec![ + Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])), + Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])), + Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])), + Arc::new(StringArray::from(vec!["node-1", "node-2"])), + ], + ) + .unwrap(); + let mut planned_batches = plan_region_batches( + combined_batch, + 1024, + &TwoRegionPartitionRule { + partition_columns: vec!["host".to_string()], + }, + &["host".to_string()], + ) + .unwrap(); + planned_batches.sort_by_key(|planned| planned.region_id.region_number()); + + assert_eq!(2, planned_batches.len()); + assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id); + assert_eq!(1, planned_batches[0].num_rows()); + assert_eq!(3, planned_batches[0].batch.num_columns()); + assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id); + assert_eq!(1, planned_batches[1].num_rows()); + assert_eq!(3, planned_batches[1].batch.num_columns()); + } + + #[test] + fn test_encode_region_write_requests_builds_bulk_insert_requests() { + let planned_batch = PlannedRegionBatch { + region_id: RegionId::new(1024, 1), + batch: RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("__primary_key", ArrowDataType::Binary, false), + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + ])), + vec![ + Arc::new(BinaryArray::from(vec![b"k1".as_slice()])), + Arc::new(TimestampMillisecondArray::from(vec![1000_i64])), + Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])), + ], + ) + .unwrap(), + }; + let resolved_batch = ResolvedRegionBatch { + planned: planned_batch, + datanode: Peer { + id: 1, + addr: "node-1".to_string(), + }, + }; + let writes = encode_region_write_requests(vec![resolved_batch]).unwrap(); + + assert_eq!(1, writes.len()); + assert_eq!(1, writes[0].datanode.id); + let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else { + panic!("expected bulk insert request"); + }; + assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id); + } }