feat: Update parquet writer and indexer to support the flat format (#6866)

* feat: implement a method to write flat batches in ParquetWriter

Signed-off-by: evenyag <realevenyag@gmail.com>
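
  A rough usage sketch of the new write path (the `writer`, `batches`, and option values
  here are assumed; the full setup appears in the new `test_write_flat_with_index` test below):

      // Sketch only: `writer` is a configured ParquetWriter and `batches` holds
      // flat-format RecordBatches, as in the test added by this PR.
      let source = FlatSource::Iter(Box::new(batches.into_iter().map(Ok)));
      let write_opts = WriteOptions { row_group_size: 50, ..Default::default() };
      // Writes every batch, feeds the indexer per batch, and returns the SST infos.
      let sst_infos = writer.write_all_flat(source, &write_opts).await?;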

* feat: add update method for flat RecordBatch in Indexer

Signed-off-by: evenyag <realevenyag@gmail.com>
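
  At the Indexer level the flat path mirrors the existing Batch path. A minimal sketch,
  assuming an `indexer` built by an IndexerBuilder and an iterator of flat-format batches:

      // Sketch only: `indexer` and `flat_batches` are assumed to exist.
      for batch in flat_batches {
          // Updates the inverted, full-text and bloom filter indexes for this batch.
          indexer.update_flat(&batch).await;
      }
      let index_output = indexer.finish().await;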

* feat: call the indexer when writing flat batches in ParquetWriter

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle empty projection for flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: evaluate the Arrow array directly in precise_filter_flat

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cache column lookup results in the inverted indexer

Signed-off-by: evenyag <realevenyag@gmail.com>
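
  The cache resolves each indexed column name to its position in the batch schema once and
  reuses it for subsequent batches, which share the schema. A minimal sketch (the helper
  name is illustrative; the real code resolves names through the region metadata and
  aligns the cache with `indexed_column_ids`):

      use datatypes::arrow::record_batch::RecordBatch;

      /// Resolves column names to their indices in the batch schema once; later
      /// batches can then index columns directly from the cached positions.
      fn build_column_index_cache(batch: &RecordBatch, names: &[String]) -> Vec<Option<usize>> {
          names
              .iter()
              .map(|name| {
                  batch
                      .schema()
                      .column_with_name(name)
                      .map(|(index, _)| index)
              })
              .collect()
      }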

* test: add test

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support dictionary type in the dense codec

Signed-off-by: evenyag <realevenyag@gmail.com>
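
  In the dense codec a dictionary field is handled by dispatching on its value type, so
  Dictionary<Int32, String> encodes and decodes like a plain string field. A small sketch
  using the helpers from the diff below:

      // Sketch only: size estimation (and likewise serialize/deserialize) for a
      // dictionary field falls through to the dictionary's value type.
      let field = SortField::new(ConcreteDataType::dictionary_datatype(
          ConcreteDataType::int32_datatype(),
          ConcreteDataType::string_datatype(),
      ));
      assert_eq!(
          field.estimated_size(),
          SortField::new(ConcreteDataType::string_datatype()).estimated_size()
      );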

* test: remove the read part of the test since it requires modifying the reader

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support dictionary type in other dense codec methods

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: make the fulltext indexer use a string array directly

Signed-off-by: evenyag <realevenyag@gmail.com>
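
  The full-text flat path casts the column to Utf8 once and iterates the resulting
  StringArray. A standalone sketch (the `texts_of` helper is illustrative, using the same
  arrow re-exports as the diff below):

      use datatypes::arrow::array::{ArrayRef, StringArray};
      use datatypes::arrow::compute::cast;
      use datatypes::arrow::datatypes::DataType;

      /// Casts an arbitrary column to Utf8 and collects its rows as strings.
      /// Null rows become empty text so they stay aligned with row ids.
      fn texts_of(column: &ArrayRef) -> Vec<String> {
          let array = cast(column, &DataType::Utf8).expect("column castable to Utf8");
          let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
          strings
              .iter()
              .map(|text| text.unwrap_or_default().to_string())
              .collect()
      }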

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-09-02 20:48:34 +08:00
Committed by: GitHub
Parent: d394f38d18
Commit: 8fc42aeb27
11 changed files with 815 additions and 39 deletions

View File

@@ -59,6 +59,15 @@ impl SortField {
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::estimated_size_by_type(dict_type.value_type())
}
data_type => Self::estimated_size_by_type(data_type),
}
}
fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
match data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
@@ -88,16 +97,29 @@ impl SortField {
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
match self.data_type() {
ConcreteDataType::Dictionary(dict_type) => {
Self::serialize_by_type(dict_type.value_type(), serializer, value)
}
data_type => Self::serialize_by_type(data_type, serializer, value),
}
}
fn serialize_by_type(
data_type: &ConcreteDataType,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
macro_rules! cast_value_and_serialize {
(
$self: ident;
$data_type: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
match &$self.data_type {
match $data_type {
$(
ConcreteDataType::$ty(_) => {
paste!{
@@ -139,13 +161,13 @@ impl SortField {
ConcreteDataType::Dictionary(_) |
ConcreteDataType::Null(_) => {
return error::NotSupportedFieldSnafu {
data_type: $self.data_type.clone()
data_type: $data_type.clone()
}.fail()
}
}
};
}
cast_value_and_serialize!(self; serializer;
cast_value_and_serialize!(data_type; serializer;
Boolean, boolean,
Binary, binary,
Int8, i8,
@@ -172,16 +194,28 @@ impl SortField {
/// Deserialize a value from the deserializer.
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::deserialize_by_type(dict_type.value_type(), deserializer)
}
data_type => Self::deserialize_by_type(data_type, deserializer),
}
}
fn deserialize_by_type<B: Buf>(
data_type: &ConcreteDataType,
deserializer: &mut Deserializer<B>,
) -> Result<Value> {
macro_rules! deserialize_and_build_value {
(
$self: ident;
$data_type: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
match &$self.data_type {
match $data_type {
$(
ConcreteDataType::$ty(_) => {
Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
@@ -235,7 +269,7 @@ impl SortField {
}
};
}
deserialize_and_build_value!(self; deserializer;
deserialize_and_build_value!(data_type; deserializer;
Boolean, bool,
Int8, i8,
Int16, i16,
@@ -267,7 +301,20 @@ impl SortField {
return Ok(1);
}
let to_skip = match &self.data_type {
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer)
}
data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer),
}
}
fn skip_deserialize_by_type(
data_type: &ConcreteDataType,
bytes: &[u8],
deserializer: &mut Deserializer<&[u8]>,
) -> Result<usize> {
let to_skip = match data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
@@ -629,6 +676,51 @@ mod tests {
)
}
#[test]
fn test_memcmp_dictionary() {
// Test Dictionary<i32, string>
check_encode_and_decode(
&[ConcreteDataType::dictionary_datatype(
ConcreteDataType::int32_datatype(),
ConcreteDataType::string_datatype(),
)],
vec![Value::String("hello".into())],
);
// Test Dictionary<i32, i64>
check_encode_and_decode(
&[ConcreteDataType::dictionary_datatype(
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
)],
vec![Value::Int64(42)],
);
// Test Dictionary with null value
check_encode_and_decode(
&[ConcreteDataType::dictionary_datatype(
ConcreteDataType::int32_datatype(),
ConcreteDataType::string_datatype(),
)],
vec![Value::Null],
);
// Test multiple Dictionary columns
check_encode_and_decode(
&[
ConcreteDataType::dictionary_datatype(
ConcreteDataType::int32_datatype(),
ConcreteDataType::string_datatype(),
),
ConcreteDataType::dictionary_datatype(
ConcreteDataType::int16_datatype(),
ConcreteDataType::int64_datatype(),
),
],
vec![Value::String("world".into()), Value::Int64(123)],
);
}
#[test]
fn test_encode_multiple_rows() {
check_encode_and_decode(
@@ -691,6 +783,10 @@ mod tests {
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::decimal128_default_datatype(),
ConcreteDataType::vector_datatype(3),
ConcreteDataType::dictionary_datatype(
ConcreteDataType::int32_datatype(),
ConcreteDataType::string_datatype(),
),
],
vec![
Value::Boolean(true),
@@ -715,6 +811,7 @@ mod tests {
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
Value::Decimal128(Decimal128::from(16)),
Value::Binary(Bytes::from(vec![0; 12])),
Value::String("dict_value".into()),
],
);
}

View File

@@ -66,7 +66,7 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
use crate::read::prune::PruneReader;
/// Storage internal representation of a batch of rows for a primary key (time series).
@@ -994,6 +994,24 @@ impl Source {
}
}
/// Async [RecordBatch] reader and iterator wrapper for flat format.
pub enum FlatSource {
/// Source from a [BoxedRecordBatchIterator].
Iter(BoxedRecordBatchIterator),
/// Source from a [BoxedRecordBatchStream].
Stream(BoxedRecordBatchStream),
}
impl FlatSource {
/// Returns next [RecordBatch] from this data source.
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::Iter(iter) => iter.next().transpose(),
FlatSource::Stream(stream) => stream.try_next().await,
}
}
}
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.

View File

@@ -89,6 +89,18 @@ impl FlatProjectionMapper {
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(metadata);
// TODO(yingwen): Support different flat schema options.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
// All columns with internal columns.
metadata.column_metadatas.len() + 3,
column_ids.iter().copied(),
);
let batch_schema = flat_projected_columns(metadata, &format_projection);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(FlatProjectionMapper {
@@ -104,16 +116,6 @@ impl FlatProjectionMapper {
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(metadata);
// TODO(yingwen): Support different flat schema options.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
// All columns with internal columns.
metadata.column_metadatas.len() + 3,
column_ids.iter().copied(),
);
let batch_indices: Vec<_> = column_ids
.iter()
.map(|id| {
@@ -126,8 +128,6 @@ impl FlatProjectionMapper {
})
.collect();
let batch_schema = flat_projected_columns(metadata, &format_projection);
Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema,

View File

@@ -25,6 +25,7 @@ use std::num::NonZeroUsize;
use bloom_filter::creator::BloomFilterIndexer;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use puffin_manager::SstPuffinManager;
use smallvec::SmallVec;
use statistics::{ByteCount, RowCount};
@@ -120,6 +121,13 @@ impl Indexer {
self.flush_mem_metrics();
}
/// Updates the index with the given flat format RecordBatch.
pub async fn update_flat(&mut self, batch: &RecordBatch) {
self.do_update_flat(batch).await;
self.flush_mem_metrics();
}
/// Finalizes the index creation.
pub async fn finish(&mut self) -> IndexOutput {
let output = self.do_finish().await;

View File

@@ -17,7 +17,9 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
@@ -63,6 +65,9 @@ pub struct BloomFilterIndexer {
/// The global memory usage.
global_memory_usage: Arc<AtomicUsize>,
/// Region metadata for column lookups.
metadata: RegionMetadataRef,
}
impl BloomFilterIndexer {
@@ -120,6 +125,7 @@ impl BloomFilterIndexer {
aborted: false,
stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
global_memory_usage,
metadata: metadata.clone(),
};
Ok(Some(indexer))
}
@@ -150,6 +156,29 @@ impl BloomFilterIndexer {
Ok(())
}
/// Updates the bloom filter index with the given flat format RecordBatch.
pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if self.creators.is_empty() || batch.num_rows() == 0 {
return Ok(());
}
if let Err(update_err) = self.do_update_flat(batch).await {
// clean up garbage if failed to update
if let Err(err) = self.do_cleanup().await {
if cfg!(any(test, feature = "test")) {
panic!("Failed to clean up index creator, err: {err:?}",);
} else {
warn!(err; "Failed to clean up index creator");
}
}
return Err(update_err);
}
Ok(())
}
/// Finishes index creation and cleans up garbage.
/// Returns the number of rows and bytes written.
///
@@ -254,6 +283,59 @@ impl BloomFilterIndexer {
Ok(())
}
async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
let mut guard = self.stats.record_update();
let n = batch.num_rows();
guard.inc_row_count(n);
for (col_id, creator) in &mut self.creators {
// Get the column name from metadata
if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
let column_name = &column_meta.column_schema.name;
// Find the column in the RecordBatch by name
if let Some(column_array) = batch.column_by_name(column_name) {
// Convert Arrow array to VectorRef
let vector = Helper::try_into_vector(column_array.clone())
.context(crate::error::ConvertVectorSnafu)?;
let sort_field = SortField::new(vector.data_type());
for i in 0..n {
let value = vector.get_ref(i);
let elems = (!value.is_null())
.then(|| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
.context(EncodeSnafu)?;
Ok(buf)
})
.transpose()?;
creator
.push_row_elems(elems)
.await
.context(PushBloomFilterValueSnafu)?;
}
} else {
debug!(
"Column {} not found in the batch during building bloom filter index",
column_name
);
// Push empty elements to maintain alignment
for _ in 0..n {
creator
.push_row_elems(None)
.await
.context(PushBloomFilterValueSnafu)?;
}
}
}
}
Ok(())
}
/// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
let mut guard = self.stats.record_finish();

View File

@@ -17,6 +17,9 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::warn;
use datatypes::arrow::array::{Array, StringArray};
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
use index::fulltext_index::create::{
BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
@@ -29,8 +32,9 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::error::{
CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
CastVectorSnafu, ComputeArrowSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu,
FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
@@ -119,6 +123,7 @@ impl FulltextIndexer {
column_id,
SingleCreator {
column_id,
column_name: column.column_schema.name.clone(),
inner,
compress,
},
@@ -150,6 +155,28 @@ impl FulltextIndexer {
Ok(())
}
/// Updates the fulltext index with the given flat format RecordBatch.
pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if batch.num_rows() == 0 {
return Ok(());
}
if let Err(update_err) = self.do_update_flat(batch).await {
if let Err(err) = self.do_abort().await {
if cfg!(any(test, feature = "test")) {
panic!("Failed to abort index creator, err: {err}");
} else {
warn!(err; "Failed to abort index creator");
}
}
return Err(update_err);
}
Ok(())
}
/// Finalizes the index creation.
pub async fn finish(
&mut self,
@@ -204,6 +231,17 @@ impl FulltextIndexer {
Ok(())
}
async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
let mut guard = self.stats.record_update();
guard.inc_row_count(batch.num_rows());
for creator in self.creators.values_mut() {
creator.update_flat(batch).await?;
}
Ok(())
}
async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
let mut guard = self.stats.record_finish();
@@ -233,6 +271,8 @@ impl FulltextIndexer {
struct SingleCreator {
/// Column ID.
column_id: ColumnId,
/// Column name.
column_name: String,
/// Inner creator.
inner: AltFulltextCreator,
/// Whether the index should be compressed.
@@ -277,6 +317,30 @@ impl SingleCreator {
Ok(())
}
async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
// Find the column in the RecordBatch by name
if let Some(column_array) = batch.column_by_name(&self.column_name) {
// Convert Arrow array to string array.
// TODO(yingwen): Use Utf8View later if possible.
let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
.context(ComputeArrowSnafu)?;
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
for text_opt in string_array.iter() {
let text = text_opt.unwrap_or_default();
self.inner.push_text(text).await?;
}
} else {
// If the column is not found in the batch, push empty text.
// Ensure that the number of texts pushed is the same as the number of rows in the SST,
// so that the texts are aligned with the row ids.
for _ in 0..batch.num_rows() {
self.inner.push_text("").await?;
}
}
Ok(())
}
async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
let options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_telemetry::warn;
use datatypes::arrow::record_batch::RecordBatch;
use crate::read::Batch;
use crate::sst::index::Indexer;
@@ -108,4 +109,95 @@ impl Indexer {
false
}
pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) {
if batch.num_rows() == 0 {
return;
}
if !self.do_update_flat_inverted_index(batch).await {
self.do_abort().await;
}
if !self.do_update_flat_fulltext_index(batch).await {
self.do_abort().await;
}
if !self.do_update_flat_bloom_filter(batch).await {
self.do_abort().await;
}
}
/// Returns false if the update failed.
async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool {
let Some(creator) = self.inverted_indexer.as_mut() else {
return true;
};
let Err(err) = creator.update_flat(batch).await else {
return true;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
false
}
/// Returns false if the update failed.
async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool {
let Some(creator) = self.fulltext_indexer.as_mut() else {
return true;
};
let Err(err) = creator.update_flat(batch).await else {
return true;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
false
}
/// Returns false if the update failed.
async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool {
let Some(creator) = self.bloom_filter_indexer.as_mut() else {
return true;
};
let Err(err) = creator.update_flat(batch).await else {
return true;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
false
}
}

View File

@@ -18,6 +18,8 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::Helper;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
@@ -73,6 +75,12 @@ pub struct InvertedIndexer {
/// Ids of indexed columns and their names (`to_string` of the column id).
indexed_column_ids: Vec<(ColumnId, String)>,
/// Region metadata for column lookups.
metadata: RegionMetadataRef,
/// Cache mapping indexed columns to their positions in the RecordBatch.
/// Aligns with `indexed_column_ids`. Initialized lazily when the first batch is processed.
column_index_cache: Option<Vec<Option<usize>>>,
}
impl InvertedIndexer {
@@ -121,6 +129,8 @@ impl InvertedIndexer {
aborted: false,
memory_usage,
indexed_column_ids,
metadata: metadata.clone(),
column_index_cache: None,
}
}
@@ -148,6 +158,93 @@ impl InvertedIndexer {
Ok(())
}
/// Updates the inverted index with the given flat format RecordBatch.
pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if batch.num_rows() == 0 {
return Ok(());
}
self.do_update_flat(batch).await
}
async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
// Initialize column index cache if not already done
if self.column_index_cache.is_none() {
self.initialize_column_index_cache(batch);
}
let mut guard = self.stats.record_update();
let n = batch.num_rows();
guard.inc_row_count(n);
let column_indices = self.column_index_cache.as_ref().unwrap();
for ((col_id, col_id_str), &column_index) in
self.indexed_column_ids.iter().zip(column_indices.iter())
{
if let Some(index) = column_index {
let column_array = batch.column(index);
// Convert Arrow array to VectorRef using Helper
let vector = Helper::try_into_vector(column_array.clone())
.context(crate::error::ConvertVectorSnafu)?;
let sort_field = SortField::new(vector.data_type());
for row in 0..n {
self.value_buf.clear();
let value_ref = vector.get_ref(row);
if value_ref.is_null() {
self.index_creator
.push_with_name(col_id_str, None)
.await
.context(PushIndexValueSnafu)?;
} else {
IndexValueCodec::encode_nonnull_value(
value_ref,
&sort_field,
&mut self.value_buf,
)
.context(EncodeSnafu)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.await
.context(PushIndexValueSnafu)?;
}
}
} else {
debug!(
"Column {} not found in the batch during building inverted index",
col_id
);
}
}
Ok(())
}
/// Initializes the column index cache by mapping indexed column ids to their positions in the RecordBatch.
fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());
for (col_id, _) in &self.indexed_column_ids {
let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
let column_name = &column_meta.column_schema.name;
batch
.schema()
.column_with_name(column_name)
.map(|(index, _)| index)
} else {
None
};
column_indices.push(column_index);
}
self.column_index_cache = Some(column_indices);
}
/// Finishes index creation and cleans up garbage.
/// Returns the number of rows and bytes written.
pub async fn finish(

View File

@@ -90,12 +90,16 @@ mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::OpType;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{col, lit, BinaryExpr, Expr, Literal, Operator};
use datatypes::arrow;
use datatypes::arrow::array::{RecordBatch, UInt64Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
@@ -109,7 +113,7 @@ mod tests {
FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::read::{BatchBuilder, BatchReader};
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
use crate::sst::file_purger::NoopFilePurger;
@@ -119,11 +123,13 @@ mod tests {
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::sst::{
location, to_flat_sst_arrow_schema, FlatSchemaOptions, DEFAULT_WRITE_CONCURRENCY,
};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_batch_with_custom_sequence, new_source, sst_file_handle,
sst_file_handle_with_file_id, sst_region_metadata,
new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
@@ -969,6 +975,142 @@ mod tests {
assert!(cached.contains_row_group(3));
}
/// Creates a flat format RecordBatch for testing.
/// Similar to `new_batch_by_range` but returns a RecordBatch in flat format.
fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
assert!(end >= start);
let metadata = Arc::new(sst_region_metadata());
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let num_rows = end - start;
let mut columns = Vec::new();
// Add primary key columns (tag_0, tag_1) as dictionary arrays
let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
tag_0_builder.append_value(tags[0]);
tag_1_builder.append_value(tags[1]);
}
columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
// Add field column (field_0)
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
columns.push(Arc::new(UInt64Array::from(field_values)));
// Add time index column (ts)
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
// Add encoded primary key column
let pk = new_primary_key(tags);
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
pk_builder.append(&pk).unwrap();
}
columns.push(Arc::new(pk_builder.finish()));
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(
OpType::Put as u8,
num_rows,
)));
RecordBatch::try_new(flat_schema, columns).unwrap()
}
/// Creates a FlatSource from flat format RecordBatches.
fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
}
#[tokio::test]
async fn test_write_flat_with_index() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 50;
// Create flat format RecordBatches
let flat_batches = vec![
new_record_batch_by_range(&["a", "d"], 0, 20),
new_record_batch_by_range(&["b", "d"], 0, 20),
new_record_batch_by_range(&["c", "d"], 0, 20),
new_record_batch_by_range(&["c", "f"], 0, 40),
new_record_batch_by_range(&["c", "h"], 100, 200),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let puffin_manager = env
.get_puffin_manager()
.build(object_store.clone(), file_path.clone());
let intermediate_manager = env.get_intermediate_manager();
let indexer_builder = IndexerBuilderImpl {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size,
puffin_manager,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
segment_row_count: 1,
..Default::default()
},
},
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
Metrics::new(WriteType::Flush),
)
.await;
let info = writer
.write_all_flat(flat_source, &write_opts)
.await
.unwrap()
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
assert!(info.index_metadata.inverted_index.index_size > 0);
assert_eq!(info.index_metadata.inverted_index.row_count, 200);
assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
assert!(info.index_metadata.bloom_filter.index_size > 0);
assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(199)
),
info.time_range
);
}
#[tokio::test]
async fn test_read_with_override_sequence() {
let mut env = TestEnv::new().await;

View File

@@ -29,8 +29,8 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
RecordBatchSnafu, Result, StatsNotPresentSnafu,
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
@@ -381,10 +381,7 @@ impl RangeBase {
let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
if let Some(idx) = column_idx {
let column = &input.columns()[idx];
// Convert Arrow Array to Vector
let vector = datatypes::vectors::Helper::try_into_vector(column.clone())
.context(ConvertVectorSnafu)?;
let result = filter.evaluate_vector(&vector).context(RecordBatchSnafu)?;
let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
} else {
// Column not found in projection, continue

View File

@@ -24,7 +24,13 @@ use std::time::Instant;
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::array::{
ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -40,14 +46,17 @@ use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::error::{
InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::{FileId, RegionFileId};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{time_index_column_index, FlatWriteFormat};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::sst::{FlatSchemaOptions, DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
@@ -270,6 +279,73 @@ where
Ok(results)
}
/// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all_flat(
&mut self,
source: FlatSource,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let res = self.write_all_flat_without_cleaning(source, opts).await;
if res.is_err() {
// Clean tmp files explicitly on failure.
let file_id = self.current_file;
if let Some(cleaner) = &self.file_cleaner {
cleaner.clean_by_file_id(file_id).await;
}
}
res
}
async fn write_all_flat_without_cleaning(
&mut self,
mut source: FlatSource,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let flat_format =
FlatWriteFormat::new(self.metadata.clone(), &FlatSchemaOptions::default())
.with_override_sequence(None);
let mut stats = SourceStats::default();
while let Some(record_batch) = self
.write_next_flat_batch(&mut source, &flat_format, opts)
.await
.transpose()
{
match record_batch {
Ok(batch) => {
stats.update_flat(&batch)?;
let start = Instant::now();
// safety: self.current_indexer must be set when first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update_flat(&batch)
.await;
self.metrics.update_index += start.elapsed();
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
{
self.finish_current_file(&mut results, &mut stats).await?;
}
}
Err(e) => {
if let Some(indexer) = &mut self.current_indexer {
indexer.abort().await;
}
return Err(e);
}
}
}
self.finish_current_file(&mut results, &mut stats).await?;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(results)
}
/// Customizes per-column config according to schema and maybe column cardinality.
fn customize_column_config(
builder: WriterPropertiesBuilder,
@@ -313,6 +389,30 @@ where
Ok(Some(batch))
}
async fn write_next_flat_batch(
&mut self,
source: &mut FlatSource,
flat_format: &FlatWriteFormat,
opts: &WriteOptions,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
let Some(record_batch) = source.next_batch().await? else {
return Ok(None);
};
self.metrics.iter_source += start.elapsed();
let arrow_batch = flat_format.convert_batch(&record_batch)?;
let start = Instant::now();
self.maybe_init_writer(flat_format.arrow_schema(), opts)
.await?
.write(&arrow_batch)
.await
.context(WriteParquetSnafu)?;
self.metrics.write_batch += start.elapsed();
Ok(Some(record_batch))
}
async fn maybe_init_writer(
&mut self,
schema: &SchemaRef,
@@ -388,6 +488,85 @@ impl SourceStats {
self.time_range = Some((min_in_batch, max_in_batch));
}
}
fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
if record_batch.num_rows() == 0 {
return Ok(());
}
self.num_rows += record_batch.num_rows();
// Get the timestamp column by index
let time_index_col_idx = time_index_column_index(record_batch.num_columns());
let timestamp_array = record_batch.column(time_index_col_idx);
if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
if let Some(time_range) = &mut self.time_range {
time_range.0 = time_range.0.min(min_in_batch);
time_range.1 = time_range.1.max(max_in_batch);
} else {
self.time_range = Some((min_in_batch, max_in_batch));
}
}
Ok(())
}
}
/// Gets the min and max timestamps from a timestamp array.
fn timestamp_range_from_array(
timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
let (min_ts, max_ts) = match timestamp_array.data_type() {
DataType::Timestamp(TimeUnit::Second, _) => {
let array = timestamp_array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
let min_val = min(array).map(Timestamp::new_second);
let max_val = max(array).map(Timestamp::new_second);
(min_val, max_val)
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
let array = timestamp_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let min_val = min(array).map(Timestamp::new_millisecond);
let max_val = max(array).map(Timestamp::new_millisecond);
(min_val, max_val)
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
let array = timestamp_array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
let min_val = min(array).map(Timestamp::new_microsecond);
let max_val = max(array).map(Timestamp::new_microsecond);
(min_val, max_val)
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
let array = timestamp_array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
let min_val = min(array).map(Timestamp::new_nanosecond);
let max_val = max(array).map(Timestamp::new_nanosecond);
(min_val, max_val)
}
_ => {
return UnexpectedSnafu {
reason: format!(
"Unexpected data type of time index: {:?}",
timestamp_array.data_type()
),
}
.fail()
}
};
// If min timestamp exists, max timestamp should also exist.
Ok(min_ts.zip(max_ts))
}
/// Workaround for [AsyncArrowWriter] does not provide a method to