From 5f65e3ff442e1efb31e15945cb7cafc00f30580c Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 24 Jul 2023 18:35:21 +0900 Subject: [PATCH] feat(mito): Port parquet writer and reader to mito2 (#2018) * feat(mito): Port Batch and BufferedWriter * feat: encode metadata to parquet * feat: define BatchReader trait * chore: ParquetWriter write_all takes `&mut self` * feat(mito): port ParquetReader * chore: fix typo * chore: address CR comment --- Cargo.lock | 2 + src/mito2/Cargo.toml | 2 + src/mito2/src/error.rs | 53 ++++++++- src/mito2/src/lib.rs | 3 +- src/mito2/src/metadata.rs | 9 +- src/mito2/src/read.rs | 140 +++++++++++++++++++++++ src/mito2/src/sst.rs | 2 + src/mito2/src/sst/file.rs | 15 ++- src/mito2/src/sst/parquet.rs | 44 +++++++ src/mito2/src/sst/parquet/reader.rs | 171 ++++++++++++++++++++++++++++ src/mito2/src/sst/parquet/writer.rs | 104 +++++++++++++++++ src/mito2/src/sst/stream_writer.rs | 120 +++++++++++++++++++ src/object-store/src/util.rs | 25 +++- 13 files changed, 684 insertions(+), 6 deletions(-) create mode 100644 src/mito2/src/read.rs create mode 100644 src/mito2/src/sst/parquet.rs create mode 100644 src/mito2/src/sst/parquet/reader.rs create mode 100644 src/mito2/src/sst/parquet/writer.rs create mode 100644 src/mito2/src/sst/stream_writer.rs diff --git a/Cargo.lock b/Cargo.lock index c6674f060a..7a7bc5d95b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5511,6 +5511,7 @@ dependencies = [ "anymap", "aquamarine", "arc-swap", + "async-compat", "async-stream", "async-trait", "chrono", @@ -5536,6 +5537,7 @@ dependencies = [ "log-store", "metrics", "object-store", + "parquet", "regex", "serde", "serde_json", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 86e181ab11..f6f54dd731 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -12,6 +12,7 @@ test = ["common-test-util"] aquamarine = "0.3" anymap = "1.0.0-beta.2" arc-swap = "1.0" +async-compat = "0.2" async-stream.workspace = true async-trait = "0.1" chrono.workspace = true @@ -36,6 +37,7 @@ lazy_static = "1.4" log-store = { path = "../log-store" } metrics.workspace = true object-store = { path = "../object-store" } +parquet = { workspace = true, features = ["async"] } regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 16136ef377..6c52e3d1e1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_datasource::compression::CompressionType; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use datatypes::arrow::error::ArrowError; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; @@ -117,6 +118,50 @@ pub enum Error { region_id: RegionId, location: Location, }, + + #[snafu(display( + "Failed to create RecordBatch from vectors, location: {}, source: {}", + location, + source + ))] + NewRecordBatch { + location: Location, + source: ArrowError, + }, + + #[snafu(display( + "Failed to write to buffer, location: {}, source: {}", + location, + source + ))] + WriteBuffer { + location: Location, + source: common_datasource::error::Error, + }, + + #[snafu(display( + "Failed to write parquet file, path: {}, location: {}, source: {}", + path, + location, + source + ))] + WriteParquet { + path: String, + location: Location, + source: parquet::errors::ParquetError, + }, + + #[snafu(display( + "Failed to read parquet file, path: {}, location: {}, source: {}", + path, + location, + source 
+    ))]
+    ReadParquet {
+        path: String,
+        source: parquet::errors::ParquetError,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -126,12 +171,15 @@ impl ErrorExt for Error {
         use Error::*;
 
         match self {
-            OpenDal { .. } => StatusCode::StorageUnavailable,
+            OpenDal { .. } | WriteParquet { .. } | ReadParquet { .. } => {
+                StatusCode::StorageUnavailable
+            }
             CompressObject { .. }
             | DecompressObject { .. }
             | SerdeJson { .. }
             | Utf8 { .. }
-            | RegionExists { .. } => StatusCode::Unexpected,
+            | RegionExists { .. }
+            | NewRecordBatch { .. } => StatusCode::Unexpected,
             InvalidScanIndex { .. }
             | InitialMetadata { .. }
             | InvalidMeta { .. }
@@ -139,6 +187,7 @@ impl ErrorExt for Error {
             RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
                 StatusCode::Internal
             }
+            WriteBuffer { source, .. } => source.status_code(),
         }
     }
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index a11077c59c..0d378a323c 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -31,10 +31,11 @@ pub mod manifest;
 pub mod memtable;
 #[allow(dead_code)]
 pub mod metadata;
+pub mod read;
 #[allow(dead_code)]
 mod region;
 #[allow(dead_code)]
-pub(crate) mod sst;
+pub mod sst;
 #[allow(dead_code)]
 mod worker;
diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs
index 00c12c9f38..0b7162fee3 100644
--- a/src/mito2/src/metadata.rs
+++ b/src/mito2/src/metadata.rs
@@ -24,7 +24,7 @@ use serde::{Deserialize, Deserializer, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{ColumnId, RegionId};
 
-use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result};
+use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result, SerdeJsonSnafu};
 use crate::region::VersionNumber;
 
 /// Initial version number of a new region.
@@ -114,6 +114,13 @@ impl<'de> Deserialize<'de> for RegionMetadata {
     }
 }
 
+impl RegionMetadata {
+    /// Encode the metadata to a JSON string.
+    pub fn to_json(&self) -> Result<String> {
+        serde_json::to_string(&self).context(SerdeJsonSnafu)
+    }
+}
+
 /// Fields skipped in serialization.
 struct SkippedFields {
     /// Last schema.
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
new file mode 100644
index 0000000000..90b6bf1947
--- /dev/null
+++ b/src/mito2/src/read.rs
@@ -0,0 +1,140 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Common structs and utilities for reading data.
+
+use async_trait::async_trait;
+use common_time::Timestamp;
+use datatypes::vectors::VectorRef;
+
+use crate::error::Result;
+use crate::metadata::RegionMetadataRef;
+
+/// Storage internal representation of a batch of rows.
+///
+/// The structure of [Batch] is still unstable; all pub fields may change.
+#[derive(Debug, Default, PartialEq, Eq, Clone)]
+pub struct Batch {
+    /// Rows organized in columnar format.
+    pub columns: Vec<VectorRef>,
+}
+
+impl Batch {
+    /// Create a new `Batch` from `columns`.
+    ///
+    /// # Panics
+    /// Panics if vectors in `columns` have different lengths.
+    pub fn new(columns: Vec<VectorRef>) -> Batch {
+        Self::assert_columns(&columns);
+
+        Batch { columns }
+    }
+
+    /// Returns number of columns in the batch.
+    pub fn num_columns(&self) -> usize {
+        self.columns.len()
+    }
+
+    /// Returns number of rows in the batch.
+    pub fn num_rows(&self) -> usize {
+        self.columns.get(0).map(|v| v.len()).unwrap_or(0)
+    }
+
+    /// Returns true if the number of rows in the batch is 0.
+    pub fn is_empty(&self) -> bool {
+        self.num_rows() == 0
+    }
+
+    /// Slice the batch, returning a new batch.
+    ///
+    /// # Panics
+    /// Panics if `offset + length > self.num_rows()`.
+    pub fn slice(&self, offset: usize, length: usize) -> Batch {
+        let columns = self
+            .columns
+            .iter()
+            .map(|v| v.slice(offset, length))
+            .collect();
+        Batch { columns }
+    }
+
+    fn assert_columns(columns: &[VectorRef]) {
+        if columns.is_empty() {
+            return;
+        }
+
+        let length = columns[0].len();
+        assert!(columns.iter().all(|col| col.len() == length));
+    }
+}
+
+/// Collected [Source] statistics.
+#[derive(Debug, Clone)]
+pub struct SourceStats {
+    /// Number of rows fetched.
+    pub num_rows: usize,
+    /// Min timestamp from fetched batches.
+    ///
+    /// If no rows are fetched, the value of the timestamp is i64::MIN.
+    pub min_timestamp: Timestamp,
+    /// Max timestamp from fetched batches.
+    ///
+    /// If no rows are fetched, the value of the timestamp is i64::MAX.
+    pub max_timestamp: Timestamp,
+}
+
+/// Async [Batch] reader and iterator wrapper.
+///
+/// This is the data source for SST writers or internal readers.
+pub enum Source {}
+
+impl Source {
+    /// Returns the next [Batch] from this data source.
+    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        unimplemented!()
+    }
+
+    /// Returns the metadata of the source region.
+    pub(crate) fn metadata(&self) -> RegionMetadataRef {
+        unimplemented!()
+    }
+
+    /// Returns statistics of fetched batches.
+    pub(crate) fn stats(&self) -> SourceStats {
+        unimplemented!()
+    }
+}
+
+/// Async batch reader.
+#[async_trait]
+pub trait BatchReader: Send {
+    /// Fetch the next [Batch].
+    ///
+    /// Returns `Ok(None)` when the reader has reached its end; calling `next_batch()`
+    /// again will not return another batch.
+    ///
+    /// If `Err` is returned, the caller should not call this method again; the implementor
+    /// may or may not panic in that case.
+    async fn next_batch(&mut self) -> Result<Option<Batch>>;
+}
+
+/// Pointer to [BatchReader].
+pub type BoxedBatchReader = Box<dyn BatchReader>;
+
+#[async_trait::async_trait]
+impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        (**self).next_batch().await
+    }
+}
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index e0527b787c..bb31fdc3dd 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -15,4 +15,6 @@
 //! Sorted strings tables.
 
 pub mod file;
+pub mod parquet;
+mod stream_writer;
 pub(crate) mod version;
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 43655c1a9a..8ebba3a2f9 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -20,6 +20,7 @@ use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
 use common_time::Timestamp;
+use object_store::util::join_path;
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use store_api::storage::RegionId;
@@ -52,7 +53,7 @@ impl FileId {
 
     /// Append `.parquet` to file id to make a complete file name
     pub fn as_parquet(&self) -> String {
-        format!("{}{}", self.0.hyphenated(), ".parquet")
+        format!("{}{}", self, ".parquet")
     }
 }
 
@@ -109,6 +110,18 @@ impl fmt::Debug for FileHandle {
     }
 }
 
+impl FileHandle {
+    /// Returns the file id.
+    pub fn file_id(&self) -> FileId {
+        self.inner.meta.file_id
+    }
+
+    /// Returns the complete file path of the file.
+    pub fn file_path(&self, file_dir: &str) -> String {
+        join_path(file_dir, &self.file_id().as_parquet())
+    }
+}
+
 /// Inner data of [FileHandle].
 ///
 /// Contains meta of the file, and other mutable info like whether the file is compacting.
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
new file mode 100644
index 0000000000..c7de27199a
--- /dev/null
+++ b/src/mito2/src/sst/parquet.rs
@@ -0,0 +1,44 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! SST in parquet format.
+
+mod reader;
+mod writer;
+
+use common_base::readable_size::ReadableSize;
+
+use crate::sst::file::FileTimeRange;
+
+/// Key of metadata in parquet SST.
+pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
+
+/// Parquet write options.
+#[derive(Debug)]
+pub struct WriteOptions {
+    /// Buffer size for async writer.
+    pub write_buffer_size: ReadableSize,
+    /// Row group size.
+    pub row_group_size: usize,
+}
+
+/// Parquet SST info returned by the writer.
+pub struct SstInfo {
+    /// Time range of the SST.
+    pub time_range: FileTimeRange,
+    /// File size in bytes.
+    pub file_size: u64,
+    /// Number of rows.
+    pub num_rows: usize,
+}
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
new file mode 100644
index 0000000000..c750f25162
--- /dev/null
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -0,0 +1,171 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet reader.
+
+use async_compat::CompatExt;
+use async_trait::async_trait;
+use common_time::range::TimestampRange;
+use datatypes::arrow::record_batch::RecordBatch;
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::errors::ParquetError;
+use snafu::ResultExt;
+use table::predicate::Predicate;
+use tokio::io::BufReader;
+
+use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
+use crate::read::{Batch, BatchReader};
+use crate::sst::file::FileHandle;
+
+/// Parquet SST reader builder.
+pub struct ParquetReaderBuilder {
+    file_dir: String,
+    file_handle: FileHandle,
+    object_store: ObjectStore,
+    predicate: Option<Predicate>,
+    time_range: Option<TimestampRange>,
+}
+
+impl ParquetReaderBuilder {
+    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
+    pub fn new(
+        file_dir: String,
+        file_handle: FileHandle,
+        object_store: ObjectStore,
+    ) -> ParquetReaderBuilder {
+        ParquetReaderBuilder {
+            file_dir,
+            file_handle,
+            object_store,
+            predicate: None,
+            time_range: None,
+        }
+    }
+
+    /// Attaches the predicate to the builder.
+    pub fn predicate(mut self, predicate: Predicate) -> ParquetReaderBuilder {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    /// Attaches the time range to the builder.
+    pub fn time_range(mut self, time_range: TimestampRange) -> ParquetReaderBuilder {
+        self.time_range = Some(time_range);
+        self
+    }
+
+    /// Builds a [ParquetReader].
+    pub fn build(self) -> ParquetReader {
+        let file_path = self.file_handle.file_path(&self.file_dir);
+        ParquetReader {
+            file_path,
+            file_handle: self.file_handle,
+            object_store: self.object_store,
+            predicate: self.predicate,
+            time_range: self.time_range,
+            stream: None,
+        }
+    }
+}
+
+type BoxedRecordBatchStream = BoxStream<'static, std::result::Result<RecordBatch, ParquetError>>;
+
+/// Parquet batch reader.
+pub struct ParquetReader {
+    /// Path of the file.
+    file_path: String,
+    /// SST file to read.
+    ///
+    /// Holds the file handle to avoid the file purger purging it.
+    file_handle: FileHandle,
+    object_store: ObjectStore,
+    /// Predicate to push down.
+    predicate: Option<Predicate>,
+    /// Time range to filter.
+    time_range: Option<TimestampRange>,
+
+    /// Inner parquet record batch stream.
+    stream: Option<BoxedRecordBatchStream>,
+}
+
+impl ParquetReader {
+    /// Initializes the reader and the parquet stream.
+    async fn maybe_init(&mut self) -> Result<()> {
+        if self.stream.is_some() {
+            // Already initialized.
+            return Ok(());
+        }
+
+        let reader = self
+            .object_store
+            .reader(&self.file_path)
+            .await
+            .context(OpenDalSnafu)?
+            .compat();
+        let buf_reader = BufReader::new(reader);
+        let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+            .await
+            .context(ReadParquetSnafu {
+                path: &self.file_path,
+            })?;
+
+        // TODO(yingwen): Decode region metadata, create read adapter.
+
+        // Prune row groups by metadata.
+        if let Some(predicate) = &self.predicate {
+            let pruned_row_groups = predicate
+                .prune_row_groups(builder.metadata().row_groups())
+                .into_iter()
+                .enumerate()
+                .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
+                .collect::<Vec<_>>();
+            builder = builder.with_row_groups(pruned_row_groups);
+        }
+
+        // TODO(yingwen): Projection.
+
+        let stream = builder.build().context(ReadParquetSnafu {
+            path: &self.file_path,
+        })?;
+        self.stream = Some(Box::pin(stream));
+
+        Ok(())
+    }
+
+    /// Converts arrow's [RecordBatch] into our [Batch].
+    fn convert_arrow_record_batch(&self, _record_batch: RecordBatch) -> Result<Batch> {
+        unimplemented!()
+    }
+}
+
+#[async_trait]
+impl BatchReader for ParquetReader {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        self.maybe_init().await?;
+
+        self.stream
+            .as_mut()
+            .unwrap()
+            .try_next()
+            .await
+            .context(ReadParquetSnafu {
+                path: &self.file_path,
+            })?
+            .map(|rb| self.convert_arrow_record_batch(rb))
+            .transpose()
+    }
+}
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
new file mode 100644
index 0000000000..577f998293
--- /dev/null
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -0,0 +1,104 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet writer.
+
+use common_telemetry::logging;
+use object_store::ObjectStore;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::metadata::KeyValue;
+use parquet::file::properties::WriterProperties;
+
+use crate::error::Result;
+use crate::read::Source;
+use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
+use crate::sst::stream_writer::BufferedWriter;
+
+/// Parquet SST writer.
+pub struct ParquetWriter<'a> {
+    /// SST output file path.
+    file_path: &'a str,
+    /// Input data source.
+    source: Source,
+    object_store: ObjectStore,
+}
+
+impl<'a> ParquetWriter<'a> {
+    /// Creates a new parquet SST writer.
+    pub fn new(file_path: &'a str, source: Source, object_store: ObjectStore) -> ParquetWriter {
+        ParquetWriter {
+            file_path,
+            source,
+            object_store,
+        }
+    }
+
+    /// Iterates the source and writes all rows to the Parquet file.
+    ///
+    /// Returns the [SstInfo] if the SST is written.
+    pub async fn write_all(&mut self, opts: &WriteOptions) -> Result<Option<SstInfo>> {
+        let metadata = self.source.metadata();
+
+        let json = metadata.to_json()?;
+        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
+
+        // FIXME(yingwen): encode metadata into key value.
+        let props_builder = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(opts.row_group_size);
+        // TODO(yingwen): Set column encoding for internal columns and timestamp.
+        // e.g. Use DELTA_BINARY_PACKED and disable dictionary for sequence.
+
+        let writer_props = props_builder.build();
+
+        let mut buffered_writer = BufferedWriter::try_new(
+            self.file_path.to_string(),
+            self.object_store.clone(),
+            &metadata.schema,
+            Some(writer_props),
+            opts.write_buffer_size.as_bytes() as usize,
+        )
+        .await?;
+
+        while let Some(batch) = self.source.next_batch().await? {
+            buffered_writer.write(&batch).await?;
+        }
+        // Get stats from the source.
+        let stats = self.source.stats();
+
+        if stats.num_rows == 0 {
+            logging::debug!(
+                "No data written, try to stop the writer: {}",
+                self.file_path
+            );
+
+            buffered_writer.close().await?;
+            return Ok(None);
+        }
+
+        let (_file_meta, file_size) = buffered_writer.close().await?;
+        let time_range = (stats.min_timestamp, stats.max_timestamp);
+
+        // object_store.write will make sure all bytes are written or an error is raised.
+        Ok(Some(SstInfo {
+            time_range,
+            file_size,
+            num_rows: stats.num_rows,
+        }))
+    }
+}
+
+// TODO(yingwen): Port tests.
diff --git a/src/mito2/src/sst/stream_writer.rs b/src/mito2/src/sst/stream_writer.rs
new file mode 100644
index 0000000000..62fa4df7e6
--- /dev/null
+++ b/src/mito2/src/sst/stream_writer.rs
@@ -0,0 +1,120 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future::Future;
+use std::pin::Pin;
+
+use common_datasource::buffered_writer::LazyBufferedWriter;
+use common_datasource::share_buffer::SharedBuffer;
+use datatypes::arrow;
+use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::schema::SchemaRef;
+use object_store::ObjectStore;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use parquet::format::FileMetaData;
+use snafu::ResultExt;
+
+use crate::error;
+use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
+use crate::read::Batch;
+
+/// Parquet writer that buffers row groups in memory and writes buffered data to the underlying
+/// storage in chunks to reduce memory consumption.
+pub struct BufferedWriter {
+    inner: InnerBufferedWriter,
+    arrow_schema: arrow::datatypes::SchemaRef,
+}
+
+type InnerBufferedWriter = LazyBufferedWriter<
+    object_store::Writer,
+    ArrowWriter<SharedBuffer>,
+    Box<
+        dyn FnMut(
+                String,
+            ) -> Pin<
+                Box<
+                    dyn Future<Output = common_datasource::error::Result<object_store::Writer>>
+                        + Send,
+                >,
+            > + Send,
+    >,
+>;
+
+impl BufferedWriter {
+    pub async fn try_new(
+        path: String,
+        store: ObjectStore,
+        schema: &SchemaRef,
+        props: Option<WriterProperties>,
+        buffer_threshold: usize,
+    ) -> error::Result<Self> {
+        let arrow_schema = schema.arrow_schema();
+        let buffer = SharedBuffer::with_capacity(buffer_threshold);
+
+        let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
+            .context(WriteParquetSnafu { path: &path })?;
+
+        Ok(Self {
+            inner: LazyBufferedWriter::new(
+                buffer_threshold,
+                buffer,
+                arrow_writer,
+                &path,
+                Box::new(move |path| {
+                    let store = store.clone();
+                    Box::pin(async move {
+                        store
+                            .writer(&path)
+                            .await
+                            .context(common_datasource::error::WriteObjectSnafu { path })
+                    })
+                }),
+            ),
+            arrow_schema: arrow_schema.clone(),
+        })
+    }
+
+    /// Write a record batch to the stream writer.
+    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
+        let arrow_batch = RecordBatch::try_new(
+            self.arrow_schema.clone(),
+            batch
+                .columns
+                .iter()
+                .map(|v| v.to_arrow_array())
+                .collect::<Vec<_>>(),
+        )
+        .context(NewRecordBatchSnafu)?;
+
+        self.inner
+            .write(&arrow_batch)
+            .await
+            .context(error::WriteBufferSnafu)?;
+        self.inner
+            .try_flush(false)
+            .await
+            .context(error::WriteBufferSnafu)?;
+
+        Ok(())
+    }
+
+    /// Close the parquet writer.
+    pub async fn close(self) -> error::Result<(FileMetaData, u64)> {
+        self.inner
+            .close_with_arrow_writer()
+            .await
+            .context(error::WriteBufferSnafu)
+    }
+}
diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs
index 1994c1d74c..3ff71f8fce 100644
--- a/src/object-store/src/util.rs
+++ b/src/object-store/src/util.rs
@@ -43,6 +43,15 @@ pub fn join_dir(parent: &str, child: &str) -> String {
     opendal::raw::normalize_root(&output)
 }
 
+/// Push `child` to `parent` dir and normalize the output path.
+///
+/// - A path ending with `/` is a dir path.
+/// - Otherwise, it's a file path.
+pub fn join_path(parent: &str, child: &str) -> String { + let output = format!("{parent}/{child}"); + opendal::raw::normalize_path(&output) +} + #[cfg(test)] mod tests { use super::*; @@ -55,7 +64,7 @@ mod tests { } #[test] - fn test_join_paths() { + fn test_join_dir() { assert_eq!("/", join_dir("", "")); assert_eq!("/", join_dir("/", "")); assert_eq!("/", join_dir("", "/")); @@ -67,4 +76,18 @@ mod tests { assert_eq!("/a/b/c/", join_dir("/a/b", "/c/")); assert_eq!("/a/b/c/", join_dir("/a/b", "//c")); } + + #[test] + fn test_join_path() { + assert_eq!("/", join_path("", "")); + assert_eq!("/", join_path("/", "")); + assert_eq!("/", join_path("", "/")); + assert_eq!("/", join_path("/", "/")); + assert_eq!("a/", join_path("a", "")); + assert_eq!("a/b/c.txt", join_path("a/b", "c.txt")); + assert_eq!("a/b/c.txt", join_path("/a/b", "c.txt")); + assert_eq!("a/b/c/", join_path("/a/b", "c/")); + assert_eq!("a/b/c/", join_path("/a/b", "/c/")); + assert_eq!("a/b/c.txt", join_path("/a/b", "//c.txt")); + } }
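
Usage sketch: the snippet below illustrates how the pieces introduced above are intended to fit together. It is written as if from inside the `sst::parquet` module (the `reader`/`writer` submodules are private in this commit). `Source` is still an empty placeholder enum here, so the `source`, `handle`, and `object_store` arguments, the `write_then_read` helper itself, and the `WriteOptions` values are assumptions for illustration only.

use common_base::readable_size::ReadableSize;
use object_store::ObjectStore;

use crate::error::Result;
use crate::read::{BatchReader, Source};
use crate::sst::file::FileHandle;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;

/// Hypothetical helper: writes `source` to an SST file and scans it back.
async fn write_then_read(
    file_dir: &str,
    handle: FileHandle,
    source: Source,
    object_store: ObjectStore,
) -> Result<()> {
    // Write all batches from the source into one parquet SST.
    let file_path = handle.file_path(file_dir);
    let mut writer = ParquetWriter::new(&file_path, source, object_store.clone());
    let opts = WriteOptions {
        // Placeholder values; this patch defines no defaults yet.
        write_buffer_size: ReadableSize::mb(8),
        row_group_size: 4096,
    };
    let sst_info = writer.write_all(&opts).await?;

    // Read the SST back batch by batch; predicate/time_range pushdown is optional.
    if sst_info.is_some() {
        let mut reader =
            ParquetReaderBuilder::new(file_dir.to_string(), handle, object_store).build();
        while let Some(_batch) = reader.next_batch().await? {
            // Consume batches here.
        }
    }
    Ok(())
}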