From 8fc42aeb27db2b59b655e48282f2d79a2a3d01f0 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 2 Sep 2025 20:48:34 +0800 Subject: [PATCH] feat: Update parquet writer and indexer to support the flat format (#6866) * feat: implements method to write flat batch for ParquetWriter Signed-off-by: evenyag * feat: add update method for flat RecordBatch in Indexer Signed-off-by: evenyag * feat: calls indexer to write flat batch in ParquetWriter Signed-off-by: evenyag * fix: handle empty projection for flat format Signed-off-by: evenyag * fix: eval array in precise_filter_flat Signed-off-by: evenyag * feat: cache column lookup result in inverted indexer Signed-off-by: evenyag * test: add test Signed-off-by: evenyag * feat: support dict type in dense codec Signed-off-by: evenyag * test: remove read part in test as it need modifying the reader Signed-off-by: evenyag * feat: support dictionary type in other methods for dense codec Signed-off-by: evenyag * refactor: fulltext use string array directly Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito-codec/src/row_converter/dense.rs | 113 ++++++++++- src/mito2/src/read.rs | 20 +- src/mito2/src/read/flat_projection.rs | 24 +-- src/mito2/src/sst/index.rs | 8 + .../src/sst/index/bloom_filter/creator.rs | 82 ++++++++ .../src/sst/index/fulltext_index/creator.rs | 68 ++++++- src/mito2/src/sst/index/indexer/update.rs | 92 +++++++++ .../src/sst/index/inverted_index/creator.rs | 97 +++++++++ src/mito2/src/sst/parquet.rs | 154 ++++++++++++++- src/mito2/src/sst/parquet/file_range.rs | 9 +- src/mito2/src/sst/parquet/writer.rs | 187 +++++++++++++++++- 11 files changed, 815 insertions(+), 39 deletions(-) diff --git a/src/mito-codec/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs index 6bd1bdde8f..07ca67260f 100644 --- a/src/mito-codec/src/row_converter/dense.rs +++ b/src/mito-codec/src/row_converter/dense.rs @@ -59,6 +59,15 @@ impl SortField { pub fn estimated_size(&self) -> usize { match &self.data_type { + ConcreteDataType::Dictionary(dict_type) => { + Self::estimated_size_by_type(dict_type.value_type()) + } + data_type => Self::estimated_size_by_type(data_type), + } + } + + fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize { + match data_type { ConcreteDataType::Boolean(_) => 2, ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, @@ -88,16 +97,29 @@ impl SortField { &self, serializer: &mut Serializer<&mut Vec>, value: &ValueRef, + ) -> Result<()> { + match self.data_type() { + ConcreteDataType::Dictionary(dict_type) => { + Self::serialize_by_type(dict_type.value_type(), serializer, value) + } + data_type => Self::serialize_by_type(data_type, serializer, value), + } + } + + fn serialize_by_type( + data_type: &ConcreteDataType, + serializer: &mut Serializer<&mut Vec>, + value: &ValueRef, ) -> Result<()> { macro_rules! 
cast_value_and_serialize { ( - $self: ident; + $data_type: ident; $serializer: ident; $( $ty: ident, $f: ident ),* ) => { - match &$self.data_type { + match $data_type { $( ConcreteDataType::$ty(_) => { paste!{ @@ -139,13 +161,13 @@ impl SortField { ConcreteDataType::Dictionary(_) | ConcreteDataType::Null(_) => { return error::NotSupportedFieldSnafu { - data_type: $self.data_type.clone() + data_type: $data_type.clone() }.fail() } } }; } - cast_value_and_serialize!(self; serializer; + cast_value_and_serialize!(data_type; serializer; Boolean, boolean, Binary, binary, Int8, i8, @@ -172,16 +194,28 @@ impl SortField { /// Deserialize a value from the deserializer. pub fn deserialize(&self, deserializer: &mut Deserializer) -> Result { + match &self.data_type { + ConcreteDataType::Dictionary(dict_type) => { + Self::deserialize_by_type(dict_type.value_type(), deserializer) + } + data_type => Self::deserialize_by_type(data_type, deserializer), + } + } + + fn deserialize_by_type( + data_type: &ConcreteDataType, + deserializer: &mut Deserializer, + ) -> Result { macro_rules! deserialize_and_build_value { ( - $self: ident; + $data_type: ident; $serializer: ident; $( $ty: ident, $f: ident ),* ) => { - match &$self.data_type { + match $data_type { $( ConcreteDataType::$ty(_) => { Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?)) @@ -235,7 +269,7 @@ impl SortField { } }; } - deserialize_and_build_value!(self; deserializer; + deserialize_and_build_value!(data_type; deserializer; Boolean, bool, Int8, i8, Int16, i16, @@ -267,7 +301,20 @@ impl SortField { return Ok(1); } - let to_skip = match &self.data_type { + match &self.data_type { + ConcreteDataType::Dictionary(dict_type) => { + Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer) + } + data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer), + } + } + + fn skip_deserialize_by_type( + data_type: &ConcreteDataType, + bytes: &[u8], + deserializer: &mut Deserializer<&[u8]>, + ) -> Result { + let to_skip = match data_type { ConcreteDataType::Boolean(_) => 2, ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, @@ -629,6 +676,51 @@ mod tests { ) } + #[test] + fn test_memcmp_dictionary() { + // Test Dictionary + check_encode_and_decode( + &[ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + )], + vec![Value::String("hello".into())], + ); + + // Test Dictionary + check_encode_and_decode( + &[ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + )], + vec![Value::Int64(42)], + ); + + // Test Dictionary with null value + check_encode_and_decode( + &[ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + )], + vec![Value::Null], + ); + + // Test multiple Dictionary columns + check_encode_and_decode( + &[ + ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + ), + ConcreteDataType::dictionary_datatype( + ConcreteDataType::int16_datatype(), + ConcreteDataType::int64_datatype(), + ), + ], + vec![Value::String("world".into()), Value::Int64(123)], + ); + } + #[test] fn test_encode_multiple_rows() { check_encode_and_decode( @@ -691,6 +783,10 @@ mod tests { ConcreteDataType::interval_month_day_nano_datatype(), ConcreteDataType::decimal128_default_datatype(), 
ConcreteDataType::vector_datatype(3), + ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + ), ], vec![ Value::Boolean(true), @@ -715,6 +811,7 @@ mod tests { Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)), Value::Decimal128(Decimal128::from(16)), Value::Binary(Bytes::from(vec![0; 12])), + Value::String("dict_value".into()), ], ); } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index bb79ea7503..01f07692c7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -66,7 +66,7 @@ use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, Result, }; -use crate::memtable::BoxedBatchIterator; +use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator}; use crate::read::prune::PruneReader; /// Storage internal representation of a batch of rows for a primary key (time series). @@ -994,6 +994,24 @@ impl Source { } } +/// Async [RecordBatch] reader and iterator wrapper for flat format. +pub enum FlatSource { + /// Source from a [BoxedRecordBatchIterator]. + Iter(BoxedRecordBatchIterator), + /// Source from a [BoxedRecordBatchStream]. + Stream(BoxedRecordBatchStream), +} + +impl FlatSource { + /// Returns next [RecordBatch] from this data source. + pub async fn next_batch(&mut self) -> Result> { + match self { + FlatSource::Iter(iter) => iter.next().transpose(), + FlatSource::Stream(stream) => stream.try_next().await, + } + } +} + /// Async batch reader. /// /// The reader must guarantee [Batch]es returned by it have the same schema. diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 86d7014521..0716a041d3 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -89,6 +89,18 @@ impl FlatProjectionMapper { column_schemas.push(metadata.schema.column_schemas()[*idx].clone()); } + // Creates a map to lookup index. + let id_to_index = sst_column_id_indices(metadata); + // TODO(yingwen): Support different flat schema options. + let format_projection = FormatProjection::compute_format_projection( + &id_to_index, + // All columns with internal columns. + metadata.column_metadatas.len() + 3, + column_ids.iter().copied(), + ); + + let batch_schema = flat_projected_columns(metadata, &format_projection); + if is_empty_projection { // If projection is empty, we don't output any column. return Ok(FlatProjectionMapper { @@ -104,16 +116,6 @@ impl FlatProjectionMapper { // Safety: Columns come from existing schema. let output_schema = Arc::new(Schema::new(column_schemas)); - // Creates a map to lookup index. - let id_to_index = sst_column_id_indices(metadata); - // TODO(yingwen): Support different flat schema options. - let format_projection = FormatProjection::compute_format_projection( - &id_to_index, - // All columns with internal columns. 
- metadata.column_metadatas.len() + 3, - column_ids.iter().copied(), - ); - let batch_indices: Vec<_> = column_ids .iter() .map(|id| { @@ -126,8 +128,6 @@ impl FlatProjectionMapper { }) .collect(); - let batch_schema = flat_projected_columns(metadata, &format_projection); - Ok(FlatProjectionMapper { metadata: metadata.clone(), output_schema, diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 9cb61e8bb0..0913161c95 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -25,6 +25,7 @@ use std::num::NonZeroUsize; use bloom_filter::creator::BloomFilterIndexer; use common_telemetry::{debug, warn}; +use datatypes::arrow::record_batch::RecordBatch; use puffin_manager::SstPuffinManager; use smallvec::SmallVec; use statistics::{ByteCount, RowCount}; @@ -120,6 +121,13 @@ impl Indexer { self.flush_mem_metrics(); } + /// Updates the index with the given flat format RecordBatch. + pub async fn update_flat(&mut self, batch: &RecordBatch) { + self.do_update_flat(batch).await; + + self.flush_mem_metrics(); + } + /// Finalizes the index creation. pub async fn finish(&mut self) -> IndexOutput { let output = self.do_finish().await; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 52e68c9610..1b0f40114f 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -17,7 +17,9 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::{debug, warn}; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::SkippingIndexType; +use datatypes::vectors::Helper; use index::bloom_filter::creator::BloomFilterCreator; use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; use mito_codec::row_converter::SortField; @@ -63,6 +65,9 @@ pub struct BloomFilterIndexer { /// The global memory usage. global_memory_usage: Arc, + + /// Region metadata for column lookups. + metadata: RegionMetadataRef, } impl BloomFilterIndexer { @@ -120,6 +125,7 @@ impl BloomFilterIndexer { aborted: false, stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX), global_memory_usage, + metadata: metadata.clone(), }; Ok(Some(indexer)) } @@ -150,6 +156,29 @@ impl BloomFilterIndexer { Ok(()) } + /// Updates the bloom filter index with the given flat format RecordBatch. + pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() || batch.num_rows() == 0 { + return Ok(()); + } + + if let Err(update_err) = self.do_update_flat(batch).await { + // clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err:?}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + /// Finishes index creation and cleans up garbage. /// Returns the number of rows and bytes written. 
/// @@ -254,6 +283,59 @@ impl BloomFilterIndexer { Ok(()) } + async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + for (col_id, creator) in &mut self.creators { + // Get the column name from metadata + if let Some(column_meta) = self.metadata.column_by_id(*col_id) { + let column_name = &column_meta.column_schema.name; + + // Find the column in the RecordBatch by name + if let Some(column_array) = batch.column_by_name(column_name) { + // Convert Arrow array to VectorRef + let vector = Helper::try_into_vector(column_array.clone()) + .context(crate::error::ConvertVectorSnafu)?; + let sort_field = SortField::new(vector.data_type()); + + for i in 0..n { + let value = vector.get_ref(i); + let elems = (!value.is_null()) + .then(|| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf) + .context(EncodeSnafu)?; + Ok(buf) + }) + .transpose()?; + + creator + .push_row_elems(elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + } else { + debug!( + "Column {} not found in the batch during building bloom filter index", + column_name + ); + // Push empty elements to maintain alignment + for _ in 0..n { + creator + .push_row_elems(None) + .await + .context(PushBloomFilterValueSnafu)?; + } + } + } + } + + Ok(()) + } + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { let mut guard = self.stats.record_finish(); diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index bc98b162a6..967e0d9dce 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -17,6 +17,9 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::warn; +use datatypes::arrow::array::{Array, StringArray}; +use datatypes::arrow::datatypes::DataType; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::{FulltextAnalyzer, FulltextBackend}; use index::fulltext_index::create::{ BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator, @@ -29,8 +32,9 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::error::{ - CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu, - FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result, + CastVectorSnafu, ComputeArrowSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, + FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, + Result, }; use crate::read::Batch; use crate::sst::file::FileId; @@ -119,6 +123,7 @@ impl FulltextIndexer { column_id, SingleCreator { column_id, + column_name: column.column_schema.name.clone(), inner, compress, }, @@ -150,6 +155,28 @@ impl FulltextIndexer { Ok(()) } + /// Updates the fulltext index with the given flat format RecordBatch. 
+ pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if batch.num_rows() == 0 { + return Ok(()); + } + + if let Err(update_err) = self.do_update_flat(batch).await { + if let Err(err) = self.do_abort().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to abort index creator, err: {err}"); + } else { + warn!(err; "Failed to abort index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + /// Finalizes the index creation. pub async fn finish( &mut self, @@ -204,6 +231,17 @@ impl FulltextIndexer { Ok(()) } + async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + let mut guard = self.stats.record_update(); + guard.inc_row_count(batch.num_rows()); + + for creator in self.creators.values_mut() { + creator.update_flat(batch).await?; + } + + Ok(()) + } + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { let mut guard = self.stats.record_finish(); @@ -233,6 +271,8 @@ impl FulltextIndexer { struct SingleCreator { /// Column ID. column_id: ColumnId, + /// Column name. + column_name: String, /// Inner creator. inner: AltFulltextCreator, /// Whether the index should be compressed. @@ -277,6 +317,30 @@ impl SingleCreator { Ok(()) } + async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + // Find the column in the RecordBatch by name + if let Some(column_array) = batch.column_by_name(&self.column_name) { + // Convert Arrow array to string array. + // TODO(yingwen): Use Utf8View later if possible. + let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8) + .context(ComputeArrowSnafu)?; + let string_array = array.as_any().downcast_ref::().unwrap(); + for text_opt in string_array.iter() { + let text = text_opt.unwrap_or_default(); + self.inner.push_text(text).await?; + } + } else { + // If the column is not found in the batch, push empty text. + // Ensure that the number of texts pushed is the same as the number of rows in the SST, + // so that the texts are aligned with the row ids. + for _ in 0..batch.num_rows() { + self.inner.push_text("").await?; + } + } + + Ok(()) + } + async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result { let options = PutOptions { compression: self.compress.then_some(CompressionCodec::Zstd), diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index 42306673be..b6808ea412 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_telemetry::warn; +use datatypes::arrow::record_batch::RecordBatch; use crate::read::Batch; use crate::sst::index::Indexer; @@ -108,4 +109,95 @@ impl Indexer { false } + + pub(crate) async fn do_update_flat(&mut self, batch: &RecordBatch) { + if batch.num_rows() == 0 { + return; + } + + if !self.do_update_flat_inverted_index(batch).await { + self.do_abort().await; + } + if !self.do_update_flat_fulltext_index(batch).await { + self.do_abort().await; + } + if !self.do_update_flat_bloom_filter(batch).await { + self.do_abort().await; + } + } + + /// Returns false if the update failed. 
+ async fn do_update_flat_inverted_index(&mut self, batch: &RecordBatch) -> bool { + let Some(creator) = self.inverted_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update_flat(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update inverted index with flat format, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update inverted index with flat format, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + + /// Returns false if the update failed. + async fn do_update_flat_fulltext_index(&mut self, batch: &RecordBatch) -> bool { + let Some(creator) = self.fulltext_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update_flat(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update full-text index with flat format, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update full-text index with flat format, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + + /// Returns false if the update failed. + async fn do_update_flat_bloom_filter(&mut self, batch: &RecordBatch) -> bool { + let Some(creator) = self.bloom_filter_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update_flat(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update bloom filter with flat format, region_id: {}, file_id: {}, err: {:?}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update bloom filter with flat format, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 933ac18bb4..43dc3d6801 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -18,6 +18,8 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::{debug, warn}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::vectors::Helper; use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::create::InvertedIndexCreator; @@ -73,6 +75,12 @@ pub struct InvertedIndexer { /// Ids of indexed columns and their names (`to_string` of the column id). indexed_column_ids: Vec<(ColumnId, String)>, + + /// Region metadata for column lookups. + metadata: RegionMetadataRef, + /// Cache for mapping indexed column positions to their indices in the RecordBatch. + /// Aligns with indexed_column_ids. Initialized lazily when first batch is processed. + column_index_cache: Option>>, } impl InvertedIndexer { @@ -121,6 +129,8 @@ impl InvertedIndexer { aborted: false, memory_usage, indexed_column_ids, + metadata: metadata.clone(), + column_index_cache: None, } } @@ -148,6 +158,93 @@ impl InvertedIndexer { Ok(()) } + /// Updates the inverted index with the given flat format RecordBatch. 
+ pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if batch.num_rows() == 0 { + return Ok(()); + } + + self.do_update_flat(batch).await + } + + async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> { + // Initialize column index cache if not already done + if self.column_index_cache.is_none() { + self.initialize_column_index_cache(batch); + } + + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + let column_indices = self.column_index_cache.as_ref().unwrap(); + + for ((col_id, col_id_str), &column_index) in + self.indexed_column_ids.iter().zip(column_indices.iter()) + { + if let Some(index) = column_index { + let column_array = batch.column(index); + // Convert Arrow array to VectorRef using Helper + let vector = Helper::try_into_vector(column_array.clone()) + .context(crate::error::ConvertVectorSnafu)?; + let sort_field = SortField::new(vector.data_type()); + + for row in 0..n { + self.value_buf.clear(); + let value_ref = vector.get_ref(row); + + if value_ref.is_null() { + self.index_creator + .push_with_name(col_id_str, None) + .await + .context(PushIndexValueSnafu)?; + } else { + IndexValueCodec::encode_nonnull_value( + value_ref, + &sort_field, + &mut self.value_buf, + ) + .context(EncodeSnafu)?; + self.index_creator + .push_with_name(col_id_str, Some(&self.value_buf)) + .await + .context(PushIndexValueSnafu)?; + } + } + } else { + debug!( + "Column {} not found in the batch during building inverted index", + col_id + ); + } + } + + Ok(()) + } + + /// Initializes the column index cache by mapping indexed column ids to their positions in the RecordBatch. + fn initialize_column_index_cache(&mut self, batch: &RecordBatch) { + let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len()); + + for (col_id, _) in &self.indexed_column_ids { + let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) { + let column_name = &column_meta.column_schema.name; + batch + .schema() + .column_with_name(column_name) + .map(|(index, _)| index) + } else { + None + }; + column_indices.push(column_index); + } + + self.column_index_cache = Some(column_indices); + } + /// Finishes index creation and cleans up garbage. /// Returns the number of rows and bytes written. 
pub async fn finish( diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index a2c42bff7b..d02882a5fb 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -90,12 +90,16 @@ mod tests { use std::collections::HashSet; use std::sync::Arc; + use api::v1::OpType; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::{col, lit, BinaryExpr, Expr, Literal, Operator}; use datatypes::arrow; - use datatypes::arrow::array::{RecordBatch, UInt64Array}; - use datatypes::arrow::datatypes::{DataType, Field, Schema}; + use datatypes::arrow::array::{ + ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder, + TimestampMillisecondArray, UInt64Array, UInt8Array, + }; + use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; @@ -109,7 +113,7 @@ mod tests { FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType, }; use crate::cache::{CacheManager, CacheStrategy, PageKey}; - use crate::read::{BatchBuilder, BatchReader}; + use crate::read::{BatchBuilder, BatchReader, FlatSource}; use crate::region::options::{IndexOptions, InvertedIndexOptions}; use crate::sst::file::{FileHandle, FileMeta, RegionFileId}; use crate::sst::file_purger::NoopFilePurger; @@ -119,11 +123,13 @@ mod tests { use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics}; use crate::sst::parquet::writer::ParquetWriter; - use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; + use crate::sst::{ + location, to_flat_sst_arrow_schema, FlatSchemaOptions, DEFAULT_WRITE_CONCURRENCY, + }; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, - new_batch_with_binary, new_batch_with_custom_sequence, new_source, sst_file_handle, - sst_file_handle_with_file_id, sst_region_metadata, + new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source, + sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata, }; use crate::test_util::{check_reader_result, TestEnv}; @@ -969,6 +975,142 @@ mod tests { assert!(cached.contains_row_group(3)); } + /// Creates a flat format RecordBatch for testing. + /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format. 
fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
+        assert!(end >= start);
+        let metadata = Arc::new(sst_region_metadata());
+        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+
+        let num_rows = end - start;
+        let mut columns = Vec::new();
+
+        // Add primary key columns (tag_0, tag_1) as dictionary arrays
+        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+        for _ in 0..num_rows {
+            tag_0_builder.append_value(tags[0]);
+            tag_1_builder.append_value(tags[1]);
+        }
+
+        columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
+        columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
+
+        // Add field column (field_0)
+        let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
+        columns.push(Arc::new(UInt64Array::from(field_values)));
+
+        // Add time index column (ts)
+        let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
+        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
+
+        // Add encoded primary key column
+        let pk = new_primary_key(tags);
+        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
+        for _ in 0..num_rows {
+            pk_builder.append(&pk).unwrap();
+        }
+        columns.push(Arc::new(pk_builder.finish()));
+
+        // Add sequence column
+        columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
+
+        // Add op_type column
+        columns.push(Arc::new(UInt8Array::from_value(
+            OpType::Put as u8,
+            num_rows,
+        )));
+
+        RecordBatch::try_new(flat_schema, columns).unwrap()
+    }
+
+    /// Creates a FlatSource from flat format RecordBatches.
+    fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
+        FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
+    }
+
+    #[tokio::test]
+    async fn test_write_flat_with_index() {
+        let mut env = TestEnv::new().await;
+        let object_store = env.init_object_store_manager();
+        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
+        let metadata = Arc::new(sst_region_metadata());
+        let row_group_size = 50;
+
+        // Create flat format RecordBatches
+        let flat_batches = vec![
+            new_record_batch_by_range(&["a", "d"], 0, 20),
+            new_record_batch_by_range(&["b", "d"], 0, 20),
+            new_record_batch_by_range(&["c", "d"], 0, 20),
+            new_record_batch_by_range(&["c", "f"], 0, 40),
+            new_record_batch_by_range(&["c", "h"], 100, 200),
+        ];
+
+        let flat_source = new_flat_source_from_record_batches(flat_batches);
+
+        let write_opts = WriteOptions {
+            row_group_size,
+            ..Default::default()
+        };
+
+        let puffin_manager = env
+            .get_puffin_manager()
+            .build(object_store.clone(), file_path.clone());
+        let intermediate_manager = env.get_intermediate_manager();
+
+        let indexer_builder = IndexerBuilderImpl {
+            op_type: OperationType::Flush,
+            metadata: metadata.clone(),
+            row_group_size,
+            puffin_manager,
+            intermediate_manager,
+            index_options: IndexOptions {
+                inverted_index: InvertedIndexOptions {
+                    segment_row_count: 1,
+                    ..Default::default()
+                },
+            },
+            inverted_index_config: Default::default(),
+            fulltext_index_config: Default::default(),
+            bloom_filter_index_config: Default::default(),
+        };
+
+        let mut writer = ParquetWriter::new_with_object_store(
+            object_store.clone(),
+            metadata.clone(),
+            indexer_builder,
+            file_path.clone(),
+            Metrics::new(WriteType::Flush),
+        )
+        .await;
+
+        let info = writer
+            .write_all_flat(flat_source, &write_opts)
+            .await
+            .unwrap()
+            .remove(0);
+        assert_eq!(200, info.num_rows);
+        assert!(info.file_size > 0);
+
assert!(info.index_metadata.file_size > 0); + + assert!(info.index_metadata.inverted_index.index_size > 0); + assert_eq!(info.index_metadata.inverted_index.row_count, 200); + assert_eq!(info.index_metadata.inverted_index.columns, vec![0]); + + assert!(info.index_metadata.bloom_filter.index_size > 0); + assert_eq!(info.index_metadata.bloom_filter.row_count, 200); + assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]); + + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(199) + ), + info.time_range + ); + } + #[tokio::test] async fn test_read_with_override_sequence() { let mut env = TestEnv::new().await; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 6e3dcc585e..92c0ad2b74 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -29,8 +29,8 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::TimeSeriesRowSelector; use crate::error::{ - ComputeArrowSnafu, ConvertVectorSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, - RecordBatchSnafu, Result, StatsNotPresentSnafu, + ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, + Result, StatsNotPresentSnafu, }; use crate::read::compat::CompatBatch; use crate::read::last_row::RowGroupLastRowCachedReader; @@ -381,10 +381,7 @@ impl RangeBase { let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id()); if let Some(idx) = column_idx { let column = &input.columns()[idx]; - // Convert Arrow Array to Vector - let vector = datatypes::vectors::Helper::try_into_vector(column.clone()) - .context(ConvertVectorSnafu)?; - let result = filter.evaluate_vector(&vector).context(RecordBatchSnafu)?; + let result = filter.evaluate_array(column).context(RecordBatchSnafu)?; mask = mask.bitand(&result); } else { // Column not found in projection, continue diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index bec76cf128..28295777f9 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -24,7 +24,13 @@ use std::time::Instant; use common_telemetry::debug; use common_time::Timestamp; -use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::array::{ + ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; +use datatypes::arrow::compute::{max, min}; +use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit}; +use datatypes::arrow::record_batch::RecordBatch; use object_store::{FuturesAsyncWriter, ObjectStore}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -40,14 +46,17 @@ use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner}; -use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; -use crate::read::{Batch, Source}; +use crate::error::{ + InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu, +}; +use crate::read::{Batch, FlatSource, Source}; use crate::sst::file::{FileId, RegionFileId}; use crate::sst::index::{Indexer, IndexerBuilder}; +use crate::sst::parquet::flat_format::{time_index_column_index, FlatWriteFormat}; use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, 
PARQUET_METADATA_KEY};
-use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
+use crate::sst::{FlatSchemaOptions, DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
 
 /// Parquet SST writer.
 pub struct ParquetWriter {
@@ -270,6 +279,73 @@ where
         Ok(results)
     }
 
+    /// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
+    ///
+    /// Returns the [SstInfo] if the SST is written.
+    pub async fn write_all_flat(
+        &mut self,
+        source: FlatSource,
+        opts: &WriteOptions,
+    ) -> Result<SstInfoArray> {
+        let res = self.write_all_flat_without_cleaning(source, opts).await;
+        if res.is_err() {
+            // Clean tmp files explicitly on failure.
+            let file_id = self.current_file;
+            if let Some(cleaner) = &self.file_cleaner {
+                cleaner.clean_by_file_id(file_id).await;
+            }
+        }
+        res
+    }
+
+    async fn write_all_flat_without_cleaning(
+        &mut self,
+        mut source: FlatSource,
+        opts: &WriteOptions,
+    ) -> Result<SstInfoArray> {
+        let mut results = smallvec![];
+        let flat_format =
+            FlatWriteFormat::new(self.metadata.clone(), &FlatSchemaOptions::default())
+                .with_override_sequence(None);
+        let mut stats = SourceStats::default();
+
+        while let Some(record_batch) = self
+            .write_next_flat_batch(&mut source, &flat_format, opts)
+            .await
+            .transpose()
+        {
+            match record_batch {
+                Ok(batch) => {
+                    stats.update_flat(&batch)?;
+                    let start = Instant::now();
+                    // safety: self.current_indexer must be set when first batch has been written.
+                    self.current_indexer
+                        .as_mut()
+                        .unwrap()
+                        .update_flat(&batch)
+                        .await;
+                    self.metrics.update_index += start.elapsed();
+                    if let Some(max_file_size) = opts.max_file_size
+                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
+                    {
+                        self.finish_current_file(&mut results, &mut stats).await?;
+                    }
+                }
+                Err(e) => {
+                    if let Some(indexer) = &mut self.current_indexer {
+                        indexer.abort().await;
+                    }
+                    return Err(e);
+                }
+            }
+        }
+
+        self.finish_current_file(&mut results, &mut stats).await?;
+
+        // object_store.write will make sure all bytes are written or an error is raised.
+        Ok(results)
+    }
+
     /// Customizes per-column config according to schema and maybe column cardinality.
     fn customize_column_config(
         builder: WriterPropertiesBuilder,
@@ -313,6 +389,30 @@ where
         Ok(Some(batch))
     }
 
+    async fn write_next_flat_batch(
+        &mut self,
+        source: &mut FlatSource,
+        flat_format: &FlatWriteFormat,
+        opts: &WriteOptions,
+    ) -> Result<Option<RecordBatch>> {
+        let start = Instant::now();
+        let Some(record_batch) = source.next_batch().await? else {
+            return Ok(None);
+        };
+        self.metrics.iter_source += start.elapsed();
+
+        let arrow_batch = flat_format.convert_batch(&record_batch)?;
+
+        let start = Instant::now();
+        self.maybe_init_writer(flat_format.arrow_schema(), opts)
+            .await?
+            .write(&arrow_batch)
+            .await
+            .context(WriteParquetSnafu)?;
+        self.metrics.write_batch += start.elapsed();
+        Ok(Some(record_batch))
+    }
+
     async fn maybe_init_writer(
         &mut self,
         schema: &SchemaRef,
@@ -388,6 +488,85 @@ impl SourceStats {
             self.time_range = Some((min_in_batch, max_in_batch));
         }
     }
+
+    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
+        if record_batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        self.num_rows += record_batch.num_rows();
+
+        // Get the timestamp column by index
+        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
+        let timestamp_array = record_batch.column(time_index_col_idx);
+
+        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
+            if let Some(time_range) = &mut self.time_range {
+                time_range.0 = time_range.0.min(min_in_batch);
+                time_range.1 = time_range.1.max(max_in_batch);
+            } else {
+                self.time_range = Some((min_in_batch, max_in_batch));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Gets the min and max timestamps from a timestamp array.
+fn timestamp_range_from_array(
+    timestamp_array: &ArrayRef,
+) -> Result<Option<(Timestamp, Timestamp)>> {
+    let (min_ts, max_ts) = match timestamp_array.data_type() {
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = timestamp_array
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            let min_val = min(array).map(Timestamp::new_second);
+            let max_val = max(array).map(Timestamp::new_second);
+            (min_val, max_val)
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = timestamp_array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            let min_val = min(array).map(Timestamp::new_millisecond);
+            let max_val = max(array).map(Timestamp::new_millisecond);
+            (min_val, max_val)
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = timestamp_array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            let min_val = min(array).map(Timestamp::new_microsecond);
+            let max_val = max(array).map(Timestamp::new_microsecond);
+            (min_val, max_val)
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = timestamp_array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            let min_val = min(array).map(Timestamp::new_nanosecond);
+            let max_val = max(array).map(Timestamp::new_nanosecond);
+            (min_val, max_val)
+        }
+        _ => {
+            return UnexpectedSnafu {
+                reason: format!(
+                    "Unexpected data type of time index: {:?}",
+                    timestamp_array.data_type()
+                ),
+            }
+            .fail()
+        }
+    };
+
+    // If min timestamp exists, max timestamp should also exist.
+    Ok(min_ts.zip(max_ts))
+}
 
 /// Workaround for [AsyncArrowWriter] does not provide a method to