From abf4623440a2134cae2d295db197574d5296c019 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 11 May 2026 18:22:40 +0800 Subject: [PATCH] refactor: store the schema of flat source (#8091) * refactor: store the schema of flat source Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong * fix ci Signed-off-by: luofucong --------- Signed-off-by: luofucong --- config/config.md | 11 +----- src/cmd/src/datanode/objbench.rs | 4 ++- src/mito2/src/cache/write_cache.rs | 9 +++-- src/mito2/src/compaction.rs | 11 ++++-- src/mito2/src/compaction/compactor.rs | 4 +-- src/mito2/src/flush.rs | 35 ++++++++----------- src/mito2/src/read.rs | 49 ++++++++++++++++++++++++--- src/mito2/src/test_util/sst_util.rs | 11 ++++-- 8 files changed, 87 insertions(+), 47 deletions(-) diff --git a/config/config.md b/config/config.md index 0e395dc445..dfb4db1cf1 100644 --- a/config/config.md +++ b/config/config.md @@ -238,9 +238,6 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | -| `heartbeat` | -- | -- | The heartbeat options. | -| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. | -| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. | @@ -364,7 +361,7 @@ | `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.
This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. | | `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.
**This option is not recommended to be set to true, because it may lead to data loss during failover.** | | `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | -| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.
The frontend heartbeat interval is 6 times of the base heartbeat interval.
The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. | +| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.
The frontend heartbeat interval is 6 times of the base heartbeat interval.
The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
Heartbeat intervals are negotiated from metasrv during handshake; local node configs do not override this. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | @@ -473,9 +470,6 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | -| `heartbeat` | -- | -- | The heartbeat options. | -| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | -| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | | `meta_client` | -- | -- | The metasrv client options. | | `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. | | `meta_client.timeout` | String | `3s` | Operation timeout. | @@ -663,9 +657,6 @@ | `meta_client.metadata_cache_max_capacity` | Integer | `100000` | The configuration about the cache of the metadata. | | `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. | | `meta_client.metadata_cache_tti` | String | `5m` | -- | -| `heartbeat` | -- | -- | The heartbeat options. | -| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | -| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | | `logging` | -- | -- | The logging options. | | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. | diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index d8793bee2f..a298430c83 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -244,10 +244,12 @@ impl ObjbenchCommand { ..Default::default() }; + let source = + FlatSource::new_stream(region_meta.schema.arrow_schema().clone(), reader_stream); let write_req = SstWriteRequest { op_type: OperationType::Flush, metadata: region_meta, - source: FlatSource::Stream(reader_stream), + source, cache_manager, storage: None, max_sequence: None, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index e2483ed4e4..6b02a92811 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -728,16 +728,19 @@ mod tests { let metadata = Arc::new(sst_region_metadata()); // Creates a source that can return an error to abort the writer. - let source = FlatSource::Iter(Box::new( + let record_batch = new_record_batch_by_range(&["a", "d"], 0, 60); + let schema = record_batch.schema(); + let iter = Box::new( [ - Ok(new_record_batch_by_range(&["a", "d"], 0, 60)), + Ok(record_batch), InvalidBatchSnafu { reason: "Abort the writer", } .fail(), ] .into_iter(), - )); + ); + let source = FlatSource::new_iter(schema, iter); // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 296d9ce2b1..1d1151177d 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -59,7 +59,7 @@ use crate::error::{ RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; -use crate::read::BoxedRecordBatchStream; +use crate::read::FlatSource; use crate::read::flat_projection::FlatProjectionMapper; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; @@ -992,13 +992,18 @@ struct CompactionSstReaderBuilder<'a> { } impl CompactionSstReaderBuilder<'_> { - /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction. - async fn build_flat_sst_reader(self) -> Result { + /// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files, + /// for compaction. The schema of the [FlatSource] is unified. + async fn build_flat_sst_reader(self) -> Result { let scan_input = self.build_scan_input()?.with_compaction(true); + let schema = scan_input.mapper.output_schema(); + let schema = schema.arrow_schema(); + SeqScan::new(scan_input) .build_flat_reader_for_compaction() .await + .map(|stream| FlatSource::new_stream(schema.clone(), stream)) } fn build_scan_input(self) -> Result { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 59a8a10077..e5dae0af0c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -43,7 +43,6 @@ use crate::error::{ }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::read::FlatSource; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; @@ -362,8 +361,7 @@ impl SstMerger for DefaultSstMerger { time_range: output.output_time_range, merge_mode, }; - let reader = builder.build_flat_sst_reader().await?; - let source = FlatSource::Stream(reader); + let source = builder.build_flat_sst_reader().await?; let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c5ad2276eb..c863d15d08 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -761,7 +761,7 @@ fn memtable_flat_sources( ); flat_sources .sources - .push((FlatSource::Iter(iter), max_sequence)); + .push((FlatSource::new_iter(schema, iter), max_sequence)); }; } else { let min_flush_rows = *ENCODE_ROW_THRESHOLD; @@ -824,9 +824,10 @@ fn memtable_flat_sources( std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; - flat_sources - .sources - .push((FlatSource::Iter(maybe_dedup), max_sequence)); + flat_sources.sources.push(( + FlatSource::new_iter(schema.clone(), maybe_dedup), + max_sequence, + )); last_iter_rows = 0; current_ranges.clear(); } @@ -857,7 +858,7 @@ fn memtable_flat_sources( flat_sources .sources - .push((FlatSource::Iter(maybe_dedup), max_sequence)); + .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence)); } } @@ -1530,14 +1531,10 @@ mod tests { // Consume the iterator and count rows let mut total_rows = 0usize; for (source, _sequence) in flat_sources.sources { - match source { - crate::read::FlatSource::Iter(iter) => { - for rb in iter { - total_rows += rb.unwrap().num_rows(); - } - } - crate::read::FlatSource::Stream(_) => unreachable!(), - } + total_rows += source + .take_iter() + .map(|x| x.unwrap().num_rows()) + .sum::(); } assert_eq!(1, total_rows, "dedup should keep a single row"); } @@ -1560,14 +1557,10 @@ mod tests { let mut total_rows = 0usize; for (source, _sequence) in flat_sources.sources { - match source { - crate::read::FlatSource::Iter(iter) => { - for rb in iter { - total_rows += rb.unwrap().num_rows(); - } - } - crate::read::FlatSource::Stream(_) => unreachable!(), - } + total_rows += source + .take_iter() + .map(|x| x.unwrap().num_rows()) + .sum::(); } assert_eq!(2, total_rows, "append_mode should preserve duplicates"); } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index aaeaa9e62e..5a71bd1a4e 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -42,6 +42,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::OpType; +use arrow_schema::SchemaRef; use async_trait::async_trait; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; @@ -1105,19 +1106,59 @@ impl Source { } /// Async [RecordBatch] reader and iterator wrapper for flat format. -pub enum FlatSource { +pub struct FlatSource { + schema: SchemaRef, + inner: FlatSourceInner, +} + +impl FlatSource { + /// Create a [FlatSource] from a [BoxedRecordBatchIterator] and its schema. + pub fn new_iter(schema: SchemaRef, iter: BoxedRecordBatchIterator) -> Self { + Self { + schema, + inner: FlatSourceInner::Iter(iter), + } + } + + /// Create a [FlatSource] from a [BoxedRecordBatchStream] and its schema. + pub fn new_stream(schema: SchemaRef, stream: BoxedRecordBatchStream) -> Self { + Self { + schema, + inner: FlatSourceInner::Stream(stream), + } + } + + #[expect(unused)] + fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub async fn next_batch(&mut self) -> Result> { + self.inner.next_batch().await + } + + #[cfg(test)] + pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator { + match self.inner { + FlatSourceInner::Iter(iter) => iter, + FlatSourceInner::Stream(_) => unreachable!(), + } + } +} + +enum FlatSourceInner { /// Source from a [BoxedRecordBatchIterator]. Iter(BoxedRecordBatchIterator), /// Source from a [BoxedRecordBatchStream]. Stream(BoxedRecordBatchStream), } -impl FlatSource { +impl FlatSourceInner { /// Returns next [RecordBatch] from this data source. pub async fn next_batch(&mut self) -> Result> { match self { - FlatSource::Iter(iter) => iter.next().transpose(), - FlatSource::Stream(stream) => stream.try_next().await, + Self::Iter(iter) => iter.next().transpose(), + Self::Stream(stream) => stream.try_next().await, } } } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 4e759f50cd..3f635c6e94 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::v1::{OpType, SemanticType}; +use arrow_schema::Schema; use common_time::Timestamp; use datatypes::arrow::array::{ ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder, @@ -306,8 +307,14 @@ pub fn new_record_batch_with_custom_sequence( } /// Creates a FlatSource from flat format RecordBatches. -pub fn new_flat_source_from_record_batches(batches: Vec) -> FlatSource { - FlatSource::Iter(Box::new(batches.into_iter().map(Ok))) +pub(crate) fn new_flat_source_from_record_batches(batches: Vec) -> FlatSource { + FlatSource::new_iter( + batches + .first() + .map(|x| x.schema()) + .unwrap_or_else(|| Arc::new(Schema::empty())), + Box::new(batches.into_iter().map(Ok)), + ) } /// Creates a new region metadata for testing SSTs with binary datatype.