diff --git a/Cargo.lock b/Cargo.lock
index 2885e3e5bd..262457400b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8069,6 +8069,19 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "parquet_opendal"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4140ae96f37c170f8d684a544711fabdac1d94adcbd97e8b033329bd37f40446"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures",
+ "opendal",
+ "parquet",
+]
+
 [[package]]
 name = "parse-zoneinfo"
 version = "0.3.1"
@@ -11207,11 +11220,20 @@ dependencies = [
 name = "sst-convert"
 version = "0.13.0"
 dependencies = [
+ "api",
+ "arrow-array",
+ "async-trait",
  "catalog",
+ "common-error",
  "common-meta",
+ "common-recordbatch",
+ "datatypes",
+ "futures-util",
  "meta-client",
  "mito2",
  "object-store",
+ "parquet",
+ "parquet_opendal",
  "snafu 0.8.5",
  "store-api",
  "table",
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 86b310e1ac..f4b32a4dfb 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("External error, context: {}", context))]
+    External {
+        source: BoxedError,
+        context: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
     #[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
     EncodeSparsePrimaryKey {
         reason: String,
@@ -1085,7 +1092,7 @@ impl ErrorExt for Error {
             | PuffinPurgeStager { source, .. } => source.status_code(),
 
             CleanDir { .. } => StatusCode::Unexpected,
             InvalidConfig { .. } => StatusCode::InvalidArguments,
-            StaleLogEntry { .. } => StatusCode::Unexpected,
+            StaleLogEntry { .. } | External { .. } => StatusCode::Unexpected,
 
             FilterRecordBatch { source, .. } => source.status_code(),
diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml
index 73c5a6f03c..c5d1673308 100644
--- a/src/sst-convert/Cargo.toml
+++ b/src/sst-convert/Cargo.toml
@@ -5,11 +5,20 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+api.workspace = true
+arrow-array.workspace = true
+async-trait.workspace = true
 catalog.workspace = true
+common-error.workspace = true
 common-meta.workspace = true
+common-recordbatch.workspace = true
+datatypes.workspace = true
+futures-util.workspace = true
 meta-client.workspace = true
 mito2.workspace = true
 object-store.workspace = true
+parquet.workspace = true
+parquet_opendal = "0.3.0"
 snafu.workspace = true
 store-api.workspace = true
 table.workspace = true
diff --git a/src/sst-convert/src/lib.rs b/src/sst-convert/src/lib.rs
index 11cb656ec7..e4ccd8195f 100644
--- a/src/sst-convert/src/lib.rs
+++ b/src/sst-convert/src/lib.rs
@@ -15,3 +15,5 @@
 mod reader;
 mod table;
 pub mod writer;
+
+pub use reader::parquet::{OpenDALParquetReader, RawParquetReader};
diff --git a/src/sst-convert/src/reader.rs b/src/sst-convert/src/reader.rs
index 0e7ff5039e..4f24fd6d3b 100644
--- a/src/sst-convert/src/reader.rs
+++ b/src/sst-convert/src/reader.rs
@@ -14,5 +14,5 @@
 
 //! Reader to read input data in different formats.
 
-mod parquet;
+pub(crate) mod parquet;
 mod remote_write;
diff --git a/src/sst-convert/src/reader/parquet.rs b/src/sst-convert/src/reader/parquet.rs
index 56102aec7d..b9aa6f16a5 100644
--- a/src/sst-convert/src/reader/parquet.rs
+++ b/src/sst-convert/src/reader/parquet.rs
@@ -13,3 +13,295 @@
 // limitations under the License.
 
 //! Parquet file format support.
+
+use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use api::v1::OpType;
+use common_error::ext::BoxedError;
+use common_error::snafu::{OptionExt, ResultExt};
+use common_recordbatch::error::UnsupportedOperationSnafu;
+use common_recordbatch::RecordBatch;
+use datatypes::prelude::DataType;
+use datatypes::scalars::ScalarVectorBuilder;
+use datatypes::schema::Schema;
+use datatypes::value::Value;
+use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
+use futures_util::StreamExt;
+use mito2::read::{Batch, BatchColumn, BatchReader};
+use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet_opendal::AsyncReader;
+use store_api::codec::PrimaryKeyEncoding;
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::{ColumnId, SequenceNumber};
+
+/// Reads a Parquet file through an OpenDAL [`ObjectStore`] and yields mito [`Batch`]es.
+pub struct OpenDALParquetReader {
+    inner: RawParquetReader<AsyncReader>,
+}
+
+impl OpenDALParquetReader {
+    pub async fn new(
+        operator: ObjectStore,
+        path: &str,
+        metadata: RegionMetadataRef,
+        override_sequence: Option<SequenceNumber>,
+    ) -> mito2::error::Result<Self> {
+        let reader = operator.reader_with(path).await.unwrap();
+
+        let content_len = operator.stat(path).await.unwrap().content_length();
+
+        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
+        let stream = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .unwrap()
+            .build()
+            .unwrap();
+        Ok(Self {
+            inner: RawParquetReader::new(stream, metadata, override_sequence, path),
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl BatchReader for OpenDALParquetReader {
+    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
+        self.inner.next_batch().await
+    }
+}
+
+/// Adapts a [`ParquetRecordBatchStream`] into mito's [`BatchReader`].
+pub struct RawParquetReader<R> {
+    metadata: RegionMetadataRef,
+    override_sequence: Option<SequenceNumber>,
+    output_batch_queue: VecDeque<Batch>,
+    stream: Pin<Box<ParquetRecordBatchStream<R>>>,
+    path: String,
+}
+
+impl<R: AsyncFileReader + Unpin + Send + 'static> RawParquetReader<R> {
+    pub fn new(
+        stream: ParquetRecordBatchStream<R>,
+        metadata: RegionMetadataRef,
+        override_sequence: Option<SequenceNumber>,
+        path: &str,
+    ) -> Self {
+        Self {
+            stream: Box::pin(stream),
+            metadata,
+            override_sequence,
+            output_batch_queue: VecDeque::new(),
+            path: path.to_string(),
+        }
+    }
+
+    pub async fn next_batch_inner(&mut self) -> mito2::error::Result<Option<Batch>> {
+        if let Some(batch) = self.output_batch_queue.pop_front() {
+            return Ok(Some(batch));
+        }
+        let Some(next_input_rb) = self.stream.next().await.transpose().with_context(|_| {
+            mito2::error::ReadParquetSnafu {
+                path: self.path.clone(),
+            }
+        })?
+        else {
+            return Ok(None);
+        };
+
+        let schema = Arc::new(
+            Schema::try_from(next_input_rb.schema())
+                .map_err(BoxedError::new)
+                .with_context(|_| mito2::error::ExternalSnafu {
+                    context: format!(
+                        "Failed to convert Schema from DfSchema: {:?}",
+                        next_input_rb.schema()
+                    ),
+                })?,
+        );
+        let rb = RecordBatch::try_from_df_record_batch(schema, next_input_rb)
+            .map_err(BoxedError::new)
+            .with_context(|_| mito2::error::ExternalSnafu {
+                context: "Failed to convert RecordBatch from DfRecordBatch".to_string(),
+            })?;
+        let new_batches = extract_to_batches(&rb, &self.metadata, self.override_sequence)
+            .map_err(BoxedError::new)
+            .with_context(|_| mito2::error::ExternalSnafu {
+                context: format!("Failed to extract batches from RecordBatch: {:?}", rb),
+            })?;
+
+        self.output_batch_queue.extend(new_batches);
+        Ok(self.output_batch_queue.pop_front())
+    }
+}
+
+#[async_trait::async_trait]
+impl<R: AsyncFileReader + Unpin + Send + 'static> BatchReader for RawParquetReader<R> {
+    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
+        self.next_batch_inner().await
+    }
+}
+
+/// Groups the rows of a [`RecordBatch`] by encoded primary key and builds one [`Batch`] per key.
+pub fn extract_to_batches(
+    rb: &RecordBatch,
+    metadata: &RegionMetadataRef,
+    override_sequence: Option<SequenceNumber>,
+) -> Result<Vec<Batch>, BoxedError> {
+    let pk_codec: Box<dyn PrimaryKeyCodec> = match metadata.primary_key_encoding {
+        PrimaryKeyEncoding::Dense => Box::new(DensePrimaryKeyCodec::new(metadata)),
+        PrimaryKeyEncoding::Sparse => Box::new(SparsePrimaryKeyCodec::new(metadata)),
+    };
+    let pk_ids = metadata.primary_key.clone();
+    let pk_names: Vec<_> = pk_ids
+        .iter()
+        .map(|id| {
+            metadata
+                .column_by_id(*id)
+                .unwrap()
+                .column_schema
+                .name
+                .clone()
+        })
+        .collect();
+    let pk_pos_in_rb: Vec<_> = pk_names
+        .into_iter()
+        .map(|name| {
+            rb.schema
+                .column_index_by_name(&name)
+                .context(UnsupportedOperationSnafu {
+                    reason: format!("Can't find column {} in rb={:?}", name, rb),
+                })
+                .map_err(BoxedError::new)
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let mut pk_to_batches: HashMap<Vec<u8>, SSTBatchBuilder> = HashMap::new();
+    let mut buffer = Vec::new();
+
+    for row in rb.rows() {
+        let pk_values: Vec<_> = pk_ids
+            .iter()
+            .zip(pk_pos_in_rb.iter())
+            .map(|(id, pos)| (*id, row[*pos].clone()))
+            .collect();
+        // Reuse the encode buffer across rows; clear it so each encoded key starts empty.
+        buffer.clear();
+        pk_codec
+            .encode_values(&pk_values, &mut buffer)
+            .map_err(BoxedError::new)?;
+        let cur_pk = &buffer;
+        let builder = if let Some(builder) = pk_to_batches.get_mut(cur_pk) {
+            builder
+        } else {
+            let builder =
+                SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?;
+            pk_to_batches.insert(cur_pk.clone(), builder);
+            pk_to_batches.get_mut(cur_pk).unwrap()
+        };
+        builder.push_row(&row).map_err(BoxedError::new)?;
+    }
+
+    let mut batches = Vec::new();
+    for (pk, builder) in pk_to_batches {
+        batches.push(builder.finish(pk).map_err(BoxedError::new)?);
+    }
+    Ok(batches)
+}
+
+/// Accumulates rows that share one primary key into a single [`Batch`].
+struct SSTBatchBuilder {
+    /// Positions of the field columns in the input record batch's rows.
+    field_column_pos: Vec<usize>,
+    field_ids: Vec<ColumnId>,
+    field_builders: Vec<Box<dyn MutableVector>>,
+    timestamp_pos: usize,
+    timestamp_builder: Box<dyn MutableVector>,
+    /// Sequence number that overrides the generated per-row sequence, if provided.
+    override_sequence: Option<SequenceNumber>,
+    sequence_builder: UInt64VectorBuilder,
+    op_type_builder: UInt8VectorBuilder,
+    cur_seq: SequenceNumber,
+}
+
+impl SSTBatchBuilder {
+    fn finish(mut self, pk: Vec<u8>) -> Result<Batch, BoxedError> {
+        let fields: Vec<_> = self
+            .field_ids
+            .iter()
+            .zip(self.field_builders)
+            .map(|(id, mut b)| BatchColumn {
+                column_id: *id,
+                data: b.to_vector(),
+            })
+            .collect();
+        Batch::new(
+            pk,
+            self.timestamp_builder.to_vector(),
+            Arc::new(self.sequence_builder.finish()),
+            Arc::new(self.op_type_builder.finish()),
+            fields,
+        )
+        .map_err(BoxedError::new)
+    }
+
+    fn push_row(&mut self, row: &[Value]) -> Result<(), BoxedError> {
+        for (field_pos, field_builder) in self
+            .field_column_pos
+            .iter()
+            .zip(self.field_builders.iter_mut())
+        {
+            field_builder.push_value_ref(row[*field_pos].as_value_ref());
+        }
+        self.timestamp_builder
+            .push_value_ref(row[self.timestamp_pos].as_value_ref());
+        self.sequence_builder
+            .push(Some(self.override_sequence.unwrap_or(self.cur_seq)));
+        self.op_type_builder.push(Some(OpType::Put as u8));
+        self.cur_seq += 1;
+        Ok(())
+    }
+
+    fn new(
+        rb: &RecordBatch,
+        metadata: &RegionMetadataRef,
+        override_sequence: Option<SequenceNumber>,
+    ) -> Result<Self, BoxedError> {
+        let timeindex_name = &metadata.time_index_column().column_schema.name;
+        Ok(Self {
+            field_ids: metadata.field_columns().map(|c| c.column_id).collect(),
+            field_column_pos: metadata
+                .field_columns()
+                .map(|c| &c.column_schema.name)
+                .map(|name| {
+                    rb.schema
+                        .column_index_by_name(name)
+                        .context(UnsupportedOperationSnafu {
+                            reason: format!("Can't find column {} in rb={:?}", name, rb),
+                        })
+                        .map_err(BoxedError::new)
+                })
+                .collect::<Result<Vec<_>, _>>()?,
+            field_builders: metadata
+                .field_columns()
+                .map(|c| c.column_schema.data_type.create_mutable_vector(512))
+                .collect(),
+
+            timestamp_pos: rb
+                .schema
+                .column_index_by_name(timeindex_name)
+                .context(UnsupportedOperationSnafu {
+                    reason: format!(
+                        "Can't find time index column {} in rb={:?}",
+                        timeindex_name, rb
+                    ),
+                })
+                .map_err(BoxedError::new)?,
+            timestamp_builder: metadata
+                .time_index_column()
+                .column_schema
+                .data_type
+                .create_mutable_vector(512),
+
+            override_sequence,
+            sequence_builder: UInt64VectorBuilder::with_capacity(512),
+
+            op_type_builder: UInt8VectorBuilder::with_capacity(512),
+            cur_seq: override_sequence.unwrap_or_default(),
+        })
+    }
+}
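For context, a minimal sketch of how the new reader is intended to be driven (the convert_one_file wrapper and its wiring are hypothetical and not part of this change): construct OpenDALParquetReader from an OpenDAL ObjectStore, the input path, and the destination region's metadata, then drain it through the BatchReader trait, receiving one Batch per encoded primary key.

use mito2::read::BatchReader;
use object_store::ObjectStore;
use sst_convert::OpenDALParquetReader;
use store_api::metadata::RegionMetadataRef;

// Hypothetical driver: read a Parquet file and feed the resulting batches to an SST writer.
async fn convert_one_file(
    object_store: ObjectStore,
    region_metadata: RegionMetadataRef,
    path: &str,
) -> mito2::error::Result<()> {
    // Some(1) overrides every row's sequence number; pass None to use the generated sequence.
    let mut reader =
        OpenDALParquetReader::new(object_store, path, region_metadata, Some(1)).await?;
    // Each yielded `Batch` holds the rows for a single encoded primary key.
    while let Some(batch) = reader.next_batch().await? {
        // Hand the batch to the SST writer here.
        let _ = batch;
    }
    Ok(())
}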