feat: Implements FlatCompatBatch to adapt schema in flat format (#6771)

* feat: compat flat wip

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: rewrite key

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support append key

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: Split rewrite logic

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename CompatFlatBatch to FlatCompatBatch

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: compat primary key

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address fixme

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: new CompatBatch if need convert

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use different compat

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: plain_projection -> flat_projection

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: test compat

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test FlatCompatBatch

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: fix warnings and remove unused code

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: support compat tags with dictionary types

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: reuse column_id_values

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: avoid zero pk values len

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-08-20 20:36:14 +08:00
Committed by: GitHub
Parent: 05529387d9
Commit: 2995eddca5
7 changed files with 696 additions and 41 deletions

View File

@@ -61,7 +61,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedRecordBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -217,9 +217,6 @@ impl BulkPart {
}
}
/// Builder type for primary key dictionary array.
type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
/// String dictionary builder for string types.

View File

@@ -17,7 +17,15 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{take, TakeOptions};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
@@ -29,17 +37,22 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result, UnexpectedSnafu,
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::read::flat_projection::{flat_projected_columns, FlatProjectionMapper};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM};
use crate::sst::{internal_fields, to_dictionary_field};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
/// Underlying reader.
reader: R,
/// Helper to compat batches.
compat: CompatBatch,
compat: PrimaryKeyCompatBatch,
}
impl<R> CompatReader<R> {
@@ -54,7 +67,7 @@ impl<R> CompatReader<R> {
) -> Result<CompatReader<R>> {
Ok(CompatReader {
reader,
compat: CompatBatch::new(mapper, reader_meta)?,
compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
})
}
}
@@ -72,8 +85,36 @@ impl<R: BatchReader> BatchReader for CompatReader<R> {
}
}
/// Helper to adapt the schema of the batch to an expected schema.
pub(crate) enum CompatBatch {
/// Adapter for primary key format.
PrimaryKey(PrimaryKeyCompatBatch),
/// Adapter for flat format.
#[allow(dead_code)]
Flat(FlatCompatBatch),
}
impl CompatBatch {
/// Returns the inner primary key batch adapter if this is a PrimaryKey format.
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
match self {
CompatBatch::PrimaryKey(batch) => Some(batch),
_ => None,
}
}
/// Returns the inner flat batch adapter if this is a Flat format.
#[allow(dead_code)]
pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
match self {
CompatBatch::Flat(batch) => Some(batch),
_ => None,
}
}
}
/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct CompatBatch {
pub(crate) struct PrimaryKeyCompatBatch {
/// Optional adapter to rewrite the primary key to the expected encoding.
rewrite_pk: Option<RewritePrimaryKey>,
/// Optional adapter to make the primary key compatible.
@@ -82,7 +123,7 @@ pub(crate) struct CompatBatch {
compat_fields: Option<CompatFields>,
}
impl CompatBatch {
impl PrimaryKeyCompatBatch {
/// Creates a new [CompatBatch].
/// - `mapper` is built from the metadata users expect to see.
/// - `reader_meta` is the metadata of the input reader.
@@ -144,6 +185,170 @@ pub(crate) fn has_same_columns_and_pk_encoding(
true
}
/// A helper struct to adapt the schema of a flat-format batch to the expected schema.
#[allow(dead_code)]
pub(crate) struct FlatCompatBatch {
/// Indices to convert actual fields to expected fields.
index_or_defaults: Vec<IndexOrDefault>,
/// Expected arrow schema.
arrow_schema: SchemaRef,
/// Primary key adapter.
compat_pk: FlatCompatPrimaryKey,
}
impl FlatCompatBatch {
/// Creates a [FlatCompatBatch].
///
/// - `mapper` is built from the metadata users expect to see.
/// - `actual` is the [RegionMetadata] of the input parquet.
/// - `format_projection` is the projection of the read format for the input parquet.
pub(crate) fn try_new(
mapper: &FlatProjectionMapper,
actual: &RegionMetadataRef,
format_projection: &FormatProjection,
) -> Result<Self> {
let actual_schema = flat_projected_columns(actual, format_projection);
let expect_schema = mapper.batch_schema();
// has_same_columns_and_pk_encoding() already checks columns and encodings.
debug_assert_ne!(expect_schema, actual_schema);
// Maps column id to the index and data type in the actual schema.
let actual_schema_index: HashMap<_, _> = actual_schema
.iter()
.enumerate()
.map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
.collect();
let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
let mut fields = Vec::with_capacity(expect_schema.len());
for (column_id, expect_data_type) in expect_schema {
// Safety: expect_schema comes from the same mapper.
let column_index = mapper.metadata().column_index_by_id(*column_id).unwrap();
let expect_column = &mapper.metadata().column_metadatas[column_index];
let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
// For tag columns, we need to create a dictionary field.
if expect_column.semantic_type == SemanticType::Tag {
fields.push(Arc::new(to_dictionary_field(column_field)));
} else {
fields.push(column_field.clone());
};
if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
let mut cast_type = None;
// Same column different type.
if expect_data_type != *actual_data_type {
cast_type = Some(expect_data_type.clone())
}
// Source has this column.
index_or_defaults.push(IndexOrDefault::Index {
pos: *index,
cast_type,
});
} else {
// Create a default vector with 1 element for that column.
let default_vector = expect_column
.column_schema
.create_default_vector(1)
.context(CreateDefaultSnafu {
region_id: mapper.metadata().region_id,
column: &expect_column.column_schema.name,
})?
.with_context(|| CompatReaderSnafu {
region_id: mapper.metadata().region_id,
reason: format!(
"column {} does not have a default value to read",
expect_column.column_schema.name
),
})?;
index_or_defaults.push(IndexOrDefault::DefaultValue {
column_id: expect_column.column_id,
default_vector,
semantic_type: expect_column.semantic_type,
});
};
}
fields.extend_from_slice(&internal_fields());
let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
Ok(Self {
index_or_defaults,
arrow_schema: Arc::new(Schema::new(fields)),
compat_pk,
})
}
/// Makes the columns of the `batch` compatible.
#[allow(dead_code)]
pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
let len = batch.num_rows();
let columns = self
.index_or_defaults
.iter()
.map(|index_or_default| match index_or_default {
IndexOrDefault::Index { pos, cast_type } => {
let old_column = batch.column(*pos);
if let Some(ty) = cast_type {
// Safety: we ensure the type can be converted and the new batch is valid.
// Note: `safe` must be true in `CastOptions`, so a value that cannot be converted is replaced with null.
let casted =
datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
.context(ComputeArrowSnafu)?;
Ok(casted)
} else {
Ok(old_column.clone())
}
}
IndexOrDefault::DefaultValue {
column_id: _,
default_vector,
semantic_type,
} => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
})
.chain(
// Adds internal columns.
batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
.iter()
.map(|col| Ok(col.clone())),
)
.collect::<Result<Vec<_>>>()?;
let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
.context(NewRecordBatchSnafu)?;
// Handles primary keys.
self.compat_pk.compat(compat_batch)
}
}
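
The cast above relies on arrow's default "safe" cast semantics: with `safe: true` in `CastOptions`, a value that cannot be represented in the target type becomes null instead of raising an error. A minimal standalone sketch of that behaviour (not part of this patch, assuming the `arrow` crate that `datatypes` re-exports):

use arrow::array::Int64Array;
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // 300 does not fit into i8, so the default (safe) cast turns it into null
    // rather than returning an error.
    let src = Int64Array::from(vec![1_i64, 300]);
    let casted = cast(&src, &DataType::Int8).unwrap();
    assert!(casted.is_valid(0));
    assert_eq!(casted.null_count(), 1);
}
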
/// Repeats the vector value `to_len` times.
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
assert_eq!(1, vector.len());
if is_tag {
let values = vector.to_arrow_array();
if values.is_null(0) {
// Creates a dictionary array with `to_len` null keys.
let keys = UInt32Array::new_null(to_len);
Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
} else {
let keys = UInt32Array::from_value(0, to_len);
Ok(Arc::new(DictionaryArray::new(keys, values)))
}
} else {
let keys = UInt32Array::from_value(0, to_len);
take(
&vector.to_arrow_array(),
&keys,
Some(TakeOptions {
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)
}
}
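
`repeat_vector` avoids materializing the default tag value once per row: for tags it builds a dictionary array whose keys all point at a single values entry. A small sketch of that trick (not from this patch, assuming the `arrow` crate):

use std::sync::Arc;
use arrow::array::{DictionaryArray, StringArray, UInt32Array};
use arrow::datatypes::UInt32Type;

fn constant_tag_column(value: &str, to_len: usize) -> DictionaryArray<UInt32Type> {
    // Every row uses key 0, which points at the single dictionary value.
    let keys = UInt32Array::from_value(0, to_len);
    let values = Arc::new(StringArray::from(vec![value]));
    DictionaryArray::new(keys, values)
}

fn main() {
    let col = constant_tag_column("default-tag", 3);
    assert_eq!(col.len(), 3);
    // The value itself is stored only once.
    assert_eq!(col.values().len(), 1);
}
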
/// Helper to make primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
@@ -218,6 +423,7 @@ impl CompatFields {
IndexOrDefault::DefaultValue {
column_id,
default_vector,
semantic_type: _,
} => {
let data = default_vector.replicate(&[len]);
BatchColumn {
@@ -252,11 +458,8 @@ fn may_rewrite_primary_key(
})
}
/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
expect: &RegionMetadata,
actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
/// Returns true if the actual primary key is the same as the expected one.
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
ensure!(
actual.primary_key.len() <= expect.primary_key.len(),
CompatReaderSnafu {
@@ -278,7 +481,16 @@ fn may_compat_primary_key(
),
}
);
if actual.primary_key.len() == expect.primary_key.len() {
Ok(actual.primary_key.len() == expect.primary_key.len())
}
/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
expect: &RegionMetadata,
actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
if is_primary_key_same(expect, actual)? {
return Ok(None);
}
@@ -368,6 +580,7 @@ fn may_compat_fields(
Ok(IndexOrDefault::DefaultValue {
column_id: column.column_id,
default_vector,
semantic_type: SemanticType::Field,
})
}
})
@@ -393,6 +606,8 @@ enum IndexOrDefault {
column_id: ColumnId,
/// Default value. The vector has only 1 element.
default_vector: VectorRef,
/// Semantic type of the column.
semantic_type: SemanticType,
},
}
@@ -450,11 +665,248 @@ impl RewritePrimaryKey {
}
}
/// Helper to rewrite the primary key to another encoding for the flat format.
struct FlatRewritePrimaryKey {
/// New primary key encoder.
codec: Arc<dyn PrimaryKeyCodec>,
/// Metadata of the expected region.
metadata: RegionMetadataRef,
/// Original primary key codec, used to decode the existing primary key
/// when we need to rewrite it.
old_codec: Arc<dyn PrimaryKeyCodec>,
}
impl FlatRewritePrimaryKey {
fn new(
expect: &RegionMetadataRef,
actual: &RegionMetadataRef,
) -> Option<FlatRewritePrimaryKey> {
if expect.primary_key_encoding == actual.primary_key_encoding {
return None;
}
let codec = build_primary_key_codec(expect);
let old_codec = build_primary_key_codec(actual);
Some(FlatRewritePrimaryKey {
codec,
metadata: expect.clone(),
old_codec,
})
}
/// Rewrites the primary key of the `batch` with the new encoding.
/// It also appends `append_values` to the primary key.
fn rewrite_key(
&self,
append_values: &[(ColumnId, Value)],
batch: RecordBatch,
) -> Result<RecordBatch> {
let old_pk_dict_array = batch
.column(primary_key_column_index(batch.num_columns()))
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.unwrap();
let old_pk_values_array = old_pk_dict_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
let mut builder = BinaryBuilder::with_capacity(
old_pk_values_array.len(),
old_pk_values_array.value_data().len(),
);
// Binary buffer for the primary key.
let mut buffer = Vec::with_capacity(
old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
);
let mut column_id_values = Vec::new();
// Iterates the binary array and rewrites the primary key.
for value in old_pk_values_array.iter() {
let Some(old_pk) = value else {
builder.append_null();
continue;
};
// Decodes the old primary key.
let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
pk_values.extend(append_values);
buffer.clear();
column_id_values.clear();
// Encodes the new primary key.
match pk_values {
CompositeValues::Dense(dense_values) => {
self.codec
.encode_values(dense_values.as_slice(), &mut buffer)
.context(EncodeSnafu)?;
}
CompositeValues::Sparse(sparse_values) => {
for id in &self.metadata.primary_key {
let value = sparse_values.get_or_null(*id);
column_id_values.push((*id, value.clone()));
}
self.codec
.encode_values(&column_id_values, &mut buffer)
.context(EncodeSnafu)?;
}
}
builder.append_value(&buffer);
}
let new_pk_values_array = Arc::new(builder.finish());
let new_pk_dict_array =
PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
let mut columns = batch.columns().to_vec();
columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
}
}
/// Helper to make the primary key compatible for the flat format.
struct FlatCompatPrimaryKey {
/// Primary key rewriter.
rewriter: Option<FlatRewritePrimaryKey>,
/// Converter to append values to primary keys.
converter: Option<Arc<dyn PrimaryKeyCodec>>,
/// Default values to append.
values: Vec<(ColumnId, Value)>,
}
impl FlatCompatPrimaryKey {
fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
let rewriter = FlatRewritePrimaryKey::new(expect, actual);
if is_primary_key_same(expect, actual)? {
return Ok(Self {
rewriter,
converter: None,
values: Vec::new(),
});
}
// We need to append default values to the primary key.
let to_add = &expect.primary_key[actual.primary_key.len()..];
let mut values = Vec::with_capacity(to_add.len());
let mut fields = Vec::with_capacity(to_add.len());
for column_id in to_add {
// Safety: The id comes from expect region metadata.
let column = expect.column_by_id(*column_id).unwrap();
fields.push((
*column_id,
SortField::new(column.column_schema.data_type.clone()),
));
let default_value = column
.column_schema
.create_default()
.context(CreateDefaultSnafu {
region_id: expect.region_id,
column: &column.column_schema.name,
})?
.with_context(|| CompatReaderSnafu {
region_id: expect.region_id,
reason: format!(
"key column {} does not have a default value to read",
column.column_schema.name
),
})?;
values.push((*column_id, default_value));
}
// is_primary_key_same() is false, so we have a different number of primary key columns.
debug_assert!(!fields.is_empty());
// Creates a converter to append the default values.
let converter = Some(build_primary_key_codec_with_fields(
expect.primary_key_encoding,
fields.into_iter(),
));
Ok(Self {
rewriter,
converter,
values,
})
}
/// Makes the primary key of the `batch` compatible.
///
/// Callers must ensure all columns other than the `__primary_key` column are already compatible.
fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
if let Some(rewriter) = &self.rewriter {
// If we have different encoding, rewrite the whole primary key.
return rewriter.rewrite_key(&self.values, batch);
}
self.append_key(batch)
}
/// Appends values to the primary key of the `batch`.
fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
let Some(converter) = &self.converter else {
return Ok(batch);
};
let old_pk_dict_array = batch
.column(primary_key_column_index(batch.num_columns()))
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.unwrap();
let old_pk_values_array = old_pk_dict_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
let mut builder = BinaryBuilder::with_capacity(
old_pk_values_array.len(),
old_pk_values_array.value_data().len()
+ converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
);
// Binary buffer for the primary key.
let mut buffer = Vec::with_capacity(
old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
+ converter.estimated_size().unwrap_or_default(),
);
// Iterates the binary array and appends values to the primary key.
for value in old_pk_values_array.iter() {
let Some(old_pk) = value else {
builder.append_null();
continue;
};
buffer.clear();
buffer.extend_from_slice(old_pk);
converter
.encode_values(&self.values, &mut buffer)
.context(EncodeSnafu)?;
builder.append_value(&buffer);
}
let new_pk_values_array = Arc::new(builder.finish());
let new_pk_dict_array =
PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
// Overrides the primary key column.
let mut columns = batch.columns().to_vec();
columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
}
}
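
Because the `__primary_key` column is dictionary encoded, both `rewrite_key` and `append_key` only touch the distinct key values and reuse the per-row dictionary keys unchanged. An illustrative sketch of that pattern (not part of the diff; `encoded_defaults` stands in for the bytes produced by `encode_values`):

use std::sync::Arc;
use arrow::array::{BinaryArray, BinaryBuilder, DictionaryArray};
use arrow::datatypes::UInt32Type;

fn append_to_pk_values(
    dict: &DictionaryArray<UInt32Type>,
    encoded_defaults: &[u8],
) -> DictionaryArray<UInt32Type> {
    let old_values = dict
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    let mut builder = BinaryBuilder::new();
    for value in old_values.iter() {
        match value {
            // Keep null dictionary values as nulls.
            None => builder.append_null(),
            // Old key bytes followed by the newly encoded default values.
            Some(old) => builder.append_value([old, encoded_defaults].concat()),
        }
    }
    // The per-row keys are reused; only the values array changes.
    DictionaryArray::new(dict.keys().clone(), Arc::new(builder.finish()))
}
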
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
@@ -467,6 +919,9 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].
@@ -941,4 +1396,180 @@ mod tests {
)
.await;
}
/// Creates a primary key array for flat format testing.
fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for &pk in primary_keys {
builder.append(pk).unwrap();
}
Arc::new(builder.finish())
}
#[test]
fn test_flat_compat_batch_with_missing_columns() {
let actual_metadata = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let expected_metadata = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
// Adds a new field.
(3, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
));
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
tag_builder.append_value("tag1");
let tag_dict_array = Arc::new(tag_builder.finish());
let k1 = encode_key(&[Some("tag1")]);
let input_columns: Vec<ArrayRef> = vec![
tag_dict_array.clone(),
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&k1, &k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let input_schema =
to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
let result = compat_batch.compat(input_batch).unwrap();
let expected_schema =
to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
let expected_columns: Vec<ArrayRef> = vec![
tag_dict_array.clone(),
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&k1, &k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
#[test]
fn test_flat_compat_batch_with_different_pk_encoding() {
let mut actual_metadata = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
);
actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
let actual_metadata = Arc::new(actual_metadata);
let mut expected_metadata = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(3, SemanticType::Tag, ConcreteDataType::string_datatype()),
],
&[1, 3],
);
expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let expected_metadata = Arc::new(expected_metadata);
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter());
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
// Tag array.
let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag1_builder.append_value("tag1");
tag1_builder.append_value("tag1");
let tag1_dict_array = Arc::new(tag1_builder.finish());
let k1 = encode_key(&[Some("tag1")]);
let input_columns: Vec<ArrayRef> = vec![
tag1_dict_array.clone(),
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&k1, &k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let input_schema =
to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
let result = compat_batch.compat(input_batch).unwrap();
let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
null_tag_builder.append_nulls(2);
let null_tag_dict_array = Arc::new(null_tag_builder.finish());
let expected_columns: Vec<ArrayRef> = vec![
tag1_dict_array.clone(),
null_tag_dict_array,
Arc::new(Int64Array::from(vec![100, 200])),
Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
Arc::new(UInt64Array::from_iter_values([1, 2])),
Arc::new(UInt8Array::from_iter_values([
OpType::Put as u8,
OpType::Put as u8,
])),
];
let output_schema =
to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
}

View File

@@ -45,7 +45,7 @@ use crate::error::Result;
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
@@ -907,17 +907,29 @@ impl ScanInput {
}
}
};
if !compat::has_same_columns_and_pk_encoding(
let need_compat = !compat::has_same_columns_and_pk_encoding(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
) {
);
if need_compat {
// They have different schemas. We need to adapt the batch first so the
// mapper can convert it.
let compat = CompatBatch::new(
&self.mapper,
file_range_ctx.read_format().metadata().clone(),
)?;
file_range_ctx.set_compat_batch(Some(compat));
let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
let mapper = self.mapper.as_flat().unwrap();
Some(CompatBatch::Flat(FlatCompatBatch::try_new(
mapper,
flat_format.metadata(),
flat_format.format_projection(),
)?))
} else {
let compact_batch = PrimaryKeyCompatBatch::new(
&self.mapper,
file_range_ctx.read_format().metadata().clone(),
)?;
Some(CompatBatch::PrimaryKey(compact_batch))
};
file_range_ctx.set_compat_batch(compat);
}
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
}

View File

@@ -687,7 +687,7 @@ pub fn build_file_range_scan_stream(
let mut source = Source::PruneReader(reader);
while let Some(mut batch) = source.next_batch().await? {
if let Some(compact_batch) = compat_batch {
batch = compact_batch.compat_batch(batch)?;
batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
}
yield batch;
}

View File

@@ -127,12 +127,7 @@ pub fn to_flat_sst_arrow_schema(
.is_string()
{
let field = &schema.fields[pk_index];
let field = Arc::new(Field::new_dictionary(
field.name(),
datatypes::arrow::datatypes::DataType::UInt32,
field.data_type().clone(),
field.is_nullable(),
));
let field = Arc::new(to_dictionary_field(field));
fields.push(field);
} else {
fields.push(schema.fields[pk_index].clone());
@@ -159,8 +154,18 @@ pub fn to_flat_sst_arrow_schema(
Arc::new(Schema::new(fields))
}
/// Helper function to create a dictionary field from a field.
pub(crate) fn to_dictionary_field(field: &Field) -> Field {
Field::new_dictionary(
field.name(),
datatypes::arrow::datatypes::DataType::UInt32,
field.data_type().clone(),
field.is_nullable(),
)
}
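
`to_dictionary_field` only changes the stored arrow type; the field name and nullability are preserved. A quick illustration (not from the patch, assuming the `arrow` crate):

use arrow::datatypes::{DataType, Field};

fn main() {
    let tag = Field::new("host", DataType::Utf8, true);
    // This mirrors what to_dictionary_field does for a string tag column.
    let dict = Field::new_dictionary("host", DataType::UInt32, DataType::Utf8, true);
    assert_eq!(dict.name(), tag.name());
    assert_eq!(
        dict.data_type(),
        &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8))
    );
    assert!(dict.is_nullable());
}
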
/// Fields for internal columns.
fn internal_fields() -> [FieldRef; 3] {
pub(crate) fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.
[
Arc::new(Field::new_dictionary(

View File

@@ -128,11 +128,8 @@ pub struct FlatReadFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
/// Indices of columns to read from the SST. It contains all internal columns.
projection_indices: Vec<usize>,
/// Column id to their index in the projected schema (
/// the schema after projection).
column_id_to_projected_index: HashMap<ColumnId, usize>,
/// Projection.
format_projection: FormatProjection,
/// Column id to index in SST.
column_id_to_sst_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
@@ -159,8 +156,7 @@ impl FlatReadFormat {
FlatReadFormat {
metadata,
arrow_schema,
projection_indices: format_projection.projection_indices,
column_id_to_projected_index: format_projection.column_id_to_projected_index,
format_projection,
column_id_to_sst_index: id_to_index,
override_sequence: None,
}
@@ -174,7 +170,10 @@ impl FlatReadFormat {
/// Index of a column in the projected batch by its column id.
pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.column_id_to_projected_index.get(&column_id).copied()
self.format_projection
.column_id_to_projected_index
.get(&column_id)
.copied()
}
/// Returns min values of specific column in row groups.
@@ -225,7 +224,7 @@ impl FlatReadFormat {
/// Gets sorted projection indices to read.
pub(crate) fn projection_indices(&self) -> &[usize] {
&self.projection_indices
&self.format_projection.projection_indices
}
/// Creates a sequence array to override.
@@ -263,6 +262,11 @@ impl FlatReadFormat {
RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
}
/// Returns the format projection.
pub(crate) fn format_projection(&self) -> &FormatProjection {
&self.format_projection
}
fn get_stat_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],

View File

@@ -33,7 +33,9 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::array::{
ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
@@ -55,11 +57,15 @@ use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {