feat: support flush and compact flat format files (#6949)

* feat: basic functions for flush/compact flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: bridge flush and compaction for flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add write cache support

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: change log level to debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: wrap duplicated code to merge and dedup iter

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: wrap some code into flush_flat_mem_ranges

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: extract logic into do_flush_memtables

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-09-14 21:36:24 +08:00
Committed by: GitHub
Parent: 028effe952
Commit: b3aabb6706
8 changed files with 643 additions and 124 deletions

View File

@@ -13,10 +13,11 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_time::Timestamp;
use either::Either;
use futures::{Stream, TryStreamExt};
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
@@ -34,7 +35,7 @@ use crate::cache::write_cache::SstUploadRequest;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::Source;
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, RegionFileId};
use crate::sst::index::IndexerBuilderImpl;
@@ -44,6 +45,7 @@ use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
@@ -288,9 +290,16 @@ impl AccessLayer {
)
.await
.with_file_cleaner(cleaner);
let ssts = writer
.write_all(request.source, request.max_sequence, write_opts)
.await?;
let ssts = match request.source {
Either::Left(source) => {
writer
.write_all(source, request.max_sequence, write_opts)
.await?
}
Either::Right(flat_source) => {
writer.write_all_flat(flat_source, write_opts).await?
}
};
let metrics = writer.into_metrics();
(ssts, metrics)
};
@@ -310,6 +319,53 @@ impl AccessLayer {
Ok((sst_info, metrics))
}
/// Puts encoded SST bytes into the write cache (if enabled) and uploads them to the object store.
pub(crate) async fn put_sst(
&self,
data: &bytes::Bytes,
region_id: RegionId,
sst_info: &SstInfo,
cache_manager: &CacheManagerRef,
) -> Result<Metrics> {
if let Some(write_cache) = cache_manager.write_cache() {
// Write to cache and upload to remote store
let upload_request = SstUploadRequest {
dest_path_provider: RegionFilePathFactory::new(
self.table_dir.clone(),
self.path_type,
),
remote_store: self.object_store.clone(),
};
write_cache
.put_and_upload_sst(data, region_id, sst_info, upload_request)
.await
} else {
let start = Instant::now();
let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
let sst_file_path =
path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
let mut writer = self
.object_store
.writer_with(&sst_file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(OpenDalSnafu)?;
if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
cleaner.clean_by_file_id(sst_info.file_id).await;
return Err(err);
}
if let Err(err) = writer.close().await.context(OpenDalSnafu) {
cleaner.clean_by_file_id(sst_info.file_id).await;
return Err(err);
}
let mut metrics = Metrics::new(WriteType::Flush);
metrics.write_batch = start.elapsed();
Ok(metrics)
}
}
/// Lists the SST entries from the storage layer in the table directory.
pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> + use<> {
let object_store = self.object_store.clone();
@@ -363,7 +419,7 @@ pub enum OperationType {
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub source: Either<Source, FlatSource>,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub storage: Option<String>,
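
For orientation, SstWriteRequest::source now carries either a row-oriented Source or a flat-format FlatSource, and write_sst picks write_all or write_all_flat accordingly. Below is a minimal sketch of that either-based dispatch; the Source and FlatSource structs are placeholders for illustration, not the crate's real types.

use either::Either;

// Placeholders standing in for mito2's Source and FlatSource.
struct Source;
struct FlatSource;

fn write_all(source: Either<Source, FlatSource>) -> &'static str {
    match source {
        // Row-oriented path: corresponds to writer.write_all(source, max_sequence, opts).
        Either::Left(_batches) => "write_all",
        // Flat path: corresponds to writer.write_all_flat(flat_source, opts).
        Either::Right(_flat) => "write_all_flat",
    }
}

fn main() {
    assert_eq!(write_all(Either::Left(Source)), "write_all");
    assert_eq!(write_all(Either::Right(FlatSource)), "write_all_flat");
}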

View File

@@ -37,8 +37,8 @@ use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// A cache for uploading files to remote object stores.
@@ -101,6 +101,66 @@ impl WriteCache {
self.file_cache.clone()
}
/// Puts encoded SST data into the cache and uploads it to the remote object store.
pub(crate) async fn put_and_upload_sst(
&self,
data: &bytes::Bytes,
region_id: RegionId,
sst_info: &SstInfo,
upload_request: SstUploadRequest,
) -> Result<Metrics> {
let file_id = sst_info.file_id;
let mut metrics = Metrics::new(WriteType::Flush);
// Create index key for the SST file
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
// Write to cache first
let cache_start = Instant::now();
let cache_path = self.file_cache.cache_file_path(parquet_key);
let mut cache_writer = self
.file_cache
.local_store()
.writer(&cache_path)
.await
.context(crate::error::OpenDalSnafu)?;
cache_writer
.write(data.clone())
.await
.context(crate::error::OpenDalSnafu)?;
cache_writer
.close()
.await
.context(crate::error::OpenDalSnafu)?;
// Register in file cache
let index_value = IndexValue {
file_size: data.len() as u32,
};
self.file_cache.put(parquet_key, index_value).await;
metrics.write_batch = cache_start.elapsed();
// Upload to remote store
let upload_start = Instant::now();
let region_file_id = RegionFileId::new(region_id, file_id);
let remote_path = upload_request
.dest_path_provider
.build_sst_file_path(region_file_id);
if let Err(e) = self
.upload(parquet_key, &remote_path, &upload_request.remote_store)
.await
{
// Clean up cache on failure
self.remove(parquet_key).await;
return Err(e);
}
metrics.upload_parquet = upload_start.elapsed();
Ok(metrics)
}
/// Writes SST to the cache and then uploads it to the remote object store.
pub(crate) async fn write_and_upload_sst(
&self,
@@ -139,9 +199,14 @@ impl WriteCache {
.await
.with_file_cleaner(cleaner);
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
.await?;
let sst_info = match write_request.source {
either::Left(source) => {
writer
.write_all(source, write_request.max_sequence, write_opts)
.await?
}
either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
};
let mut metrics = writer.into_metrics();
// Upload sst file to remote object store.
@@ -469,7 +534,7 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source,
source: either::Left(source),
storage: None,
max_sequence: None,
cache_manager: Default::default(),
@@ -567,7 +632,7 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source,
source: either::Left(source),
storage: None,
max_sequence: None,
cache_manager: cache_manager.clone(),
@@ -646,7 +711,7 @@ mod tests {
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata,
source,
source: either::Left(source),
storage: None,
max_sequence: None,
cache_manager: cache_manager.clone(),
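
The new put_and_upload_sst above follows a fixed order: write the encoded bytes to the local file cache, register the entry, upload to the remote store, and drop the cache entry if the upload fails. A minimal runnable sketch of that ordering with hypothetical stand-ins (LocalCache and RemoteStore below are illustrations, not the crate's API):

use std::collections::HashMap;

// Hypothetical stand-ins for the local file cache and the remote object store.
struct LocalCache {
    index: HashMap<String, usize>,
}

struct RemoteStore {
    fail: bool,
}

impl LocalCache {
    async fn write(&mut self, key: &str, data: &[u8]) {
        // Persist the bytes locally and register the entry size in the index.
        self.index.insert(key.to_string(), data.len());
    }
    async fn remove(&mut self, key: &str) {
        self.index.remove(key);
    }
}

impl RemoteStore {
    async fn upload(&self, _key: &str, _data: &[u8]) -> Result<(), String> {
        if self.fail { Err("upload failed".to_string()) } else { Ok(()) }
    }
}

async fn put_and_upload(
    cache: &mut LocalCache,
    remote: &RemoteStore,
    key: &str,
    data: &[u8],
) -> Result<(), String> {
    // 1) Write to the local cache first so later reads can be served locally.
    cache.write(key, data).await;
    // 2) Upload the same bytes to the remote object store.
    if let Err(e) = remote.upload(key, data).await {
        // 3) On upload failure, drop the cache entry so the cache stays
        //    consistent with what actually exists remotely.
        cache.remove(key).await;
        return Err(e);
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut cache = LocalCache { index: HashMap::new() };
    let remote = RemoteStore { fail: true };
    assert!(put_and_upload(&mut cache, &remote, "sst-1.parquet", b"bytes").await.is_err());
    assert!(cache.index.is_empty()); // the entry was removed after the failed upload
}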

View File

@@ -55,10 +55,10 @@ use crate::error::{
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedBatchReader;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
@@ -662,6 +662,32 @@ impl CompactionSstReaderBuilder<'_> {
.build_reader_for_compaction()
.await
}
/// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
let mut scan_input =
ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata, true)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use a special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(true);
// This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into a predicate.
if let Some(time_range) = self.time_range {
scan_input =
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input, true)
.build_flat_reader_for_compaction()
.await
}
}
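
The workaround above turns the compaction output time range into a predicate so that rows outside the range are filtered during the scan. A tiny sketch of the idea with plain i64 timestamps; the half-open bounds here are an assumption for illustration, the real semantics come from time_range_to_predicate:

// Plain-i64 sketch, not the crate's Timestamp/FileTimeRange types.
fn time_range_predicate(start: i64, end: i64) -> impl Fn(i64) -> bool {
    // Assumed half-open range [start, end): rows outside it are filtered out.
    move |ts| ts >= start && ts < end
}

fn main() {
    let in_range = time_range_predicate(1_000, 2_000);
    assert!(in_range(1_000));
    assert!(!in_range(2_000));
    assert!(!in_range(999));
}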
/// Converts time range to predicates so that rows outside the range will be filtered.

View File

@@ -42,7 +42,7 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics;
use crate::read::Source;
use crate::read::{FlatSource, Source};
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
@@ -335,6 +335,8 @@ impl Compactor for DefaultCompactor {
let region_id = compaction_region.region_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
// TODO: Set flat_format from mito config
let flat_format = false;
let index_options = compaction_region
.current_version
.options
@@ -359,7 +361,7 @@ impl Compactor for DefaultCompactor {
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let reader = CompactionSstReaderBuilder {
let builder = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
@@ -368,15 +370,20 @@ impl Compactor for DefaultCompactor {
filter_deleted: output.filter_deleted,
time_range: output.output_time_range,
merge_mode,
}
.build_sst_reader()
.await?;
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
either::Left(Source::Reader(reader))
};
let (sst_infos, metrics) = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_metadata,
source: Source::Reader(reader),
source,
cache_manager,
storage,
max_sequence: max_sequence.map(NonZero::get),

View File

@@ -18,32 +18,41 @@ use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use common_telemetry::{debug, error, info, trace};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};
use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
use crate::access_layer::{
AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, InvalidPartitionExprSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, Result,
Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableRanges;
use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{IndexOptions, MergeMode};
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
@@ -51,7 +60,8 @@ use crate::request::{
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
@@ -343,100 +353,12 @@ impl RegionFlushTask {
write_opts.row_group_size = row_group_size;
}
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());
let mut flushed_bytes = 0;
let mut series_count = 0;
let mut flush_metrics = Metrics::new(WriteType::Flush);
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
continue;
}
let MemtableRanges { ranges, stats } =
mem.ranges(None, PredicateGroup::default(), None)?;
let max_sequence = stats.max_sequence();
series_count += stats.series_count();
let source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
let iter = only_range.build_iter()?;
Source::Iter(iter)
} else {
// todo(hl): a workaround since sync version of MergeReader is wip.
let sources = ranges
.into_values()
.map(|r| r.build_iter().map(Source::Iter))
.collect::<Result<Vec<_>>>()?;
let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
let maybe_dedup = if version.options.append_mode {
// no dedup in append mode
Box::new(merge_reader) as _
} else {
// dedup according to merge mode
match version.options.merge_mode.unwrap_or(MergeMode::LastRow) {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
}
}
};
Source::Reader(maybe_dedup)
};
// Flush to level 0.
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
};
let (ssts_written, metrics) = self
.access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.await?;
if ssts_written.is_empty() {
// No data written.
continue;
}
flush_metrics = flush_metrics.merge(metrics);
// Convert partition expression once outside the map
let partition_expr = match &version.metadata.partition_expr {
None => None,
Some(json_expr) if json_expr.is_empty() => None,
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
};
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
FileMeta {
region_id: self.region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
partition_expr: partition_expr.clone(),
}
}));
}
let DoFlushMemtablesResult {
file_metas,
flushed_bytes,
series_count,
flush_metrics,
} = self.do_flush_memtables(version, write_opts).await?;
if !file_metas.is_empty() {
FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
@@ -493,8 +415,237 @@ impl RegionFlushTask {
Ok(edit)
}
async fn do_flush_memtables(
&self,
version: &VersionRef,
write_opts: WriteOptions,
) -> Result<DoFlushMemtablesResult> {
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());
let mut flushed_bytes = 0;
let mut series_count = 0;
// Convert partition expression once outside the map
let partition_expr = match &version.metadata.partition_expr {
None => None,
Some(json_expr) if json_expr.is_empty() => None,
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
};
let mut flush_metrics = Metrics::new(WriteType::Flush);
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
continue;
}
// Compact the memtable first; this waits for any background compaction to finish.
let compact_start = std::time::Instant::now();
if let Err(e) = mem.compact(true) {
common_telemetry::error!(e; "Failed to compact memtable before flush");
}
let compact_cost = compact_start.elapsed();
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
let num_mem_ranges = mem_ranges.ranges.len();
let num_mem_rows = mem_ranges.stats.num_rows();
let memtable_id = mem.id();
// Increases the series count for each mem range. We assume each mem range contains different series,
// so the counter may report more series than the actual series count.
series_count += mem_ranges.stats.series_count();
if mem_ranges.is_record_batch() {
let flush_start = Instant::now();
let FlushFlatMemResult {
num_encoded,
max_sequence,
num_sources,
results,
} = self
.flush_flat_mem_ranges(version, &write_opts, mem_ranges)
.await?;
for (source_idx, result) in results.into_iter().enumerate() {
let (ssts_written, metrics) = result?;
if ssts_written.is_empty() {
// No data written.
continue;
}
common_telemetry::debug!(
"Region {} flush one memtable {} {}/{}, metrics: {:?}",
self.region_id,
memtable_id,
source_idx,
num_sources,
metrics
);
flush_metrics = flush_metrics.merge(metrics);
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
Self::new_file_meta(
self.region_id,
max_sequence,
sst_info,
partition_expr.clone(),
)
}));
}
common_telemetry::debug!(
"Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
self.region_id,
num_sources,
memtable_id,
num_mem_ranges,
num_encoded,
num_mem_rows,
flush_start.elapsed(),
compact_cost,
);
} else {
let max_sequence = mem_ranges.stats.max_sequence();
let source = memtable_source(mem_ranges, &version.options).await?;
// Flush to level 0.
let source = Either::Left(source);
let write_request = self.new_write_request(version, max_sequence, source);
let (ssts_written, metrics) = self
.access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.await?;
if ssts_written.is_empty() {
// No data written.
continue;
}
common_telemetry::debug!(
"Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
self.region_id,
num_mem_ranges,
num_mem_rows,
metrics
);
flush_metrics = flush_metrics.merge(metrics);
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
Self::new_file_meta(
self.region_id,
max_sequence,
sst_info,
partition_expr.clone(),
)
}));
};
}
Ok(DoFlushMemtablesResult {
file_metas,
flushed_bytes,
series_count,
flush_metrics,
})
}
async fn flush_flat_mem_ranges(
&self,
version: &VersionRef,
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
) -> Result<FlushFlatMemResult> {
let batch_schema =
to_flat_sst_arrow_schema(&version.metadata, &FlatSchemaOptions::default());
let flat_sources = memtable_flat_sources(
batch_schema,
mem_ranges,
&version.options,
version.metadata.primary_key.len(),
)?;
let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
let num_encoded = flat_sources.encoded.len();
let max_sequence = flat_sources.max_sequence;
for source in flat_sources.sources {
let source = Either::Right(source);
let write_request = self.new_write_request(version, max_sequence, source);
let access_layer = self.access_layer.clone();
let write_opts = write_opts.clone();
let task = common_runtime::spawn_global(async move {
access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.await
});
tasks.push(task);
}
for encoded in flat_sources.encoded {
let access_layer = self.access_layer.clone();
let cache_manager = self.cache_manager.clone();
let region_id = version.metadata.region_id;
let task = common_runtime::spawn_global(async move {
let metrics = access_layer
.put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
.await?;
Ok((smallvec![encoded.sst_info], metrics))
});
tasks.push(task);
}
let num_sources = tasks.len();
let results = futures::future::try_join_all(tasks)
.await
.context(JoinSnafu)?;
Ok(FlushFlatMemResult {
num_encoded,
max_sequence,
num_sources,
results,
})
}
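
flush_flat_mem_ranges above fans each flat source out to a spawned task and awaits them all, so a failed join aborts the flush while each source's own write error stays inside the collected results; this is also why WriteOptions now derives Clone, so every task can take its own copy. A small tokio-based sketch of that fan-out/join shape (the real code uses common_runtime::spawn_global, and the write closure here is a placeholder):

use tokio::task::JoinHandle;

// Stand-in for the per-source result (SstInfoArray, Metrics) in the real code.
type WriteResult = Result<usize, String>;

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    let sources = vec![100usize, 200, 300];
    let mut tasks: Vec<JoinHandle<WriteResult>> = Vec::with_capacity(sources.len());
    for rows in sources {
        // Each flat source is written concurrently; the closure stands in for
        // access_layer.write_sst(...) or access_layer.put_sst(...) in the flush path.
        tasks.push(tokio::spawn(async move { Ok::<usize, String>(rows) }));
    }
    // try_join_all over JoinHandles fails fast on a JoinError (panic/cancellation),
    // while each task's own write error stays inside its WriteResult.
    let results: Vec<WriteResult> = futures::future::try_join_all(tasks).await?;
    for result in results {
        let rows = result.expect("write failed");
        println!("flushed {rows} rows");
    }
    Ok(())
}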
fn new_file_meta(
region_id: RegionId,
max_sequence: u64,
sst_info: SstInfo,
partition_expr: Option<PartitionExpr>,
) -> FileMeta {
FileMeta {
region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
partition_expr,
}
}
fn new_write_request(
&self,
version: &VersionRef,
max_sequence: u64,
source: Either<Source, FlatSource>,
) -> SstWriteRequest {
SstWriteRequest {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
}
}
/// Notify flush job status.
async fn send_worker_request(&self, request: WorkerRequest) {
pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
if let Err(e) = self
.request_sender
.send(WorkerRequestWithTime::new(request))
@@ -515,6 +666,148 @@ impl RegionFlushTask {
}
}
struct FlushFlatMemResult {
num_encoded: usize,
max_sequence: u64,
num_sources: usize,
results: Vec<Result<(SstInfoArray, Metrics)>>,
}
struct DoFlushMemtablesResult {
file_metas: Vec<FileMeta>,
flushed_bytes: u64,
series_count: usize,
flush_metrics: Metrics,
}
/// Returns a [Source] for the given memtable.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
let source = if mem_ranges.ranges.len() == 1 {
let only_range = mem_ranges.ranges.into_values().next().unwrap();
let iter = only_range.build_iter()?;
Source::Iter(iter)
} else {
// todo(hl): a workaround since the sync version of MergeReader is WIP.
let sources = mem_ranges
.ranges
.into_values()
.map(|r| r.build_iter().map(Source::Iter))
.collect::<Result<Vec<_>>>()?;
let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
let maybe_dedup = if options.append_mode {
// no dedup in append mode
Box::new(merge_reader) as _
} else {
// dedup according to merge mode
match options.merge_mode.unwrap_or(MergeMode::LastRow) {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
}
}
};
Source::Reader(maybe_dedup)
};
Ok(source)
}
struct FlatSources {
max_sequence: u64,
sources: SmallVec<[FlatSource; 4]>,
encoded: SmallVec<[EncodedRange; 4]>,
}
// TODO(yingwen): Flush into multiple files in parallel.
/// Returns the [FlatSources] (max sequence, flat sources and encoded ranges) for the given memtable.
fn memtable_flat_sources(
schema: SchemaRef,
mem_ranges: MemtableRanges,
options: &RegionOptions,
field_column_start: usize,
) -> Result<FlatSources> {
let MemtableRanges { ranges, stats } = mem_ranges;
let max_sequence = stats.max_sequence();
let mut flat_sources = FlatSources {
max_sequence,
sources: SmallVec::new(),
encoded: SmallVec::new(),
};
if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
if let Some(encoded) = only_range.encoded() {
flat_sources.encoded.push(encoded);
} else {
let iter = only_range.build_record_batch_iter(None)?;
flat_sources.sources.push(FlatSource::Iter(iter));
};
} else {
let min_flush_rows = stats.num_rows / 8;
let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
let mut last_iter_rows = 0;
let num_ranges = ranges.len();
let mut input_iters = Vec::with_capacity(num_ranges);
for (_range_id, range) in ranges {
if let Some(encoded) = range.encoded() {
flat_sources.encoded.push(encoded);
continue;
}
let iter = range.build_record_batch_iter(None)?;
input_iters.push(iter);
last_iter_rows += range.num_rows();
if last_iter_rows > min_flush_rows {
let maybe_dedup = merge_and_dedup(
&schema,
options,
field_column_start,
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
last_iter_rows = 0;
}
}
// Handle remaining iters.
if !input_iters.is_empty() {
let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
}
}
Ok(flat_sources)
}
fn merge_and_dedup(
schema: &SchemaRef,
options: &RegionOptions,
field_column_start: usize,
input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
let maybe_dedup = if options.append_mode {
// No dedup in append mode
Box::new(merge_iter) as _
} else {
// Dedup according to merge mode.
match options.merge_mode() {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
}
MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
merge_iter,
FlatLastNonNull::new(field_column_start, false),
)) as _,
}
};
Ok(maybe_dedup)
}
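
memtable_flat_sources above groups the range iterators so that each group (except possibly the last) accumulates more than max(total_rows / 8, DEFAULT_ROW_GROUP_SIZE) rows before being handed to merge_and_dedup, while already-encoded ranges bypass merging entirely. A simplified sketch of just that grouping rule over plain row counts (the constant mirrors DEFAULT_ROW_GROUP_SIZE from the parquet module; the real threshold is derived from the memtable's total row count):

const MIN_GROUP_ROWS: usize = 102_400; // DEFAULT_ROW_GROUP_SIZE = 100 * 1024

/// Groups per-range row counts so each group reaches the flush threshold,
/// mirroring how memtable_flat_sources batches range iterators before merging.
fn group_ranges(range_rows: &[usize]) -> Vec<Vec<usize>> {
    let total: usize = range_rows.iter().sum();
    // Flush in chunks of at least total/8 rows, but never below the row group size.
    let min_flush_rows = (total / 8).max(MIN_GROUP_ROWS);
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut rows_in_group = 0;
    for &rows in range_rows {
        current.push(rows);
        rows_in_group += rows;
        if rows_in_group > min_flush_rows {
            // The group is large enough; it becomes one merge-and-dedup source.
            groups.push(std::mem::take(&mut current));
            rows_in_group = 0;
        }
    }
    if !current.is_empty() {
        // Handle the remaining ranges.
        groups.push(current);
    }
    groups
}

fn main() {
    // 10 ranges of 1M rows each: total/8 = 1.25M rows, so two ranges per group.
    let groups = group_ranges(&[1_000_000; 10]);
    assert_eq!(groups.len(), 5);
}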
/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
/// Tracks regions that need to flush.

View File

@@ -151,6 +151,19 @@ pub struct MemtableRanges {
pub stats: MemtableStats,
}
impl IterBuilder for MemtableRanges {
fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
UnsupportedOperationSnafu {
err_msg: "MemtableRanges does not support build iterator",
}
.fail()
}
fn is_record_batch(&self) -> bool {
self.ranges.values().all(|range| range.is_record_batch())
}
}
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
@@ -528,6 +541,11 @@ impl MemtableRange {
pub fn num_rows(&self) -> usize {
self.num_rows
}
/// Returns the encoded range if available.
pub fn encoded(&self) -> Option<EncodedRange> {
self.context.builder.encoded_range()
}
}
#[cfg(test)]

View File

@@ -139,6 +139,27 @@ impl SeqScan {
Ok(Box::new(reader))
}
/// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
assert!(self.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::merge_all_flat_ranges_for_compaction(
&self.stream_ctx,
partition_ranges,
&part_metrics,
)
.await?;
Ok(reader)
}
/// Builds a merge reader that reads all ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_ranges_for_compaction(
@@ -172,6 +193,39 @@ impl SeqScan {
Self::build_reader_from_sources(stream_ctx, sources, None).await
}
/// Builds a merge reader that reads all flat ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_flat_ranges_for_compaction(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
part_metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
for part_range in partition_ranges {
build_flat_sources(
stream_ctx,
part_range,
true,
part_metrics,
range_builder_list.clone(),
&mut sources,
)
.await?;
}
common_telemetry::debug!(
"Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
stream_ctx.input.mapper.metadata().region_id,
partition_ranges.len(),
sources.len()
);
Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]

View File

@@ -43,7 +43,7 @@ pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
/// Parquet write options.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct WriteOptions {
/// Buffer size for async writer.
pub write_buffer_size: ReadableSize,