diff --git a/src/common/datasource/src/buffered_writer.rs b/src/common/datasource/src/buffered_writer.rs
index 5ab3ce5886..852486ef9f 100644
--- a/src/common/datasource/src/buffered_writer.rs
+++ b/src/common/datasource/src/buffered_writer.rs
@@ -47,7 +47,7 @@ pub trait ArrowWriterCloser {
 impl<
         T: AsyncWrite + Send + Unpin,
         U: DfRecordBatchEncoder + ArrowWriterCloser,
-        F: FnMut(String) -> Fut,
+        F: Fn(String) -> Fut,
         Fut: Future<Output = Result<T>>,
     > LazyBufferedWriter<T, U, F>
 {
@@ -75,7 +75,7 @@ impl<
         T: AsyncWrite + Send + Unpin,
         U: DfRecordBatchEncoder,
-        F: FnMut(String) -> Fut,
+        F: Fn(String) -> Fut,
         Fut: Future<Output = Result<T>>,
     > LazyBufferedWriter<T, U, F>
 {
@@ -149,7 +149,7 @@ impl<
         if let Some(ref mut writer) = self.writer {
             Ok(writer)
         } else {
-            let writer = (self.writer_factory)(self.path.clone()).await?;
+            let writer = (self.writer_factory)(self.path.to_string()).await?;
             Ok(self.writer.insert(writer))
         }
     }
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index ae300d38e3..8774c946d5 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -193,13 +193,15 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrency: usize,
     encoder_factory: U,
 ) -> Result<usize> {
     let buffer = SharedBuffer::with_capacity(threshold);
     let encoder = encoder_factory(buffer.clone());
     let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
         store
-            .writer(&path)
+            .writer_with(&path)
+            .concurrent(concurrency)
             .await
             .context(error::WriteObjectSnafu { path })
     });
diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs
index eebe86ac18..0767722d9b 100644
--- a/src/common/datasource/src/file_format/csv.rs
+++ b/src/common/datasource/src/file_format/csv.rs
@@ -193,8 +193,9 @@ pub async fn stream_to_csv(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrency: usize,
 ) -> Result<usize> {
-    stream_to_file(stream, store, path, threshold, |buffer| {
+    stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
         csv::Writer::new(buffer)
     })
     .await
diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs
index bcf162971e..77fde3fddb 100644
--- a/src/common/datasource/src/file_format/json.rs
+++ b/src/common/datasource/src/file_format/json.rs
@@ -152,8 +152,9 @@ pub async fn stream_to_json(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrency: usize,
 ) -> Result<usize> {
-    stream_to_file(stream, store, path, threshold, |buffer| {
+    stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
         json::LineDelimitedWriter::new(buffer)
     })
     .await
diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs
index 220e8b5d68..c21bead9f7 100644
--- a/src/common/datasource/src/file_format/parquet.rs
+++ b/src/common/datasource/src/file_format/parquet.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::future::Future;
-use std::pin::Pin;
 use std::result;
 use std::sync::Arc;
 
@@ -31,7 +29,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::future::BoxFuture;
 use futures::StreamExt;
-use object_store::{ObjectStore, Reader};
+use object_store::{ObjectStore, Reader, Writer};
 use parquet::basic::{Compression, ZstdLevel};
 use parquet::file::properties::WriterProperties;
 use snafu::ResultExt;
@@ -171,22 +169,33 @@ pub struct BufferedWriter {
 type InnerBufferedWriter = LazyBufferedWriter<
     object_store::Writer,
     ArrowWriter<SharedBuffer>,
-    Box<
-        dyn FnMut(
-                String,
-            )
-                -> Pin<Box<dyn Future<Output = Result<object_store::Writer>> + Send>>
-            + Send,
-    >,
+    impl Fn(String) -> BoxFuture<'static, Result<object_store::Writer>>,
 >;
 
 impl BufferedWriter {
+    fn make_write_factory(
+        store: ObjectStore,
+        concurrency: usize,
+    ) -> impl Fn(String) -> BoxFuture<'static, Result<object_store::Writer>> {
+        move |path| {
+            let store = store.clone();
+            Box::pin(async move {
+                store
+                    .writer_with(&path)
+                    .concurrent(concurrency)
+                    .await
+                    .context(error::WriteObjectSnafu { path })
+            })
+        }
+    }
+
     pub async fn try_new(
         path: String,
         store: ObjectStore,
         arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
+        concurrency: usize,
     ) -> error::Result<Self> {
         let buffer = SharedBuffer::with_capacity(buffer_threshold);
@@ -199,15 +208,7 @@ impl BufferedWriter {
                 buffer,
                 arrow_writer,
                 &path,
-                Box::new(move |path| {
-                    let store = store.clone();
-                    Box::pin(async move {
-                        store
-                            .writer(&path)
-                            .await
-                            .context(error::WriteObjectSnafu { path })
-                    })
-                }),
+                Self::make_write_factory(store, concurrency),
             ),
         })
     }
@@ -236,6 +237,7 @@ pub async fn stream_to_parquet(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrency: usize,
 ) -> Result<usize> {
     let write_props = WriterProperties::builder()
         .set_compression(Compression::ZSTD(ZstdLevel::default()))
@@ -247,6 +249,7 @@ pub async fn stream_to_parquet(
         schema,
         Some(write_props),
         threshold,
+        concurrency,
     )
     .await?;
     let mut rows_written = 0;
diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs
index 775ae65ff4..8cb8756e06 100644
--- a/src/common/datasource/src/lib.rs
+++ b/src/common/datasource/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #![feature(assert_matches)]
+#![feature(type_alias_impl_trait)]
 
 pub mod buffered_writer;
 pub mod compression;
diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs
index 7e6abd2b9d..04125f1613 100644
--- a/src/common/datasource/src/test_util.rs
+++ b/src/common/datasource/src/test_util.rs
@@ -113,6 +113,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
         tmp_store.clone(),
         &output_path,
         threshold(size),
+        8
     )
     .await
     .is_ok());
@@ -150,6 +151,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
         tmp_store.clone(),
         &output_path,
         threshold(size),
+        8
     )
     .await
     .is_ok());
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 0840fa22fe..ff6f9dea60 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -30,7 +30,7 @@ use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::IndexerBuilder;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::{SstInfo, WriteOptions};
-use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
+use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
 
 /// A cache for uploading files to remote object stores.
 ///
@@ -180,6 +180,7 @@ impl WriteCache {
         let mut writer = remote_store
             .writer_with(upload_path)
             .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
+            .concurrent(DEFAULT_WRITE_CONCURRENCY)
             .await
             .context(error::OpenDalSnafu)?;
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index bdae9d11aa..ad750c7709 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -25,3 +25,6 @@ pub(crate) mod version;
 
 /// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb).
 pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
+
+/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
+pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index d2d9cc2d49..6ef02036f7 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -35,6 +35,7 @@ use crate::sst::index::Indexer;
 use crate::sst::parquet::format::WriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
+use crate::sst::DEFAULT_WRITE_CONCURRENCY;
 
 /// Parquet SST writer.
 pub struct ParquetWriter {
@@ -90,6 +91,7 @@ impl ParquetWriter {
             write_format.arrow_schema(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENCY,
         )
         .await
         .context(WriteBufferSnafu)?;
diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs
index 24390e6427..f0498a88e2 100644
--- a/src/operator/src/statement/copy_table_to.rs
+++ b/src/operator/src/statement/copy_table_to.rs
@@ -43,6 +43,9 @@ use crate::statement::StatementExecutor;
 /// Buffer size to flush data to object stores.
 const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);
 
+/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
+const WRITE_CONCURRENCY: usize = 8;
+
 impl StatementExecutor {
     async fn stream_to_file(
         &self,
@@ -59,6 +62,7 @@ impl StatementExecutor {
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENCY,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
@@ -67,6 +71,7 @@ impl StatementExecutor {
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENCY,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
@@ -75,6 +80,7 @@ impl StatementExecutor {
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENCY,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
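
For reference only (not part of the patch): a minimal sketch of the OpenDAL writer-builder call shape this change standardizes on. It is a hypothetical example; the Memory backend, the path, the tokio runtime, and the trailing write/close calls are stand-ins, while .buffer(..) and .concurrent(..) mirror the values set above through DEFAULT_WRITE_BUFFER_SIZE and DEFAULT_WRITE_CONCURRENCY.

use opendal::services::Memory;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Stand-in backend; in the patch the operator comes from the ObjectStore handle.
    let op = Operator::new(Memory::default())?.finish();

    // Same builder chain as the patch: keep the buffer above S3's 5 MB minimum
    // part size and allow several multipart uploads in flight at once.
    let mut writer = op
        .writer_with("demo/part-0.parquet")
        .buffer(8 * 1024 * 1024)
        .concurrent(8)
        .await?;

    writer.write(vec![0u8; 16]).await?;
    writer.close().await?;
    Ok(())
}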