fix: fix panic and limit concurrency in flat format (#7035)

* feat: add a semaphore to control flush concurrency

Signed-off-by: evenyag <realevenyag@gmail.com>
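
A minimal, self-contained sketch of the pattern this change applies (assuming a tokio runtime with the "full" feature set): a shared Semaphore sized to the flush budget is cloned into each spawned flush job, and the permit is held across the SST write so at most that many writes run at once. `write_sst_stub` and the budget of 2 are illustrative stand-ins for the real `access_layer.write_sst(...)` call and `config.max_background_flushes`.

use std::sync::Arc;

use tokio::sync::Semaphore;

// Illustrative stand-in for the SST write a flush job performs; the real task
// calls `access_layer.write_sst(...)` while holding the permit.
async fn write_sst_stub(region: u64) {
    println!("flushing region {region}");
}

#[tokio::main]
async fn main() {
    // Mirrors `Semaphore::new(config.max_background_flushes)`: at most two
    // flush jobs may write SSTs concurrently in this sketch.
    let flush_semaphore = Arc::new(Semaphore::new(2));

    let mut handles = Vec::new();
    for region in 0..8u64 {
        let semaphore = flush_semaphore.clone();
        handles.push(tokio::spawn(async move {
            // Acquired before the write and released when `_permit` drops, so
            // surplus flush jobs wait here instead of all writing at once.
            let _permit = semaphore.acquire().await.unwrap();
            write_sst_stub(region).await;
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}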

* fix: build FlatSchemaOptions from encoding in FlatWriteFormat

Signed-off-by: evenyag <realevenyag@gmail.com>
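
This fix derives the writer's schema options from the region's primary key encoding instead of always using `FlatSchemaOptions::default()` (see the parquet writer hunk below). The sketch that follows illustrates the idea with simplified stand-in types: the real `FlatSchemaOptions` carries different fields, and only the dispatch on `PrimaryKeyEncoding` mirrors the change.

// Simplified stand-ins for illustration; the real types live in mito2 and
// carry more state than shown here.
#[derive(Clone, Copy, PartialEq, Eq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

#[derive(Debug)]
struct FlatSchemaOptions {
    // Hypothetical field: whether the flat schema expands raw tag columns.
    raw_pk_columns: bool,
}

impl Default for FlatSchemaOptions {
    fn default() -> Self {
        Self {
            raw_pk_columns: true,
        }
    }
}

impl FlatSchemaOptions {
    // Sketch of `from_encoding`: pick the schema options per encoding so a
    // sparse-encoded region no longer gets the dense default.
    fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
        match encoding {
            PrimaryKeyEncoding::Dense => Self::default(),
            PrimaryKeyEncoding::Sparse => Self {
                raw_pk_columns: false,
            },
        }
    }
}

fn main() {
    let opts = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse);
    assert!(!opts.raw_pk_columns);
    println!("{opts:?}");
}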

* chore: remove allow dead_code

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle sparse encoding in FlatCompatBatch

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: add time index column in try_new_compact_sparse

Signed-off-by: evenyag <realevenyag@gmail.com>
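
Both sparse-encoding fixes land in `FlatCompatBatch`: an input SST with sparse primary key encoding that is read for compaction now takes the `try_new_compact_sparse` path, which reconciles only the field columns plus the time index column (the encoded primary key passes through unchanged, and converting sparse back to dense is rejected). Below is a sketch of that column selection with illustrative stand-in types; `ColumnMeta`, `RegionMeta`, `field_columns`, and `time_index_column` are simplified here, not the real `RegionMetadata` API.

// Illustrative stand-ins for region metadata; the real code iterates
// `RegionMetadata::field_columns()` and `time_index_column()`.
#[derive(Clone, Debug, PartialEq)]
struct ColumnMeta {
    column_id: u32,
    name: &'static str,
}

struct RegionMeta {
    time_index: ColumnMeta,
    fields: Vec<ColumnMeta>,
}

impl RegionMeta {
    fn field_columns(&self) -> impl Iterator<Item = &ColumnMeta> {
        self.fields.iter()
    }

    fn time_index_column(&self) -> &ColumnMeta {
        &self.time_index
    }
}

fn main() {
    let region = RegionMeta {
        time_index: ColumnMeta { column_id: 0, name: "ts" },
        fields: vec![
            ColumnMeta { column_id: 2, name: "field_a" },
            ColumnMeta { column_id: 3, name: "field_b" },
        ],
    };

    // The fix: chain the time index column onto the field columns so the
    // compat schema for sparse compaction also covers the timestamp.
    let schema: Vec<_> = region
        .field_columns()
        .chain([region.time_index_column()])
        .map(|col| (col.column_id, col.name))
        .collect();

    assert_eq!(schema, vec![(2, "field_a"), (3, "field_b"), (0, "ts")]);
    println!("{schema:?}");
}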

* test: add test for compaction and sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-09-29 10:20:06 +08:00
Committed by: GitHub
Parent: 4a3c5f85e5
Commit: 90d37cb10e
6 changed files with 182 additions and 23 deletions

View File

@@ -28,7 +28,7 @@ use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};
use tokio::sync::{Semaphore, mpsc, watch};
use crate::access_layer::{
AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
@@ -250,6 +250,8 @@ pub(crate) struct RegionFlushTask {
/// Index options for the region.
pub(crate) index_options: IndexOptions,
/// Semaphore to control flush concurrency.
pub(crate) flush_semaphore: Arc<Semaphore>,
}
impl RegionFlushTask {
@@ -586,7 +588,9 @@ impl RegionFlushTask {
let write_request = self.new_write_request(version, max_sequence, source);
let access_layer = self.access_layer.clone();
let write_opts = write_opts.clone();
let semaphore = self.flush_semaphore.clone();
let task = common_runtime::spawn_global(async move {
let _permit = semaphore.acquire().await.unwrap();
access_layer
.write_sst(write_request, &write_opts, WriteType::Flush)
.await
@@ -597,7 +601,9 @@ impl RegionFlushTask {
let access_layer = self.access_layer.clone();
let cache_manager = self.cache_manager.clone();
let region_id = version.metadata.region_id;
let semaphore = self.flush_semaphore.clone();
let task = common_runtime::spawn_global(async move {
let _permit = semaphore.acquire().await.unwrap();
let metrics = access_layer
.put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
.await?;
@@ -1251,6 +1257,7 @@ mod tests {
.mock_manifest_context(version_control.current().version.metadata.clone())
.await,
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1292,6 +1299,7 @@ mod tests {
cache_manager: Arc::new(CacheManager::default()),
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
})
.collect();
// Schedule first task.

View File

@@ -22,7 +22,7 @@ use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
@@ -33,12 +33,13 @@ use mito_codec::row_converter::{
build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
NewRecordBatchSnafu, Result, UnexpectedSnafu,
NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
@@ -90,7 +91,6 @@ pub(crate) enum CompatBatch {
/// Adapter for primary key format.
PrimaryKey(PrimaryKeyCompatBatch),
/// Adapter for flat format.
#[allow(dead_code)]
Flat(FlatCompatBatch),
}
@@ -104,7 +104,6 @@ impl CompatBatch {
}
/// Returns the inner flat batch adapter if this is a Flat format.
#[allow(dead_code)]
pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
match self {
CompatBatch::Flat(batch) => Some(batch),
@@ -186,7 +185,6 @@ pub(crate) fn has_same_columns_and_pk_encoding(
}
/// A helper struct to adapt schema of the batch to an expected schema.
#[allow(dead_code)]
pub(crate) struct FlatCompatBatch {
/// Indices to convert actual fields to expect fields.
index_or_defaults: Vec<IndexOrDefault>,
@@ -202,10 +200,12 @@ impl FlatCompatBatch {
/// - `mapper` is built from the metadata users expect to see.
/// - `actual` is the [RegionMetadata] of the input parquet.
/// - `format_projection` is the projection of the read format for the input parquet.
/// - `compaction` indicates whether the reader is for compaction.
pub(crate) fn try_new(
mapper: &FlatProjectionMapper,
actual: &RegionMetadataRef,
format_projection: &FormatProjection,
compaction: bool,
) -> Result<Option<Self>> {
let actual_schema = flat_projected_columns(actual, format_projection);
let expect_schema = mapper.batch_schema();
@@ -215,6 +215,28 @@ impl FlatCompatBatch {
return Ok(None);
}
if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
// Special handling for sparse encoding in compaction.
return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
}
let (index_or_defaults, fields) =
Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
Ok(Some(Self {
index_or_defaults,
arrow_schema: Arc::new(Schema::new(fields)),
compat_pk,
}))
}
fn compute_index_and_fields(
actual_schema: &[(ColumnId, ConcreteDataType)],
expect_schema: &[(ColumnId, ConcreteDataType)],
expect_metadata: &RegionMetadata,
) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
// Maps column id to the index and data type in the actual schema.
let actual_schema_index: HashMap<_, _> = actual_schema
.iter()
@@ -226,9 +248,9 @@ impl FlatCompatBatch {
let mut fields = Vec::with_capacity(expect_schema.len());
for (column_id, expect_data_type) in expect_schema {
// Safety: expect_schema comes from the same mapper.
let column_index = mapper.metadata().column_index_by_id(*column_id).unwrap();
let expect_column = &mapper.metadata().column_metadatas[column_index];
let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
let expect_column = &expect_metadata.column_metadatas[column_index];
let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
// For tag columns, we need to create a dictionary field.
if expect_column.semantic_type == SemanticType::Tag {
fields.push(tag_maybe_to_dictionary_field(
@@ -257,11 +279,11 @@ impl FlatCompatBatch {
.column_schema
.create_default_vector(1)
.context(CreateDefaultSnafu {
region_id: mapper.metadata().region_id,
region_id: expect_metadata.region_id,
column: &expect_column.column_schema.name,
})?
.with_context(|| CompatReaderSnafu {
region_id: mapper.metadata().region_id,
region_id: expect_metadata.region_id,
reason: format!(
"column {} does not have a default value to read",
expect_column.column_schema.name
@@ -276,7 +298,40 @@ impl FlatCompatBatch {
}
fields.extend_from_slice(&internal_fields());
let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
Ok((index_or_defaults, fields))
}
fn try_new_compact_sparse(
mapper: &FlatProjectionMapper,
actual: &RegionMetadataRef,
) -> Result<Option<Self>> {
// Currently, we don't support converting sparse encoding back to dense encoding in
// flat format.
ensure!(
mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
UnsupportedOperationSnafu {
err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
}
);
// For sparse encoding, we don't need to check the primary keys.
// Since this is for compaction, we always read all columns.
let actual_schema: Vec<_> = actual
.field_columns()
.chain([actual.time_index_column()])
.map(|col| (col.column_id, col.column_schema.data_type.clone()))
.collect();
let expect_schema: Vec<_> = mapper
.metadata()
.field_columns()
.chain([mapper.metadata().time_index_column()])
.map(|col| (col.column_id, col.column_schema.data_type.clone()))
.collect();
let (index_or_defaults, fields) =
Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
let compat_pk = FlatCompatPrimaryKey::default();
Ok(Some(Self {
index_or_defaults,
@@ -286,7 +341,6 @@ impl FlatCompatBatch {
}
/// Make columns of the `batch` compatible.
#[allow(dead_code)]
pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
let len = batch.num_rows();
let columns = self
@@ -769,6 +823,7 @@ impl FlatRewritePrimaryKey {
}
/// Helper to make primary key compatible for flat format.
#[derive(Default)]
struct FlatCompatPrimaryKey {
/// Primary key rewriter.
rewriter: Option<FlatRewritePrimaryKey>,
@@ -1456,9 +1511,10 @@ mod tests {
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
.unwrap()
.unwrap();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
@@ -1547,9 +1603,10 @@ mod tests {
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
.unwrap()
.unwrap();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
// Tag array.
let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
@@ -1597,4 +1654,86 @@ mod tests {
assert_eq!(expected_batch, result);
}
#[test]
fn test_flat_compat_batch_compact_sparse() {
let mut actual_metadata = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[],
);
actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let actual_metadata = Arc::new(actual_metadata);
let mut expected_metadata = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[],
);
expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let expected_metadata = Arc::new(expected_metadata);
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 2, 3].into_iter(),
None,
"test",
true,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
.unwrap()
.unwrap();
let sparse_k1 = encode_sparse_key(&[]);
let input_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let input_schema =
to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
let result = compat_batch.compat(input_batch).unwrap();
let expected_columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let output_schema =
to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
}

View File

@@ -961,6 +961,7 @@ impl ScanInput {
mapper,
flat_format.metadata(),
flat_format.format_projection(),
self.compaction,
)?
.map(CompatBatch::Flat)
} else {

View File

@@ -304,9 +304,11 @@ where
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let flat_format =
FlatWriteFormat::new(self.metadata.clone(), &FlatSchemaOptions::default())
.with_override_sequence(None);
let flat_format = FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(None);
let mut stats = SourceStats::default();
while let Some(record_batch) = self

View File

@@ -49,7 +49,7 @@ use store_api::region_engine::{
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, mpsc, oneshot, watch};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
@@ -165,6 +165,7 @@ impl WorkerGroup {
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
// We use another scheduler to avoid purge jobs blocking other jobs.
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
@@ -211,6 +212,7 @@ impl WorkerGroup {
schema_metadata_manager: schema_metadata_manager.clone(),
file_ref_manager: file_ref_manager.clone(),
partition_expr_fetcher: partition_expr_fetcher.clone(),
flush_semaphore: flush_semaphore.clone(),
}
.start()
})
@@ -319,6 +321,7 @@ impl WorkerGroup {
});
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
@@ -367,6 +370,7 @@ impl WorkerGroup {
schema_metadata_manager: schema_metadata_manager.clone(),
file_ref_manager: file_ref_manager.clone(),
partition_expr_fetcher: partition_expr_fetcher.clone(),
flush_semaphore: flush_semaphore.clone(),
}
.start()
})
@@ -448,6 +452,7 @@ struct WorkerStarter<S> {
schema_metadata_manager: SchemaMetadataManagerRef,
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
flush_semaphore: Arc<Semaphore>,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -502,6 +507,7 @@ impl<S: LogStore> WorkerStarter<S> {
schema_metadata_manager: self.schema_metadata_manager,
file_ref_manager: self.file_ref_manager.clone(),
partition_expr_fetcher: self.partition_expr_fetcher,
flush_semaphore: self.flush_semaphore,
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -755,6 +761,8 @@ struct RegionWorkerLoop<S> {
file_ref_manager: FileReferenceManagerRef,
/// Partition expr fetcher used to backfill partition expr on open for compatibility.
partition_expr_fetcher: PartitionExprFetcherRef,
/// Semaphore to control flush concurrency.
flush_semaphore: Arc<Semaphore>,
}
impl<S: LogStore> RegionWorkerLoop<S> {

View File

@@ -119,6 +119,7 @@ impl<S> RegionWorkerLoop<S> {
cache_manager: self.cache_manager.clone(),
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
flush_semaphore: self.flush_semaphore.clone(),
}
}
}