refactor: unify flush and compaction to always use FlatSource (#7799)

* feat: support writing flat batches in primary key format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: migrate flush to always use FlatSource

Add FormatType propagation to SstWriteRequest and use it to choose the
Flat vs PrimaryKey write path (write_all_flat vs
write_all_flat_as_primary_key) in AccessLayer and WriteCache. Make the
compactor and flush derive sst_write_format from region options or the
engine config. Simplify the flush logic, remove the old memtable_source
helper, and update tests to set a default sst_write_format.
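
A minimal sketch of this dispatch (illustrative only: the writer is a
stub and FlatSource a placeholder type; FormatType and the two method
names follow the diff below):

    #[derive(Clone, Copy, Default)]
    enum FormatType {
        #[default]
        PrimaryKey,
        Flat,
    }

    // Placeholder for the real stream/iterator of flat record batches.
    struct FlatSource;

    struct Writer;

    impl Writer {
        // Writes flat batches in the flat SST layout as-is.
        fn write_all_flat(&self, _source: FlatSource) {}
        // Converts flat batches to the primary key layout before writing.
        fn write_all_flat_as_primary_key(&self, _source: FlatSource) {}
    }

    fn write_sst(writer: &Writer, source: FlatSource, format: FormatType) {
        match format {
            FormatType::PrimaryKey => writer.write_all_flat_as_primary_key(source),
            FormatType::Flat => writer.write_all_flat(source),
        }
    }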

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: make compaction use flat source

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: read parquet sequentially as flat batches

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove new_batch_with_binary in favor of new_record_batch_with_binary

Replace PrimaryKeyWriteFormat with FlatWriteFormat in the
test_read_large_binary test and use new_record_batch_with_binary directly,
removing the now-unused new_batch_with_binary function and its BinaryArray
import.

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for PrimaryKeyWriteFormat::convert_flat_batch

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove Either from SstWriteRequest

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle index build mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: consider sparse encoding and last-non-null mode in flush

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add unit tests for field_column_start edge cases

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Yingwen
2026-03-13 17:44:13 +08:00
committed by GitHub
parent 74ff5c37ea
commit e215851c8a
17 changed files with 668 additions and 904 deletions

View File

@@ -20,13 +20,14 @@ use clap::Parser;
use colored::Colorize;
use datanode::config::RegionEngineConfig;
use datanode::store;
use either::Either;
use futures::stream;
use mito2::access_layer::{
AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
};
use mito2::cache::{CacheManager, CacheManagerRef};
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::read::FlatSource;
use mito2::sst::FormatType;
use mito2::sst::file::{FileHandle, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
use mito2::sst::index::intermediate::IntermediateManager;
@@ -210,6 +211,7 @@ impl ObjbenchCommand {
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.flat_format(true)
.build()
.await
.map_err(|e| {
@@ -231,6 +233,10 @@ impl ObjbenchCommand {
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
let reader_stream = Box::pin(stream::try_unfold(reader, |mut reader| async move {
let batch = reader.next_record_batch().await?;
Ok(batch.map(|batch| (batch, reader)))
}));
// Build write request
let fulltext_index_config = FulltextIndexConfig {
@@ -241,10 +247,11 @@ impl ObjbenchCommand {
let write_req = SstWriteRequest {
op_type: OperationType::Flush,
metadata: region_meta,
source: Either::Left(Source::Reader(Box::new(reader))),
source: FlatSource::Stream(reader_stream),
cache_manager,
storage: None,
max_sequence: None,
sst_write_format: FormatType::PrimaryKey,
index_options: Default::default(),
index_config: mito_engine_config.index.clone(),
inverted_index_config: MitoConfig::default().inverted_index,

View File

@@ -17,7 +17,6 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_time::Timestamp;
use either::Either;
use futures::{Stream, TryStreamExt};
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
@@ -37,7 +36,7 @@ use crate::error::{
CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::read::FlatSource;
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
@@ -47,7 +46,7 @@ use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FormatType};
pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
@@ -391,15 +390,19 @@ impl AccessLayer {
)
.await
.with_file_cleaner(cleaner);
match request.source {
Either::Left(source) => {
match request.sst_write_format {
FormatType::PrimaryKey => {
writer
.write_all(source, request.max_sequence, write_opts)
.write_all_flat_as_primary_key(
request.source,
request.max_sequence,
write_opts,
)
.await?
}
Either::Right(flat_source) => {
FormatType::Flat => {
writer
.write_all_flat(flat_source, request.max_sequence, write_opts)
.write_all_flat(request.source, request.max_sequence, write_opts)
.await?
}
}
@@ -520,11 +523,12 @@ pub enum OperationType {
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Either<Source, FlatSource>,
pub source: FlatSource,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
pub sst_write_format: FormatType,
/// Configs for index
pub index_options: IndexOptions,

View File

@@ -244,15 +244,19 @@ impl WriteCache {
.await
.with_file_cleaner(cleaner);
let sst_info = match write_request.source {
either::Left(source) => {
let sst_info = match write_request.sst_write_format {
crate::sst::FormatType::PrimaryKey => {
writer
.write_all(source, write_request.max_sequence, write_opts)
.write_all_flat_as_primary_key(
write_request.source,
write_request.max_sequence,
write_opts,
)
.await?
}
either::Right(flat_source) => {
crate::sst::FormatType::Flat => {
writer
.write_all_flat(flat_source, write_request.max_sequence, write_opts)
.write_all_flat(write_request.source, write_request.max_sequence, write_opts)
.await?
}
};
@@ -509,12 +513,13 @@ mod tests {
use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
use crate::cache::{CacheManager, CacheStrategy};
use crate::error::InvalidBatchSnafu;
use crate::read::Source;
use crate::read::FlatSource;
use crate::region::options::IndexOptions;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::TestEnv;
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata,
new_flat_source_from_record_batches, new_record_batch_by_range,
sst_file_handle_with_file_id, sst_region_metadata,
};
#[tokio::test]
@@ -532,21 +537,22 @@ mod tests {
.create_write_cache(local_store.clone(), ReadableSize::mb(10))
.await;
// Create Source
// Create source.
let metadata = Arc::new(sst_region_metadata());
let region_id = metadata.region_id;
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source: either::Left(source),
source,
storage: None,
max_sequence: None,
sst_write_format: Default::default(),
cache_manager: Default::default(),
index_options: IndexOptions::default(),
index_config: Default::default(),
@@ -636,19 +642,20 @@ mod tests {
// Create source
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source: either::Left(source),
source,
storage: None,
max_sequence: None,
sst_write_format: Default::default(),
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
index_config: Default::default(),
@@ -715,9 +722,9 @@ mod tests {
let metadata = Arc::new(sst_region_metadata());
// Creates a source that can return an error to abort the writer.
let source = Source::Iter(Box::new(
let source = FlatSource::Iter(Box::new(
[
Ok(new_batch_by_range(&["a", "d"], 0, 60)),
Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
InvalidBatchSnafu {
reason: "Abort the writer",
}
@@ -730,9 +737,10 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source: either::Left(source),
source,
storage: None,
max_sequence: None,
sst_write_format: Default::default(),
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
index_config: Default::default(),

View File

@@ -58,10 +58,10 @@ use crate::error::{
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
@@ -828,7 +828,7 @@ pub struct SerializedCompactionOutput {
output_time_range: Option<TimestampRange>,
}
/// Builders to create [BoxedBatchReader] for compaction.
/// Builders to create [BoxedRecordBatchStream] for compaction.
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
@@ -841,24 +841,17 @@ struct CompactionSstReaderBuilder<'a> {
}
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
let scan_input = self.build_scan_input(false)?.with_compaction(true);
SeqScan::new(scan_input).build_reader_for_compaction().await
}
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
let scan_input = self.build_scan_input(true)?.with_compaction(true);
let scan_input = self.build_scan_input()?.with_compaction(true);
SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
}
fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
let mapper = ProjectionMapper::all(&self.metadata, flat_format)?;
fn build_scan_input(self) -> Result<ScanInput> {
let mapper = ProjectionMapper::all(&self.metadata, true)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
@@ -868,7 +861,7 @@ impl CompactionSstReaderBuilder<'_> {
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(flat_format);
.with_flat_format(true);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.

View File

@@ -43,7 +43,7 @@ use crate::error::{
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::metrics;
use crate::read::{FlatSource, Source};
use crate::read::FlatSource;
use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
@@ -356,13 +356,8 @@ impl DefaultCompactor {
time_range: output.output_time_range,
merge_mode,
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
Either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
Either::Left(Source::Reader(reader))
};
let reader = builder.build_flat_sst_reader().await?;
let source = FlatSource::Stream(reader);
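// Compaction always reads its inputs as a flat record batch stream now; the on-disk output format is still selected via sst_write_format below.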
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
@@ -375,6 +370,11 @@ impl DefaultCompactor {
cache_manager: compaction_region.cache_manager.clone(),
storage,
max_sequence: max_sequence.map(NonZero::get),
sst_write_format: if flat_format {
FormatType::Flat
} else {
FormatType::PrimaryKey
},
index_options,
index_config,
inverted_index_config,

View File

@@ -22,7 +22,6 @@ use std::time::Instant;
use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
@@ -41,18 +40,14 @@ use crate::error::{
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
use crate::memtable::{
BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::FlatSource;
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
@@ -62,8 +57,10 @@ use crate::request::{
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::sst::parquet::{
DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
};
use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
@@ -480,78 +477,29 @@ impl RegionFlushTask {
// the counter may have more series than the actual series count.
series_count += memtable_series_count;
if mem_ranges.is_record_batch() {
let flush_start = Instant::now();
let FlushFlatMemResult {
num_encoded,
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.await?;
encoded_part_count += num_encoded;
for (source_idx, result) in results.into_iter().enumerate() {
let (max_sequence, ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
}
common_telemetry::debug!(
"Region {} flush one memtable {} {}/{}, metrics: {:?}",
self.region_id,
memtable_id,
source_idx,
num_sources,
metrics
);
flush_metrics = flush_metrics.merge(metrics);
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
Self::new_file_meta(
self.region_id,
max_sequence,
sst_info,
partition_expr.clone(),
)
}));
}
common_telemetry::debug!(
"Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
self.region_id,
num_sources,
memtable_id,
num_mem_ranges,
num_encoded,
num_mem_rows,
flush_start.elapsed(),
compact_cost,
);
} else {
let max_sequence = mem_ranges.max_sequence();
let source = memtable_source(mem_ranges, &version.options).await?;
// Flush to level 0.
let source = Either::Left(source);
let write_request = self.new_write_request(version, max_sequence, source);
let mut metrics = Metrics::new(WriteType::Flush);
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts, &mut metrics)
.await?;
FLUSH_FILE_TOTAL.inc_by(ssts_written.len() as u64);
let flush_start = Instant::now();
let FlushFlatMemResult {
num_encoded,
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.await?;
encoded_part_count += num_encoded;
for (source_idx, result) in results.into_iter().enumerate() {
let (max_sequence, ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
}
debug!(
"Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
self.region_id, num_mem_ranges, num_mem_rows, metrics
common_telemetry::debug!(
"Region {} flush one memtable {} {}/{}, metrics: {:?}",
self.region_id,
memtable_id,
source_idx,
num_sources,
metrics
);
flush_metrics = flush_metrics.merge(metrics);
@@ -565,7 +513,19 @@ impl RegionFlushTask {
partition_expr.clone(),
)
}));
};
}
common_telemetry::debug!(
"Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
self.region_id,
num_sources,
memtable_id,
num_mem_ranges,
num_encoded,
num_mem_rows,
flush_start.elapsed(),
compact_cost,
);
}
Ok(DoFlushMemtablesResult {
@@ -587,16 +547,17 @@ impl RegionFlushTask {
&version.metadata,
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
);
let field_column_start =
flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
let flat_sources = memtable_flat_sources(
batch_schema,
mem_ranges,
&version.options,
version.metadata.primary_key.len(),
field_column_start,
)?;
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
for (source, max_sequence) in flat_sources.sources {
let source = Either::Right(source);
let write_request = self.new_write_request(version, max_sequence, source);
let access_layer = self.access_layer.clone();
let write_opts = write_opts.clone();
@@ -667,8 +628,13 @@ impl RegionFlushTask {
&self,
version: &VersionRef,
max_sequence: u64,
source: Either<Source, FlatSource>,
source: FlatSource,
) -> SstWriteRequest {
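// Prefers the region's explicit sst_format option; otherwise falls back to the engine's experimental flat-format default.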
let flat_format = version
.options
.sst_format
.map(|f| f == FormatType::Flat)
.unwrap_or(self.engine_config.default_experimental_flat_format);
SstWriteRequest {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),
@@ -676,6 +642,11 @@ impl RegionFlushTask {
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
sst_write_format: if flat_format {
FormatType::Flat
} else {
FormatType::PrimaryKey
},
index_options: self.index_options.clone(),
index_config: self.engine_config.index.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
@@ -722,41 +693,6 @@ struct DoFlushMemtablesResult {
flush_metrics: Metrics,
}
/// Returns a [Source] for the given memtable.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
let source = if mem_ranges.ranges.len() == 1 {
let only_range = mem_ranges.ranges.into_values().next().unwrap();
let iter = only_range.build_iter()?;
Source::Iter(iter)
} else {
// todo(hl): a workaround since sync version of MergeReader is wip.
let sources = mem_ranges
.ranges
.into_values()
.map(|r| r.build_iter().map(Source::Iter))
.collect::<Result<Vec<_>>>()?;
let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
let maybe_dedup = if options.append_mode {
// no dedup in append mode
Box::new(merge_reader) as _
} else {
// dedup according to merge mode
match options.merge_mode.unwrap_or(MergeMode::LastRow) {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
}
MergeMode::LastNonNull => Box::new(DedupReader::new(
merge_reader,
LastNonNull::new(false),
None,
)) as _,
}
};
Source::Reader(maybe_dedup)
};
Ok(source)
}
struct FlatSources {
sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,

View File

@@ -57,7 +57,7 @@ use crate::memtable::{
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::flat_format::field_column_start;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
@@ -1186,13 +1186,8 @@ impl MemtableCompactor {
Box::new(dedup_iter)
}
MergeMode::LastNonNull => {
// Calculates field column start: total columns - fixed columns - field columns
// Field column count = total metadata columns - time index column - primary key columns
let field_column_count =
metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
let total_columns = arrow_schema.fields().len();
let field_column_start =
total_columns - FIXED_POS_COLUMN_NUM - field_column_count;
field_column_start(metadata, arrow_schema.fields().len());
let dedup_iter = FlatDedupIterator::new(
merged_iter,

View File

@@ -80,11 +80,6 @@ impl PruneReader {
}
}
pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) {
self.source = source;
self.skip_fields = skip_fields;
}
/// Merge metrics with the inner reader and return the merged metrics.
pub(crate) fn metrics(&self) -> ReaderMetrics {
let mut metrics = self.metrics.clone();

View File

@@ -128,28 +128,6 @@ impl SeqScan {
Ok(Box::pin(futures::stream::iter(streams).flatten()))
}
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::merge_all_ranges_for_compaction(
&self.stream_ctx,
partition_ranges,
&part_metrics,
self.pruner.clone(),
)
.await?;
Ok(Box::new(reader))
}
/// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
///
/// # Panics
@@ -172,40 +150,6 @@ impl SeqScan {
Ok(reader)
}
/// Builds a merge reader that reads all ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_ranges_for_compaction(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
part_metrics: &PartitionMetrics,
pruner: Arc<Pruner>,
) -> Result<BoxedBatchReader> {
pruner.add_partition_ranges(partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));
let mut sources = Vec::new();
for part_range in partition_ranges {
build_sources(
stream_ctx,
part_range,
true,
part_metrics,
partition_pruner.clone(),
&mut sources,
None,
)
.await?;
}
common_telemetry::debug!(
"Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
stream_ctx.input.mapper.metadata().region_id,
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a merge reader that reads all flat ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_flat_ranges_for_compaction(

View File

@@ -31,7 +31,6 @@ use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use crate::read::Batch;
use crate::sst::parquet::flat_format::time_index_column_index;
pub mod file;
@@ -260,33 +259,6 @@ pub(crate) struct SeriesEstimator {
}
impl SeriesEstimator {
/// Updates the estimator with a new Batch.
///
/// Since each Batch contains only one series, this increments the series count
/// and updates the last timestamp.
pub(crate) fn update(&mut self, batch: &Batch) {
let Some(last_ts) = batch.last_timestamp() else {
return;
};
// Checks if there's a boundary between the last batch and this batch
if let Some(prev_last_ts) = self.last_timestamp {
// If the first timestamp of this batch is less than the last timestamp
// we've seen, it indicates a new series
if let Some(first_ts) = batch.first_timestamp()
&& first_ts.value() <= prev_last_ts
{
self.series_count += 1;
}
} else {
// First batch, counts as first series
self.series_count = 1;
}
// Updates the last timestamp
self.last_timestamp = Some(last_ts.value());
}
/// Updates the estimator with a new record batch in flat format.
///
/// This method examines the time index column to detect series boundaries.
@@ -340,43 +312,14 @@ impl SeriesEstimator {
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{
BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt8Builder,
UInt32Array, UInt64Array,
BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
UInt64Array,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use super::*;
use crate::read::{Batch, BatchBuilder};
fn new_batch(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
) -> Batch {
let timestamps = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
let sequences = Arc::new(UInt64Array::from(sequences.to_vec()));
let mut op_type_builder = UInt8Builder::with_capacity(op_types.len());
for op_type in op_types {
op_type_builder.append_value(*op_type as u8);
}
let op_types = Arc::new(UInt8Array::from(
op_types.iter().map(|op| *op as u8).collect::<Vec<_>>(),
));
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(timestamps)
.unwrap()
.sequences_array(sequences)
.unwrap()
.op_types_array(op_types)
.unwrap();
builder.build().unwrap()
}
fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
// Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type]
@@ -411,128 +354,6 @@ mod tests {
RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
}
#[test]
fn test_series_estimator_empty_batch() {
let mut estimator = SeriesEstimator::default();
let batch = new_batch(b"test", &[], &[], &[]);
estimator.update(&batch);
assert_eq!(0, estimator.finish());
}
#[test]
fn test_series_estimator_single_batch() {
let mut estimator = SeriesEstimator::default();
let batch = new_batch(
b"test",
&[1, 2, 3],
&[1, 2, 3],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch);
assert_eq!(1, estimator.finish());
}
#[test]
fn test_series_estimator_multiple_batches_same_series() {
let mut estimator = SeriesEstimator::default();
// First batch with timestamps 1, 2, 3
let batch1 = new_batch(
b"test",
&[1, 2, 3],
&[1, 2, 3],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch1);
// Second batch with timestamps 4, 5, 6 (continuation)
let batch2 = new_batch(
b"test",
&[4, 5, 6],
&[4, 5, 6],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch2);
assert_eq!(1, estimator.finish());
}
#[test]
fn test_series_estimator_new_series_detected() {
let mut estimator = SeriesEstimator::default();
// First batch with timestamps 1, 2, 3
let batch1 = new_batch(
b"pk0",
&[1, 2, 3],
&[1, 2, 3],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch1);
// Second batch with timestamps 2, 3, 4 (timestamp goes back, new series)
let batch2 = new_batch(
b"pk1",
&[2, 3, 4],
&[4, 5, 6],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch2);
assert_eq!(2, estimator.finish());
}
#[test]
fn test_series_estimator_equal_timestamp_boundary() {
let mut estimator = SeriesEstimator::default();
// First batch ending at timestamp 5
let batch1 = new_batch(
b"test",
&[1, 2, 5],
&[1, 2, 3],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch1);
// Second batch starting at timestamp 5 (equal, indicates new series)
let batch2 = new_batch(
b"test",
&[5, 6, 7],
&[4, 5, 6],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch2);
assert_eq!(2, estimator.finish());
}
#[test]
fn test_series_estimator_finish_resets_state() {
let mut estimator = SeriesEstimator::default();
let batch1 = new_batch(
b"test",
&[1, 2, 3],
&[1, 2, 3],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch1);
assert_eq!(1, estimator.finish());
// After finish, state should be reset
let batch2 = new_batch(
b"test",
&[4, 5, 6],
&[4, 5, 6],
&[OpType::Put, OpType::Put, OpType::Put],
);
estimator.update(&batch2);
assert_eq!(1, estimator.finish());
}
#[test]
fn test_series_estimator_flat_empty_batch() {
let mut estimator = SeriesEstimator::default();

View File

@@ -58,7 +58,7 @@ use crate::error::{
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::{Batch, BatchReader};
use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState};
@@ -802,9 +802,9 @@ impl IndexBuildTask {
if let Some(mut parquet_reader) = parquet_reader {
// TODO(SNC123): optimize index batch
loop {
match parquet_reader.next_batch().await {
Ok(Some(mut batch)) => {
indexer.update(&mut batch).await;
match parquet_reader.next_record_batch().await {
Ok(Some(batch)) => {
indexer.update_flat(&batch).await;
}
Ok(None) => break,
Err(e) => {
@@ -1227,7 +1227,9 @@ mod tests {
use crate::sst::parquet::WriteOptions;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
use crate::test_util::sst_util::{
new_flat_source_from_record_batches, new_record_batch_by_range, sst_region_metadata,
};
struct MetaConfig {
with_inverted: bool,
@@ -1358,19 +1360,20 @@ mod tests {
env: &SchedulerEnv,
build_mode: IndexBuildMode,
) -> SstInfo {
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
let mut index_config = MitoConfig::default().index;
index_config.build_mode = build_mode;
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata: metadata.clone(),
source: either::Left(source),
source,
storage: None,
max_sequence: None,
sst_write_format: Default::default(),
cache_manager: Default::default(),
index_options: IndexOptions::default(),
index_config,

View File

@@ -110,6 +110,7 @@ mod tests {
TimestampMillisecondArray, UInt8Array, UInt64Array,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
use datatypes::arrow::util::pretty::pretty_format_batches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
use object_store::ObjectStore;
@@ -129,7 +130,7 @@ mod tests {
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::read::FlatSource;
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_purger::NoopFilePurger;
@@ -137,19 +138,19 @@ mod tests {
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::flat_format::FlatWriteFormat;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{
DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
};
use crate::test_util::TestEnv;
use crate::test_util::sst_util::{
build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary,
new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key,
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
build_test_binary_test_region_metadata, new_flat_source_from_record_batches,
new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence,
new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
sst_region_metadata_with_encoding,
};
use crate::test_util::{TestEnv, check_reader_result};
const FILE_DIR: &str = "/";
const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -191,10 +192,10 @@ mod tests {
region_file_id: handle.file_id(),
};
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -214,7 +215,7 @@ mod tests {
.await;
let info = writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -235,14 +236,14 @@ mod tests {
object_store,
);
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(
check_record_batch_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 150),
new_batch_by_range(&["b", "h"], 150, 200),
new_record_batch_by_range(&["a", "d"], 0, 50),
new_record_batch_by_range(&["a", "d"], 50, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 150),
new_record_batch_by_range(&["b", "h"], 150, 200),
],
)
.await;
@@ -254,10 +255,10 @@ mod tests {
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -279,7 +280,7 @@ mod tests {
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -299,14 +300,14 @@ mod tests {
.cache(cache.clone());
for _ in 0..3 {
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(
check_record_batch_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 150),
new_batch_by_range(&["b", "h"], 150, 200),
new_record_batch_by_range(&["a", "d"], 0, 50),
new_record_batch_by_range(&["a", "d"], 50, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 150),
new_record_batch_by_range(&["b", "h"], 150, 200),
],
)
.await;
@@ -340,10 +341,10 @@ mod tests {
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
@@ -366,7 +367,7 @@ mod tests {
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -392,10 +393,10 @@ mod tests {
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -416,7 +417,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -436,11 +437,11 @@ mod tests {
)
.predicate(predicate);
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(
check_record_batch_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
new_record_batch_by_range(&["a", "d"], 0, 50),
new_record_batch_by_range(&["a", "d"], 50, 60),
],
)
.await;
@@ -452,10 +453,10 @@ mod tests {
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "z"], 0, 0),
new_batch_by_range(&["a", "z"], 100, 100),
new_batch_by_range(&["a", "z"], 200, 230),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "z"], 0, 0),
new_record_batch_by_range(&["a", "z"], 100, 100),
new_record_batch_by_range(&["a", "z"], 200, 230),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -476,7 +477,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -488,7 +489,11 @@ mod tests {
object_store,
);
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
check_record_batch_reader_result(
&mut reader,
&[new_record_batch_by_range(&["a", "z"], 200, 230)],
)
.await;
}
#[tokio::test]
@@ -497,10 +502,10 @@ mod tests {
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 60),
new_record_batch_by_range(&["b", "f"], 0, 40),
new_record_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -522,7 +527,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -542,7 +547,11 @@ mod tests {
)
.predicate(predicate);
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
check_record_batch_reader_result(
&mut reader,
&[new_record_batch_by_range(&["b", "h"], 150, 200)],
)
.await;
}
#[tokio::test]
@@ -569,7 +578,7 @@ mod tests {
let writer_props = props_builder.build();
let write_format = PrimaryKeyWriteFormat::new(metadata);
let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
let fields: Vec<_> = write_format
.arrow_schema()
.fields()
@@ -603,9 +612,8 @@ mod tests {
)
.unwrap();
let batch = new_batch_with_binary(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();
let arrays: Vec<_> = arrow_batch
let batch = new_record_batch_with_binary(&["a"], 0, 60);
let arrays: Vec<_> = batch
.columns()
.iter()
.map(|array| {
@@ -629,11 +637,11 @@ mod tests {
object_store,
);
let mut reader = builder.build().await.unwrap().unwrap();
check_reader_result(
check_record_batch_reader_result(
&mut reader,
&[
new_batch_with_binary(&["a"], 0, 50),
new_batch_with_binary(&["a"], 50, 60),
new_record_batch_with_binary(&["a"], 0, 50),
new_record_batch_with_binary(&["a"], 50, 60),
],
)
.await;
@@ -646,17 +654,17 @@ mod tests {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let metadata = Arc::new(sst_region_metadata());
let batches = &[
new_batch_by_range(&["a", "d"], 0, 1000),
new_batch_by_range(&["b", "f"], 0, 1000),
new_batch_by_range(&["c", "g"], 0, 1000),
new_batch_by_range(&["b", "h"], 100, 200),
new_batch_by_range(&["b", "h"], 200, 300),
new_batch_by_range(&["b", "h"], 300, 1000),
let batches = vec![
new_record_batch_by_range(&["a", "d"], 0, 1000),
new_record_batch_by_range(&["b", "f"], 0, 1000),
new_record_batch_by_range(&["c", "g"], 0, 1000),
new_record_batch_by_range(&["b", "h"], 100, 200),
new_record_batch_by_range(&["b", "h"], 200, 300),
new_record_batch_by_range(&["b", "h"], 300, 1000),
];
let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
let source = new_source(batches);
let source = new_flat_source_from_record_batches(batches);
let write_opts = WriteOptions {
row_group_size: 50,
max_file_size: Some(1024 * 16),
@@ -678,7 +686,10 @@ mod tests {
)
.await;
let files = writer.write_all(source, None, &write_opts).await.unwrap();
let files = writer
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap();
assert_eq!(2, files.len());
let mut rows_read = 0;
@@ -695,7 +706,7 @@ mod tests {
object_store.clone(),
);
let mut reader = builder.build().await.unwrap().unwrap();
while let Some(batch) = reader.next_batch().await.unwrap() {
while let Some(batch) = reader.next_record_batch().await.unwrap() {
rows_read += batch.num_rows();
}
}
@@ -710,12 +721,12 @@ mod tests {
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 50;
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 20),
new_batch_by_range(&["c", "f"], 0, 40),
new_batch_by_range(&["c", "h"], 100, 200),
let source = new_flat_source_from_record_batches(vec![
new_record_batch_by_range(&["a", "d"], 0, 20),
new_record_batch_by_range(&["b", "d"], 0, 20),
new_record_batch_by_range(&["c", "d"], 0, 20),
new_record_batch_by_range(&["c", "f"], 0, 40),
new_record_batch_by_range(&["c", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
@@ -760,7 +771,7 @@ mod tests {
.await;
let info = writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -877,6 +888,7 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -891,7 +903,11 @@ mod tests {
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
check_record_batch_reader_result(
&mut reader,
&[new_record_batch_by_range(&["b", "d"], 0, 20)],
)
.await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
@@ -937,6 +953,7 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -991,6 +1008,7 @@ mod tests {
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.bloom_filter_index_appliers([bloom_filter_applier.clone(), None])
@@ -1005,13 +1023,13 @@ mod tests {
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(
check_record_batch_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 10),
new_batch_by_range(&["c", "d"], 10, 20),
new_record_batch_by_range(&["a", "d"], 0, 20),
new_record_batch_by_range(&["b", "d"], 0, 20),
new_record_batch_by_range(&["c", "d"], 0, 10),
new_record_batch_by_range(&["c", "d"], 10, 20),
],
)
.await;
@@ -1032,37 +1050,32 @@ mod tests {
assert!(cached.contains_row_group(3));
}
/// Creates a flat format RecordBatch for testing.
/// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch {
assert!(end >= start);
let metadata = Arc::new(sst_region_metadata());
let metadata = build_test_binary_test_region_metadata();
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let num_rows = end - start;
let mut columns = Vec::new();
// Add primary key columns (tag_0, tag_1) as dictionary arrays
let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
tag_0_builder.append_value(tags[0]);
tag_1_builder.append_value(tags[1]);
}
columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
// Add field column (field_0)
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
columns.push(Arc::new(UInt64Array::from(field_values)));
let values = (0..num_rows)
.map(|_| "some data".as_bytes())
.collect::<Vec<_>>();
columns.push(
Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values(
values,
)) as ArrayRef,
);
// Add time index column (ts)
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
// Add encoded primary key column
let pk = new_primary_key(tags);
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
@@ -1070,10 +1083,7 @@ mod tests {
}
columns.push(Arc::new(pk_builder.finish()));
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(
OpType::Put as u8,
num_rows,
@@ -1082,9 +1092,19 @@ mod tests {
RecordBatch::try_new(flat_schema, columns).unwrap()
}
/// Creates a FlatSource from flat format RecordBatches.
fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
async fn check_record_batch_reader_result(
reader: &mut ParquetReader,
expected: &[RecordBatch],
) {
let mut actual = Vec::new();
while let Some(batch) = reader.next_record_batch().await.unwrap() {
actual.push(batch);
}
assert_eq!(
pretty_format_batches(expected).unwrap().to_string(),
pretty_format_batches(&actual).unwrap().to_string()
);
assert!(reader.next_record_batch().await.unwrap().is_none());
}
/// Creates a flat format RecordBatch for testing with sparse primary key encoding.
@@ -1333,10 +1353,11 @@ mod tests {
};
let metadata = Arc::new(sst_region_metadata());
// Create batches with sequence 0 to trigger override functionality
let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0);
let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0);
let source = new_source(&[batch1, batch2]);
// Create batches with sequence 0 to trigger override functionality.
let source = new_flat_source_from_record_batches(vec![
new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0),
new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0),
]);
let write_opts = WriteOptions {
row_group_size: 50,
@@ -1355,7 +1376,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts)
.write_all_flat_as_primary_key(source, None, &write_opts)
.await
.unwrap()
.remove(0);
@@ -1369,7 +1390,7 @@ mod tests {
);
let mut reader = builder.build().await.unwrap().unwrap();
let mut normal_batches = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
while let Some(batch) = reader.next_record_batch().await.unwrap() {
normal_batches.push(batch);
}
@@ -1391,22 +1412,19 @@ mod tests {
);
let mut reader = builder.build().await.unwrap().unwrap();
let mut override_batches = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
while let Some(batch) = reader.next_record_batch().await.unwrap() {
override_batches.push(batch);
}
// Compare the results
assert_eq!(normal_batches.len(), override_batches.len());
for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) {
// Create expected batch with override sequence
let expected_batch = {
let num_rows = normal.num_rows();
let mut builder = BatchBuilder::from(normal);
builder
.sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows)))
.unwrap();
builder.build().unwrap()
let mut columns = normal.columns().to_vec();
let num_cols = columns.len();
columns[num_cols - 2] =
Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows()));
RecordBatch::try_new(normal.schema(), columns).unwrap()
};
// Override batch should match expected batch

View File

@@ -52,8 +52,8 @@ use crate::error::{
NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
StatValues,
FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
PrimaryKeyReadFormat, ReadFormat, StatValues,
};
use crate::sst::{
FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
@@ -127,6 +127,21 @@ pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
num_columns - 1
}
/// Returns the start index of field columns in a flat batch.
///
/// `num_columns` is the total number of columns in the flat batch schema,
/// including tag columns (if present), field columns, and fixed position columns
/// (time index, primary key, sequence, op type).
///
/// For Dense encoding (raw PK columns included): field_column_start = primary_key.len()
/// For Sparse encoding (no raw PK columns): field_column_start = 0
pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
// Calculates field column start: total columns - fixed columns - field columns
// Field column count = total metadata columns - time index column - primary key columns
let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
num_columns - FIXED_POS_COLUMN_NUM - field_column_count
}
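// Worked example (hypothetical counts): with 2 tags and 3 fields under Dense
// encoding, column_metadatas.len() = 6, so field_column_count = 6 - 1 - 2 = 3;
// the flat schema has 2 + 3 + 4 = 9 columns, giving field_column_start =
// 9 - 4 - 3 = 2, i.e. primary_key.len(). Under Sparse encoding the raw tag
// columns are absent: 3 + 4 = 7 columns and field_column_start = 7 - 4 - 3 = 0.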
// TODO(yingwen): Add an option to skip reading internal columns if the region is
// append only and doesn't use sparse encoding (We need to check the table id under
// sparse encoding).
@@ -765,3 +780,89 @@ impl FlatReadFormat {
.unwrap()
}
}
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::field_column_start;
use crate::sst::{FlatSchemaOptions, flat_sst_arrow_schema_column_num};
/// Builds a `RegionMetadata` with the given number of tags and fields.
fn build_metadata(
num_tags: usize,
num_fields: usize,
encoding: PrimaryKeyEncoding,
) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
let mut col_id = 0u32;
for i in 0..num_tags {
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("tag_{i}"),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: col_id,
});
col_id += 1;
}
for i in 0..num_fields {
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("field_{i}"),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: col_id,
});
col_id += 1;
}
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: col_id,
});
let primary_key: Vec<u32> = (0..num_tags as u32).collect();
builder.primary_key(primary_key);
builder.primary_key_encoding(encoding);
builder.build().unwrap()
}
#[test]
fn test_field_column_start() {
// (num_tags, num_fields, encoding, expected)
let cases = [
(1, 1, PrimaryKeyEncoding::Dense, 1),
(2, 2, PrimaryKeyEncoding::Dense, 2),
(0, 2, PrimaryKeyEncoding::Dense, 0),
(2, 2, PrimaryKeyEncoding::Sparse, 0),
];
for (num_tags, num_fields, encoding, expected) in cases {
let metadata = build_metadata(num_tags, num_fields, encoding);
let options = FlatSchemaOptions::from_encoding(encoding);
let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options);
let result = field_column_start(&metadata, num_columns);
assert_eq!(
result, expected,
"num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
);
}
}
}

View File

@@ -34,12 +34,12 @@ use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use datatypes::vectors::Helper;
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
@@ -51,8 +51,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
@@ -73,7 +72,6 @@ pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
override_sequence: Option<SequenceNumber>,
@@ -84,7 +82,6 @@ impl PrimaryKeyWriteFormat {
pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
let arrow_schema = to_sst_arrow_schema(&metadata);
PrimaryKeyWriteFormat {
metadata,
arrow_schema,
override_sequence: None,
}
@@ -104,40 +101,25 @@ impl PrimaryKeyWriteFormat {
&self.arrow_schema
}
/// Convert `batch` to an arrow record batch to store in parquet.
pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
debug_assert_eq!(
batch.fields().len() + FIXED_POS_COLUMN_NUM,
self.arrow_schema.fields().len()
);
let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
// Store all fields first.
for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
ensure!(
column.column_id == column_metadata.column_id,
InvalidBatchSnafu {
reason: format!(
"Batch has column {} but metadata has column {}",
column.column_id, column_metadata.column_id
),
}
);
columns.push(column.data.to_arrow_array());
}
// Add time index column.
columns.push(batch.timestamps().to_arrow_array());
// Add internal columns: primary key, sequences, op types.
columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));
/// Converts a flat `RecordBatch` to primary-key format, retaining only
/// field columns, time index, and internal columns.
///
/// `num_fields` is the number of field columns. The method strips
/// leading tag columns: `num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM`.
pub(crate) fn convert_flat_batch(
&self,
batch: &RecordBatch,
num_fields: usize,
) -> Result<RecordBatch> {
let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
if let Some(override_sequence) = self.override_sequence {
let sequence_array =
let num_cols = columns.len();
// The sequence column sits at num_cols - 2, right before op_type.
columns[num_cols - 2] =
Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
columns.push(sequence_array);
} else {
columns.push(batch.sequences().to_arrow_array());
}
columns.push(batch.op_types().to_arrow_array());
RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
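A worked instance of that arithmetic, using the flat test schema from the tests below (FIXED_POS_COLUMN_NUM is 4: the time index plus the three internal columns):

// Flat layout: [tag0, tag1, field1, field0, ts, __primary_key, __sequence, __op_type]
const FIXED_POS_COLUMN_NUM: usize = 4;
let (num_columns, num_fields) = (8, 2);
let num_tag_columns = num_columns - num_fields - FIXED_POS_COLUMN_NUM; // 8 - 2 - 4 = 2
// columns[num_tag_columns..] is already in primary-key order:
// [field1, field0, ts, __primary_key, __sequence, __op_type]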
@@ -926,15 +908,6 @@ pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec
Ok(offsets)
}
/// Creates a new array for specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
let keys = UInt32Array::from_value(0, num_rows);
// Safety: The key index is valid.
Arc::new(DictionaryArray::new(keys, values))
}
/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet file was created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
@@ -1017,7 +990,7 @@ mod tests {
use api::v1::OpType;
use datatypes::arrow::array::{
Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
use datatypes::prelude::ConcreteDataType;
@@ -1145,13 +1118,6 @@ mod tests {
assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
}
#[test]
fn test_new_primary_key_array() {
let array = new_primary_key_array(b"test", 3);
let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
assert_eq!(&expect, &array);
}
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
let values = Arc::new(BinaryArray::from_iter_values(
pk_row_nums.iter().map(|v| &v.0),
@@ -1164,49 +1130,6 @@ mod tests {
Arc::new(DictionaryArray::new(keys, values))
}
#[test]
fn test_convert_batch() {
let metadata = build_test_region_metadata();
let write_format = PrimaryKeyWriteFormat::new(metadata);
let num_rows = 4;
let batch = new_batch(b"test", 1, 2, num_rows);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
let actual = write_format.convert_batch(&batch).unwrap();
assert_eq!(expect_record, actual);
}
#[test]
fn test_convert_batch_with_override_sequence() {
let metadata = build_test_region_metadata();
let write_format =
PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
let num_rows = 4;
let batch = new_batch(b"test", 1, 2, num_rows);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
let actual = write_format.convert_batch(&batch).unwrap();
assert_eq!(expect_record, actual);
}
#[test]
fn test_projection_indices() {
let metadata = build_test_region_metadata();
@@ -1867,4 +1790,100 @@ mod tests {
let result = format.convert_batch(record_batch.clone(), None).unwrap();
assert_eq!(record_batch, result);
}
#[test]
fn test_convert_flat_batch() {
let metadata = build_test_region_metadata();
let write_format = PrimaryKeyWriteFormat::new(metadata);
let num_rows = 4;
// Build a flat record batch: tag0, tag1, field1, field0, ts, __primary_key, __sequence, __op_type
let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
// num_fields = 2 (field1, field0)
let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
// Expected: tag columns stripped, only field1, field0, ts, __primary_key, __sequence, __op_type
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
];
let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_convert_flat_batch_with_override_sequence() {
let metadata = build_test_region_metadata();
let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
let num_rows = 4;
let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
Arc::new(UInt64Array::from(vec![999; num_rows])), // overridden __sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
];
let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_convert_flat_batch_no_tags() {
// Test with a region that has no primary key columns (no tags to strip).
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
});
let metadata = Arc::new(builder.build().unwrap());
let write_format = PrimaryKeyWriteFormat::new(metadata);
let num_rows = 3;
// No tag columns, so flat batch is: field0, ts, __primary_key, __sequence, __op_type
let sst_schema = write_format.arrow_schema().clone();
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![10; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), // ts
build_test_pk_array(&[(b"".to_vec(), num_rows)]), // __primary_key
Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
];
let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();
// num_fields = 1, num_tag_columns = 5 - 1 - 4 = 0, so nothing is stripped
let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
let expected = RecordBatch::try_new(sst_schema, columns).unwrap();
assert_eq!(expected, result);
}
}

View File

@@ -21,9 +21,8 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use common_telemetry::{tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
@@ -57,7 +56,7 @@ use crate::metrics::{
READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::{PruneReader, Source};
use crate::read::prune::FlatPruneReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
@@ -303,7 +302,8 @@ impl ParquetReaderBuilder {
pub async fn build(&self) -> Result<Option<ParquetReader>> {
let mut metrics = ReaderMetrics::default();
let Some((context, selection)) = self.build_reader_input(&mut metrics).await? else {
let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await?
else {
return Ok(None);
};
ParquetReader::new(Arc::new(context), selection)
@@ -325,12 +325,14 @@ impl ParquetReaderBuilder {
&self,
metrics: &mut ReaderMetrics,
) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
self.build_reader_input_inner(metrics).await
self.build_reader_input_inner(metrics, self.flat_format)
.await
}
async fn build_reader_input_inner(
&self,
metrics: &mut ReaderMetrics,
flat_format: bool,
) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
let start = Instant::now();
@@ -373,7 +375,7 @@ impl ParquetReaderBuilder {
// before compat handling.
let compaction_projection_mapper = if self.compaction
&& !is_same_region_partition
&& self.flat_format
&& flat_format
&& region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
{
Some(CompactionProjectionMapper::try_new(&region_meta)?)
@@ -385,7 +387,7 @@ impl ParquetReaderBuilder {
ReadFormat::new(
region_meta.clone(),
Some(column_ids),
self.flat_format,
flat_format,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
@@ -401,7 +403,7 @@ impl ParquetReaderBuilder {
ReadFormat::new(
region_meta.clone(),
Some(&column_ids),
self.flat_format,
flat_format,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
@@ -1751,24 +1753,6 @@ impl RowGroupReaderBuilder {
}
}
/// The state of a [ParquetReader].
enum ReaderState {
/// The reader is reading a row group.
Readable(PruneReader),
/// The reader is exhausted.
Exhausted(ReaderMetrics),
}
impl ReaderState {
/// Returns the metrics of the reader.
fn metrics(&self) -> ReaderMetrics {
match self {
ReaderState::Readable(reader) => reader.metrics(),
ReaderState::Exhausted(m) => m.clone(),
}
}
}
/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
/// The filter to evaluate.
@@ -1879,13 +1863,12 @@ pub struct ParquetReader {
/// Row group selection to read.
selection: RowGroupSelection,
/// Reader of current row group.
reader_state: ReaderState,
reader: Option<FlatPruneReader>,
/// Metrics for tracking row group fetch operations.
fetch_metrics: ParquetFetchMetrics,
}
#[async_trait]
impl BatchReader for ParquetReader {
impl ParquetReader {
#[tracing::instrument(
skip_all,
fields(
@@ -1893,18 +1876,20 @@ impl BatchReader for ParquetReader {
file_id = %self.context.reader_builder().file_handle.file_id()
)
)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let ReaderState::Readable(reader) = &mut self.reader_state else {
return Ok(None);
};
pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
if let Some(reader) = &mut self.reader {
if let Some(batch) = reader.next_batch()? {
return Ok(Some(batch));
}
self.reader = None;
continue;
}
// We don't collect the elapsed time if the reader returns an error.
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
}
let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
return Ok(None);
};
// No more items in current row group, reads next row group.
while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
let parquet_reader = self
.context
.reader_builder()
@@ -1915,54 +1900,14 @@ impl BatchReader for ParquetReader {
)
.await?;
// Resets the parquet reader.
// Compute skip_fields for this row group
let skip_fields = self.context.should_skip_fields(row_group_idx);
reader.reset_source(
Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
self.reader = Some(FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
FlatRowGroupReader::new(self.context.clone(), parquet_reader),
skip_fields,
);
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
}
));
}
// The reader is exhausted.
self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
Ok(None)
}
}
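The rewritten reader is a two-level drain loop; a condensed sketch of its shape, where `open_row_group` is a hypothetical stand-in for the builder-plus-FlatPruneReader wiring above:

loop {
    if let Some(reader) = &mut self.reader {
        if let Some(batch) = reader.next_batch()? {
            // Still draining the current row group.
            return Ok(Some(batch));
        }
        // Current row group exhausted; fall through to open the next one.
        self.reader = None;
        continue;
    }
    let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
        // No row groups left.
        return Ok(None);
    };
    self.reader = Some(self.open_row_group(row_group_idx, row_selection).await?);
}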
impl Drop for ParquetReader {
fn drop(&mut self) {
let metrics = self.reader_state.metrics();
debug!(
"Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.filter_metrics.rg_total
- metrics.filter_metrics.rg_inverted_filtered
- metrics.filter_metrics.rg_minmax_filtered
- metrics.filter_metrics.rg_fulltext_filtered
- metrics.filter_metrics.rg_bloom_filtered,
metrics.filter_metrics.rg_total,
metrics
);
// Report metrics.
READ_STAGE_ELAPSED
.with_label_values(&["build_parquet_reader"])
.observe(metrics.build_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan_row_groups"])
.observe(metrics.scan_cost.as_secs_f64());
metrics.observe_rows("parquet_reader");
metrics.filter_metrics.observe();
}
}
impl ParquetReader {
/// Creates a new reader.
#[tracing::instrument(
skip_all,
@@ -1975,28 +1920,27 @@ impl ParquetReader {
context: FileRangeContextRef,
mut selection: RowGroupSelection,
) -> Result<Self> {
debug_assert!(context.read_format().as_flat().is_some());
let fetch_metrics = ParquetFetchMetrics::default();
// Initializes the reader with the first selected row group, if any.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context
.reader_builder()
.build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
.await?;
// Compute skip_fields once for this row group
let skip_fields = context.should_skip_fields(row_group_idx);
ReaderState::Readable(PruneReader::new_with_row_group_reader(
Some(FlatPruneReader::new_with_row_group_reader(
context.clone(),
RowGroupReader::new(context.clone(), parquet_reader),
FlatRowGroupReader::new(context.clone(), parquet_reader),
skip_fields,
))
} else {
ReaderState::Exhausted(ReaderMetrics::default())
None
};
Ok(ParquetReader {
context,
selection,
reader_state,
reader,
fetch_metrics,
})
}

View File

@@ -50,7 +50,7 @@ use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::read::FlatSource;
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
@@ -60,6 +60,35 @@ use crate::sst::{
DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};
/// Converts a flat RecordBatch for writing to parquet.
enum FlatBatchConverter {
/// Writes the batch as-is in flat format.
Flat(FlatWriteFormat),
/// Converts a flat batch to primary-key format by stripping its tag columns.
PrimaryKey {
format: PrimaryKeyWriteFormat,
num_fields: usize,
},
}
impl FlatBatchConverter {
fn arrow_schema(&self) -> &SchemaRef {
match self {
FlatBatchConverter::Flat(f) => f.arrow_schema(),
FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(),
}
}
fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
match self {
FlatBatchConverter::Flat(f) => f.convert_batch(batch),
FlatBatchConverter::PrimaryKey { format, num_fields } => {
format.convert_flat_batch(batch, *num_fields)
}
}
}
}
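A hypothetical call-site sketch for the converter; construction mirrors the two write paths below, and `sst_write_format` is assumed to come from the write request:

let converter = match sst_write_format {
    FormatType::Flat => FlatBatchConverter::Flat(
        FlatWriteFormat::new(
            metadata.clone(),
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        )
        .with_override_sequence(override_sequence),
    ),
    FormatType::PrimaryKey => FlatBatchConverter::PrimaryKey {
        format: PrimaryKeyWriteFormat::new(metadata.clone())
            .with_override_sequence(override_sequence),
        num_fields: metadata.field_columns().count(),
    },
};
// Every flat RecordBatch then takes the same path:
let arrow_batch = converter.convert_batch(&record_batch)?;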
/// Parquet SST writer.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
@@ -240,81 +269,6 @@ where
Ok(())
}
/// Iterates the source and writes all rows to the Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all(
&mut self,
source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let res = self
.write_all_without_cleaning(source, override_sequence, opts)
.await;
if res.is_err() {
// Clean tmp files explicitly on failure.
let file_id = self.current_file;
if let Some(cleaner) = &self.file_cleaner {
cleaner.clean_by_file_id(file_id).await;
}
}
res
}
async fn write_all_without_cleaning(
&mut self,
mut source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
.with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
while let Some(res) = self
.write_next_batch(&mut source, &write_format, opts)
.await
.transpose()
{
match res {
Ok(mut batch) => {
stats.update(&batch);
let start = Instant::now();
// safety: self.current_indexer must be set once the first batch has been written.
match self.index_config.build_mode {
IndexBuildMode::Sync => {
self.current_indexer
.as_mut()
.unwrap()
.update(&mut batch)
.await;
}
IndexBuildMode::Async => {}
}
self.metrics.update_index += start.elapsed();
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
{
self.finish_current_file(&mut results, &mut stats).await?;
}
}
Err(e) => {
if let Some(indexer) = &mut self.current_indexer {
indexer.abort().await;
}
return Err(e);
}
}
}
self.finish_current_file(&mut results, &mut stats).await?;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(results)
}
/// Iterates the FlatSource and writes all RecordBatches in flat format to the Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
@@ -324,11 +278,15 @@ where
override_sequence: Option<SequenceNumber>,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let res = self
.write_all_flat_without_cleaning(source, override_sequence, opts)
.await;
let converter = FlatBatchConverter::Flat(
FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(override_sequence),
);
let res = self.write_all_flat_inner(source, &converter, opts).await;
if res.is_err() {
// Clean tmp files explicitly on failure.
let file_id = self.current_file;
if let Some(cleaner) = &self.file_cleaner {
cleaner.clean_by_file_id(file_id).await;
@@ -337,36 +295,58 @@ where
res
}
async fn write_all_flat_without_cleaning(
/// Iterates the FlatSource and writes all RecordBatches in primary-key format to the Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all_flat_as_primary_key(
&mut self,
mut source: FlatSource,
source: FlatSource,
override_sequence: Option<SequenceNumber>,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let num_fields = self.metadata.field_columns().count();
let converter = FlatBatchConverter::PrimaryKey {
format: PrimaryKeyWriteFormat::new(self.metadata.clone())
.with_override_sequence(override_sequence),
num_fields,
};
let res = self.write_all_flat_inner(source, &converter, opts).await;
if res.is_err() {
let file_id = self.current_file;
if let Some(cleaner) = &self.file_cleaner {
cleaner.clean_by_file_id(file_id).await;
}
}
res
}
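Per the commit message, callers choose between the two public entry points from the request's format; a sketch of that dispatch, with the surrounding request and writer plumbing assumed:

let ssts = match request.sst_write_format {
    FormatType::Flat => writer.write_all_flat(source, override_sequence, &opts).await?,
    FormatType::PrimaryKey => {
        writer
            .write_all_flat_as_primary_key(source, override_sequence, &opts)
            .await?
    }
};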
async fn write_all_flat_inner(
&mut self,
mut source: FlatSource,
converter: &FlatBatchConverter,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let flat_format = FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
while let Some(record_batch) = self
.write_next_flat_batch(&mut source, &flat_format, opts)
.write_next_flat_batch(&mut source, converter, opts)
.await
.transpose()
{
match record_batch {
Ok(batch) => {
stats.update_flat(&batch)?;
let start = Instant::now();
// safety: self.current_indexer must be set once the first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update_flat(&batch)
.await;
self.metrics.update_index += start.elapsed();
if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
let start = Instant::now();
// safety: self.current_indexer must be set once the first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update_flat(&batch)
.await;
self.metrics.update_index += start.elapsed();
}
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
{
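This gate is the index build mode handling mentioned in the commit message; in essence (with IndexBuildMode assumed to be a plain two-variant enum, matching the shape of the removed write_all path above):

match self.index_config.build_mode {
    // Sync: update the indexer inline, batch by batch, while writing.
    IndexBuildMode::Sync => {
        self.current_indexer.as_mut().unwrap().update_flat(&batch).await;
    }
    // Async: skip per-batch updates; the index is built after the SST is written.
    IndexBuildMode::Async => {}
}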
@@ -411,34 +391,10 @@ where
.set_column_compression(op_type_col, Compression::UNCOMPRESSED)
}
async fn write_next_batch(
&mut self,
source: &mut Source,
write_format: &PrimaryKeyWriteFormat,
opts: &WriteOptions,
) -> Result<Option<Batch>> {
let start = Instant::now();
let Some(batch) = source.next_batch().await? else {
return Ok(None);
};
self.metrics.iter_source += start.elapsed();
let arrow_batch = write_format.convert_batch(&batch)?;
let start = Instant::now();
self.maybe_init_writer(write_format.arrow_schema(), opts)
.await?
.write(&arrow_batch)
.await
.context(WriteParquetSnafu)?;
self.metrics.write_batch += start.elapsed();
Ok(Some(batch))
}
async fn write_next_flat_batch(
&mut self,
source: &mut FlatSource,
flat_format: &FlatWriteFormat,
converter: &FlatBatchConverter,
opts: &WriteOptions,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
@@ -447,15 +403,16 @@ where
};
self.metrics.iter_source += start.elapsed();
let arrow_batch = flat_format.convert_batch(&record_batch)?;
let arrow_batch = converter.convert_batch(&record_batch)?;
let start = Instant::now();
self.maybe_init_writer(flat_format.arrow_schema(), opts)
self.maybe_init_writer(converter.arrow_schema(), opts)
.await?
.write(&arrow_batch)
.await
.context(WriteParquetSnafu)?;
self.metrics.write_batch += start.elapsed();
// Return the original flat batch for stats and the indexer, which use the flat layout.
Ok(Some(record_batch))
}
@@ -515,26 +472,6 @@ struct SourceStats {
}
impl SourceStats {
fn update(&mut self, batch: &Batch) {
if batch.is_empty() {
return;
}
self.num_rows += batch.num_rows();
self.series_estimator.update(batch);
// Safety: batch is not empty.
let (min_in_batch, max_in_batch) = (
batch.first_timestamp().unwrap(),
batch.last_timestamp().unwrap(),
);
if let Some(time_range) = &mut self.time_range {
time_range.0 = time_range.0.min(min_in_batch);
time_range.1 = time_range.1.max(max_in_batch);
} else {
self.time_range = Some((min_in_batch, max_in_batch));
}
}
fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
if record_batch.num_rows() == 0 {
return Ok(());

View File

@@ -18,7 +18,11 @@ use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt8Array, UInt64Array};
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
TimestampMillisecondArray, UInt8Array, UInt64Array,
};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::ValueRef;
@@ -32,8 +36,9 @@ use store_api::metric_engine_consts::{
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{FileId, RegionId};
use crate::read::{Batch, BatchBuilder, Source};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger};
/// Test region id.
@@ -246,34 +251,68 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
new_batch_with_custom_sequence(tags, start, end, 1000)
}
pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
/// Creates a flat-format RecordBatch for testing.
/// Like `new_batch_by_range`, but returns a RecordBatch in flat format.
pub fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
new_record_batch_with_custom_sequence(tags, start, end, 1000)
}
/// Creates a flat-format RecordBatch for testing with a custom sequence.
pub fn new_record_batch_with_custom_sequence(
tags: &[&str],
start: usize,
end: usize,
sequence: u64,
) -> RecordBatch {
assert!(end >= start);
let metadata = Arc::new(sst_region_metadata());
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let num_rows = end - start;
let mut columns = Vec::new();
// Add primary key columns (tag_0, tag_1) as dictionary arrays
let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
tag_0_builder.append_value(tags[0]);
tag_1_builder.append_value(tags[1]);
}
columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
// Add field column (field_0)
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
columns.push(Arc::new(UInt64Array::from(field_values)));
// Add time index column (ts)
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
// Add encoded primary key column
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
pk_builder.append(&pk).unwrap();
}
columns.push(Arc::new(pk_builder.finish()));
let field: Vec<_> = (start..end)
.map(|_v| "some data".as_bytes().to_vec())
.collect();
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(sequence, num_rows)));
let mut builder = BatchBuilder::new(pk);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(1, Arc::new(BinaryArray::from_iter_values(field)))
.unwrap();
builder.build().unwrap()
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(
OpType::Put as u8,
num_rows,
)));
RecordBatch::try_new(flat_schema, columns).unwrap()
}
/// Creates a FlatSource from flat-format RecordBatches.
pub fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
}
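A small usage example for these helpers (tag values are arbitrary; the schema from `sst_region_metadata` expects two tags):

let batches = vec![
    new_record_batch_by_range(&["a", "b"], 0, 10),
    new_record_batch_by_range(&["a", "b"], 10, 20),
];
let source = new_flat_source_from_record_batches(batches);
// `source` can now drive write_all_flat or write_all_flat_as_primary_key.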
/// Creates a new region metadata for testing SSTs with binary datatype.