mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 01:10:37 +00:00
refactor: store the schema of flat source (#8091)
* refactor: store the schema of flat source Signed-off-by: luofucong <luofc@foxmail.com> * resolve PR comments Signed-off-by: luofucong <luofc@foxmail.com> * fix ci Signed-off-by: luofucong <luofc@foxmail.com> --------- Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
@@ -238,9 +238,6 @@
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `heartbeat` | -- | -- | The heartbeat options. |
|
||||
| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. |
|
||||
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
|
||||
@@ -364,7 +361,7 @@
|
||||
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
|
||||
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
|
||||
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
|
||||
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
|
||||
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>Heartbeat intervals are negotiated from metasrv during handshake; local node configs do not override this. |
|
||||
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
@@ -473,9 +470,6 @@
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
| `heartbeat` | -- | -- | The heartbeat options. |
|
||||
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
|
||||
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
|
||||
| `meta_client` | -- | -- | The metasrv client options. |
|
||||
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
|
||||
| `meta_client.timeout` | String | `3s` | Operation timeout. |
|
||||
@@ -663,9 +657,6 @@
|
||||
| `meta_client.metadata_cache_max_capacity` | Integer | `100000` | The configuration about the cache of the metadata. |
|
||||
| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
|
||||
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
|
||||
| `heartbeat` | -- | -- | The heartbeat options. |
|
||||
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
|
||||
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
|
||||
| `logging` | -- | -- | The logging options. |
|
||||
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
|
||||
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
|
||||
|
||||
@@ -244,10 +244,12 @@ impl ObjbenchCommand {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let source =
|
||||
FlatSource::new_stream(region_meta.schema.arrow_schema().clone(), reader_stream);
|
||||
let write_req = SstWriteRequest {
|
||||
op_type: OperationType::Flush,
|
||||
metadata: region_meta,
|
||||
source: FlatSource::Stream(reader_stream),
|
||||
source,
|
||||
cache_manager,
|
||||
storage: None,
|
||||
max_sequence: None,
|
||||
|
||||
9
src/mito2/src/cache/write_cache.rs
vendored
9
src/mito2/src/cache/write_cache.rs
vendored
@@ -728,16 +728,19 @@ mod tests {
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
|
||||
// Creates a source that can return an error to abort the writer.
|
||||
let source = FlatSource::Iter(Box::new(
|
||||
let record_batch = new_record_batch_by_range(&["a", "d"], 0, 60);
|
||||
let schema = record_batch.schema();
|
||||
let iter = Box::new(
|
||||
[
|
||||
Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
|
||||
Ok(record_batch),
|
||||
InvalidBatchSnafu {
|
||||
reason: "Abort the writer",
|
||||
}
|
||||
.fail(),
|
||||
]
|
||||
.into_iter(),
|
||||
));
|
||||
);
|
||||
let source = FlatSource::new_iter(schema, iter);
|
||||
|
||||
// Write to local cache and upload sst to mock remote store
|
||||
let write_request = SstWriteRequest {
|
||||
|
||||
@@ -59,7 +59,7 @@ use crate::error::{
|
||||
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
|
||||
};
|
||||
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
|
||||
use crate::read::BoxedRecordBatchStream;
|
||||
use crate::read::FlatSource;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
@@ -992,13 +992,18 @@ struct CompactionSstReaderBuilder<'a> {
|
||||
}
|
||||
|
||||
impl CompactionSstReaderBuilder<'_> {
|
||||
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
|
||||
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
|
||||
/// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
|
||||
/// for compaction. The schema of the [FlatSource] is unified.
|
||||
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
|
||||
let scan_input = self.build_scan_input()?.with_compaction(true);
|
||||
|
||||
let schema = scan_input.mapper.output_schema();
|
||||
let schema = schema.arrow_schema();
|
||||
|
||||
SeqScan::new(scan_input)
|
||||
.build_flat_reader_for_compaction()
|
||||
.await
|
||||
.map(|stream| FlatSource::new_stream(schema.clone(), stream))
|
||||
}
|
||||
|
||||
fn build_scan_input(self) -> Result<ScanInput> {
|
||||
|
||||
@@ -43,7 +43,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::read::FlatSource;
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::VersionRef;
|
||||
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
|
||||
@@ -362,8 +361,7 @@ impl SstMerger for DefaultSstMerger {
|
||||
time_range: output.output_time_range,
|
||||
merge_mode,
|
||||
};
|
||||
let reader = builder.build_flat_sst_reader().await?;
|
||||
let source = FlatSource::Stream(reader);
|
||||
let source = builder.build_flat_sst_reader().await?;
|
||||
let mut metrics = Metrics::new(WriteType::Compaction);
|
||||
let region_metadata = compaction_region.region_metadata.clone();
|
||||
let sst_infos = compaction_region
|
||||
|
||||
@@ -761,7 +761,7 @@ fn memtable_flat_sources(
|
||||
);
|
||||
flat_sources
|
||||
.sources
|
||||
.push((FlatSource::Iter(iter), max_sequence));
|
||||
.push((FlatSource::new_iter(schema, iter), max_sequence));
|
||||
};
|
||||
} else {
|
||||
let min_flush_rows = *ENCODE_ROW_THRESHOLD;
|
||||
@@ -824,9 +824,10 @@ fn memtable_flat_sources(
|
||||
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
|
||||
)?;
|
||||
|
||||
flat_sources
|
||||
.sources
|
||||
.push((FlatSource::Iter(maybe_dedup), max_sequence));
|
||||
flat_sources.sources.push((
|
||||
FlatSource::new_iter(schema.clone(), maybe_dedup),
|
||||
max_sequence,
|
||||
));
|
||||
last_iter_rows = 0;
|
||||
current_ranges.clear();
|
||||
}
|
||||
@@ -857,7 +858,7 @@ fn memtable_flat_sources(
|
||||
|
||||
flat_sources
|
||||
.sources
|
||||
.push((FlatSource::Iter(maybe_dedup), max_sequence));
|
||||
.push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1530,14 +1531,10 @@ mod tests {
|
||||
// Consume the iterator and count rows
|
||||
let mut total_rows = 0usize;
|
||||
for (source, _sequence) in flat_sources.sources {
|
||||
match source {
|
||||
crate::read::FlatSource::Iter(iter) => {
|
||||
for rb in iter {
|
||||
total_rows += rb.unwrap().num_rows();
|
||||
}
|
||||
}
|
||||
crate::read::FlatSource::Stream(_) => unreachable!(),
|
||||
}
|
||||
total_rows += source
|
||||
.take_iter()
|
||||
.map(|x| x.unwrap().num_rows())
|
||||
.sum::<usize>();
|
||||
}
|
||||
assert_eq!(1, total_rows, "dedup should keep a single row");
|
||||
}
|
||||
@@ -1560,14 +1557,10 @@ mod tests {
|
||||
|
||||
let mut total_rows = 0usize;
|
||||
for (source, _sequence) in flat_sources.sources {
|
||||
match source {
|
||||
crate::read::FlatSource::Iter(iter) => {
|
||||
for rb in iter {
|
||||
total_rows += rb.unwrap().num_rows();
|
||||
}
|
||||
}
|
||||
crate::read::FlatSource::Stream(_) => unreachable!(),
|
||||
}
|
||||
total_rows += source
|
||||
.take_iter()
|
||||
.map(|x| x.unwrap().num_rows())
|
||||
.sum::<usize>();
|
||||
}
|
||||
assert_eq!(2, total_rows, "append_mode should preserve duplicates");
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::OpType;
|
||||
use arrow_schema::SchemaRef;
|
||||
use async_trait::async_trait;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::arrow::array::UInt8Array;
|
||||
@@ -1105,19 +1106,59 @@ impl Source {
|
||||
}
|
||||
|
||||
/// Async [RecordBatch] reader and iterator wrapper for flat format.
|
||||
pub enum FlatSource {
|
||||
pub struct FlatSource {
|
||||
schema: SchemaRef,
|
||||
inner: FlatSourceInner,
|
||||
}
|
||||
|
||||
impl FlatSource {
|
||||
/// Create a [FlatSource] from a [BoxedRecordBatchIterator] and its schema.
|
||||
pub fn new_iter(schema: SchemaRef, iter: BoxedRecordBatchIterator) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
inner: FlatSourceInner::Iter(iter),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a [FlatSource] from a [BoxedRecordBatchStream] and its schema.
|
||||
pub fn new_stream(schema: SchemaRef, stream: BoxedRecordBatchStream) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
inner: FlatSourceInner::Stream(stream),
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(unused)]
|
||||
fn schema(&self) -> &SchemaRef {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
self.inner.next_batch().await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator {
|
||||
match self.inner {
|
||||
FlatSourceInner::Iter(iter) => iter,
|
||||
FlatSourceInner::Stream(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum FlatSourceInner {
|
||||
/// Source from a [BoxedRecordBatchIterator].
|
||||
Iter(BoxedRecordBatchIterator),
|
||||
/// Source from a [BoxedRecordBatchStream].
|
||||
Stream(BoxedRecordBatchStream),
|
||||
}
|
||||
|
||||
impl FlatSource {
|
||||
impl FlatSourceInner {
|
||||
/// Returns next [RecordBatch] from this data source.
|
||||
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
match self {
|
||||
FlatSource::Iter(iter) => iter.next().transpose(),
|
||||
FlatSource::Stream(stream) => stream.try_next().await,
|
||||
Self::Iter(iter) => iter.next().transpose(),
|
||||
Self::Stream(stream) => stream.try_next().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{OpType, SemanticType};
|
||||
use arrow_schema::Schema;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::{
|
||||
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
|
||||
@@ -306,8 +307,14 @@ pub fn new_record_batch_with_custom_sequence(
|
||||
}
|
||||
|
||||
/// Creates a FlatSource from flat format RecordBatches.
|
||||
pub fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
|
||||
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
|
||||
pub(crate) fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
|
||||
FlatSource::new_iter(
|
||||
batches
|
||||
.first()
|
||||
.map(|x| x.schema())
|
||||
.unwrap_or_else(|| Arc::new(Schema::empty())),
|
||||
Box::new(batches.into_iter().map(Ok)),
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a new region metadata for testing SSTs with binary datatype.
|
||||
|
||||
Reference in New Issue
Block a user