feat: enable concurrent write (#3214)

* feat: enable concurrent write

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-23 18:20:12 +09:00
committed by GitHub
parent 8485c9af33
commit 26535f577d
11 changed files with 48 additions and 26 deletions

View File

@@ -47,7 +47,7 @@ pub trait ArrowWriterCloser {
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder + ArrowWriterCloser,
F: FnMut(String) -> Fut,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
@@ -75,7 +75,7 @@ impl<
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder,
F: FnMut(String) -> Fut,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
@@ -149,7 +149,7 @@ impl<
if let Some(ref mut writer) = self.writer {
Ok(writer)
} else {
let writer = (self.writer_factory)(self.path.clone()).await?;
let writer = (self.writer_factory)(self.path.to_string()).await?;
Ok(self.writer.insert(writer))
}
}

View File

@@ -193,13 +193,15 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
store: ObjectStore,
path: &str,
threshold: usize,
concurrency: usize,
encoder_factory: U,
) -> Result<usize> {
let buffer = SharedBuffer::with_capacity(threshold);
let encoder = encoder_factory(buffer.clone());
let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
store
.writer(&path)
.writer_with(&path)
.concurrent(concurrency)
.await
.context(error::WriteObjectSnafu { path })
});

View File

@@ -193,8 +193,9 @@ pub async fn stream_to_csv(
store: ObjectStore,
path: &str,
threshold: usize,
concurrency: usize,
) -> Result<usize> {
stream_to_file(stream, store, path, threshold, |buffer| {
stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
csv::Writer::new(buffer)
})
.await

View File

@@ -152,8 +152,9 @@ pub async fn stream_to_json(
store: ObjectStore,
path: &str,
threshold: usize,
concurrency: usize,
) -> Result<usize> {
stream_to_file(stream, store, path, threshold, |buffer| {
stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
json::LineDelimitedWriter::new(buffer)
})
.await

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::pin::Pin;
use std::result;
use std::sync::Arc;
@@ -31,7 +29,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{ObjectStore, Reader};
use object_store::{ObjectStore, Reader, Writer};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
@@ -171,22 +169,33 @@ pub struct BufferedWriter {
type InnerBufferedWriter = LazyBufferedWriter<
object_store::Writer,
ArrowWriter<SharedBuffer>,
Box<
dyn FnMut(
String,
)
-> Pin<Box<dyn Future<Output = error::Result<object_store::Writer>> + Send>>
+ Send,
>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
>;
impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer_with(&path)
.concurrent(concurrency)
.await
.context(error::WriteObjectSnafu { path })
})
}
}
pub async fn try_new(
path: String,
store: ObjectStore,
arrow_schema: SchemaRef,
props: Option<WriterProperties>,
buffer_threshold: usize,
concurrency: usize,
) -> error::Result<Self> {
let buffer = SharedBuffer::with_capacity(buffer_threshold);
@@ -199,15 +208,7 @@ impl BufferedWriter {
buffer,
arrow_writer,
&path,
Box::new(move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer(&path)
.await
.context(error::WriteObjectSnafu { path })
})
}),
Self::make_write_factory(store, concurrency),
),
})
}
@@ -236,6 +237,7 @@ pub async fn stream_to_parquet(
store: ObjectStore,
path: &str,
threshold: usize,
concurrency: usize,
) -> Result<usize> {
let write_props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
@@ -247,6 +249,7 @@ pub async fn stream_to_parquet(
schema,
Some(write_props),
threshold,
concurrency,
)
.await?;
let mut rows_written = 0;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(type_alias_impl_trait)]
pub mod buffered_writer;
pub mod compression;

View File

@@ -113,6 +113,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
tmp_store.clone(),
&output_path,
threshold(size),
8
)
.await
.is_ok());
@@ -150,6 +151,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
tmp_store.clone(),
&output_path,
threshold(size),
8
)
.await
.is_ok());

View File

@@ -30,7 +30,7 @@ use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
/// A cache for uploading files to remote object stores.
///
@@ -180,6 +180,7 @@ impl WriteCache {
let mut writer = remote_store
.writer_with(upload_path)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(error::OpenDalSnafu)?;

View File

@@ -25,3 +25,6 @@ pub(crate) mod version;
/// Default write buffer size; it should be greater than the default minimum upload part size of S3 (5 MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;

View File

@@ -35,6 +35,7 @@ use crate::sst::index::Indexer;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
/// Parquet SST writer.
pub struct ParquetWriter {
@@ -90,6 +91,7 @@ impl ParquetWriter {
write_format.arrow_schema(),
Some(writer_props),
opts.write_buffer_size.as_bytes() as usize,
DEFAULT_WRITE_CONCURRENCY,
)
.await
.context(WriteBufferSnafu)?;

View File

@@ -43,6 +43,9 @@ use crate::statement::StatementExecutor;
/// Buffer size to flush data to object stores.
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);
/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
const WRITE_CONCURRENCY: usize = 8;
impl StatementExecutor {
async fn stream_to_file(
&self,
@@ -59,6 +62,7 @@ impl StatementExecutor {
object_store,
path,
threshold,
WRITE_CONCURRENCY,
)
.await
.context(error::WriteStreamToFileSnafu { path }),
@@ -67,6 +71,7 @@ impl StatementExecutor {
object_store,
path,
threshold,
WRITE_CONCURRENCY,
)
.await
.context(error::WriteStreamToFileSnafu { path }),
@@ -75,6 +80,7 @@ impl StatementExecutor {
object_store,
path,
threshold,
WRITE_CONCURRENCY,
)
.await
.context(error::WriteStreamToFileSnafu { path }),