diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 2950ae6a1c..74f7e1f371 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -52,6 +52,12 @@ pub struct Metrics { pub index_finish: Duration, pub close: Duration, pub num_series: usize, + + // SST Opendal metrics. + pub opendal_create_cost: Duration, + pub opendal_num_writes: usize, + pub opendal_write_cost: Duration, + pub opendal_complete_cost: Duration, } impl Metrics { @@ -186,9 +192,16 @@ impl AccessLayer { path_provider, ) .await; - writer + let sst_info = writer .write_all(request.source, request.max_sequence, write_opts, metrics) - .await? + .await?; + let opendal_metrics = writer.opendal_metrics_val(); + metrics.opendal_create_cost += opendal_metrics.create_cost; + metrics.opendal_num_writes += opendal_metrics.num_writes; + metrics.opendal_write_cost += opendal_metrics.write_cost; + metrics.opendal_complete_cost += opendal_metrics.complete_cost; + + sst_info }; // Put parquet metadata to cache manager. diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index b6a4360581..b873e528e1 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -17,15 +17,19 @@ use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Duration, Instant}; +use bytes::Bytes; use common_time::Timestamp; use datatypes::arrow::datatypes::SchemaRef; -use object_store::{FuturesAsyncWriter, ObjectStore}; +use futures::future::BoxFuture; +use object_store::{FuturesAsyncWriter, ObjectStore, Writer}; +use parquet::arrow::async_writer::AsyncFileWriter; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; +use parquet::errors::ParquetError; use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; @@ -52,7 +56,7 @@ use crate::Metrics; pub struct ParquetWriter { /// Path provider that creates SST and index file paths according to file id. path_provider: P, - writer: Option>>, + writer: Option>, /// Current active file id. current_file: FileId, writer_factory: F, @@ -63,11 +67,18 @@ pub struct ParquetWriter, bytes_written: Arc, + opendal_metrics: Arc>, } pub trait WriterFactory { type Writer: AsyncWrite + Send + Unpin; fn create(&mut self, file_path: &str) -> impl Future>; + + fn create_opendal( + &mut self, + file_path: &str, + size: Arc, + ) -> impl Future>; } pub struct ObjectStoreWriterFactory { @@ -86,6 +97,22 @@ impl WriterFactory for ObjectStoreWriterFactory { .map(|v| v.into_futures_async_write().compat_write()) .context(OpenDalSnafu) } + + async fn create_opendal( + &mut self, + file_path: &str, + size: Arc, + ) -> Result { + let writer = self + .object_store + .writer_with(file_path) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .concurrent(DEFAULT_WRITE_CONCURRENCY) + .await + .context(OpenDalSnafu)?; + + Ok(OpenDalWriter::new(writer, size)) + } } impl ParquetWriter @@ -107,6 +134,10 @@ where ) .await } + + pub fn opendal_metrics_val(&self) -> OpenDalMetrics { + self.opendal_metrics.lock().unwrap().clone() + } } impl ParquetWriter @@ -134,6 +165,7 @@ where indexer_builder, current_indexer: Some(indexer), bytes_written: Arc::new(AtomicUsize::new(0)), + opendal_metrics: Arc::new(Mutex::new(OpenDalMetrics::default())), } } @@ -282,7 +314,7 @@ where &mut self, schema: &SchemaRef, opts: &WriteOptions, - ) -> Result<&mut AsyncArrowWriter>> { + ) -> Result<&mut AsyncArrowWriter> { if let Some(ref mut w) = self.writer { Ok(w) } else { @@ -300,10 +332,17 @@ where let writer_props = props_builder.build(); let sst_file_path = self.path_provider.build_sst_file_path(self.current_file); - let writer = SizeAwareWriter::new( - self.writer_factory.create(&sst_file_path).await?, - self.bytes_written.clone(), - ); + // let writer = SizeAwareWriter::new( + // self.writer_factory.create(&sst_file_path).await?, + // self.bytes_written.clone(), + // ); + let create_start = Instant::now(); + let mut writer = self + .writer_factory + .create_opendal(&sst_file_path, self.bytes_written.clone()) + .await?; + self.opendal_metrics.lock().unwrap().create_cost += create_start.elapsed(); + writer = writer.with_metrics(self.opendal_metrics.clone()); let arrow_writer = AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; @@ -343,6 +382,78 @@ impl SourceStats { } } +#[derive(Default, Debug, Clone)] +pub(crate) struct OpenDalMetrics { + pub(crate) create_cost: Duration, + pub(crate) num_writes: usize, + pub(crate) write_cost: Duration, + pub(crate) complete_cost: Duration, +} + +/// Workaround for [AsyncArrowWriter] does not provide a method to +/// get total bytes written after close. +pub struct OpenDalWriter { + inner: Writer, + size: Arc, + metrics: Option>>, +} + +impl OpenDalWriter { + fn new(inner: Writer, size: Arc) -> Self { + Self { + inner, + size: size.clone(), + metrics: None, + } + } + + fn with_metrics(mut self, metrics: Arc>) -> Self { + self.metrics = Some(metrics); + self + } +} + +impl AsyncFileWriter for OpenDalWriter { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ParquetError>> { + let write_start = Instant::now(); + let size = self.size.clone(); + let metrics = self.metrics.clone(); + Box::pin(async move { + let bytes_written = bs.len(); + self.inner + .write(bs) + .await + .map_err(|err| ParquetError::External(Box::new(err)))?; + + size.fetch_add(bytes_written, Ordering::Relaxed); + if let Some(metrics) = metrics { + let mut m = metrics.lock().unwrap(); + m.num_writes += 1; + m.write_cost += write_start.elapsed(); + } + Ok(()) + }) + } + + fn complete(&mut self) -> BoxFuture<'_, Result<(), ParquetError>> { + let complete_start = Instant::now(); + let metrics = self.metrics.clone(); + Box::pin(async move { + self.inner + .close() + .await + .map(|_| ()) + .map_err(|err| ParquetError::External(Box::new(err)))?; + + if let Some(metrics) = metrics { + let mut m = metrics.lock().unwrap(); + m.complete_cost += complete_start.elapsed(); + } + Ok(()) + }) + } +} + /// Workaround for [AsyncArrowWriter] does not provide a method to /// get total bytes written after close. struct SizeAwareWriter {