feat: opendal metrics

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-09-29 18:23:41 +08:00
parent 26bdb6a413
commit 0d5b423eb7
2 changed files with 135 additions and 11 deletions

View File

@@ -52,6 +52,12 @@ pub struct Metrics {
pub index_finish: Duration,
pub close: Duration,
pub num_series: usize,
// SST Opendal metrics.
pub opendal_create_cost: Duration,
pub opendal_num_writes: usize,
pub opendal_write_cost: Duration,
pub opendal_complete_cost: Duration,
}
impl Metrics {
@@ -186,9 +192,16 @@ impl AccessLayer {
path_provider,
)
.await;
writer
let sst_info = writer
.write_all(request.source, request.max_sequence, write_opts, metrics)
.await?
.await?;
let opendal_metrics = writer.opendal_metrics_val();
metrics.opendal_create_cost += opendal_metrics.create_cost;
metrics.opendal_num_writes += opendal_metrics.num_writes;
metrics.opendal_write_cost += opendal_metrics.write_cost;
metrics.opendal_complete_cost += opendal_metrics.complete_cost;
sst_info
};
// Put parquet metadata to cache manager.

View File

@@ -17,15 +17,19 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Instant;
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use futures::future::BoxFuture;
use object_store::{FuturesAsyncWriter, ObjectStore, Writer};
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
@@ -52,7 +56,7 @@ use crate::Metrics;
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
writer: Option<AsyncArrowWriter<OpenDalWriter>>,
/// Current active file id.
current_file: FileId,
writer_factory: F,
@@ -63,11 +67,18 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
/// Current active indexer.
current_indexer: Option<Indexer>,
bytes_written: Arc<AtomicUsize>,
opendal_metrics: Arc<Mutex<OpenDalMetrics>>,
}
pub trait WriterFactory {
type Writer: AsyncWrite + Send + Unpin;
fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> impl Future<Output = Result<OpenDalWriter>>;
}
pub struct ObjectStoreWriterFactory {
@@ -86,6 +97,22 @@ impl WriterFactory for ObjectStoreWriterFactory {
.map(|v| v.into_futures_async_write().compat_write())
.context(OpenDalSnafu)
}
async fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> Result<OpenDalWriter> {
let writer = self
.object_store
.writer_with(file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(OpenDalSnafu)?;
Ok(OpenDalWriter::new(writer, size))
}
}
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
@@ -107,6 +134,10 @@ where
)
.await
}
pub fn opendal_metrics_val(&self) -> OpenDalMetrics {
self.opendal_metrics.lock().unwrap().clone()
}
}
impl<F, I, P> ParquetWriter<F, I, P>
@@ -134,6 +165,7 @@ where
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
opendal_metrics: Arc::new(Mutex::new(OpenDalMetrics::default())),
}
}
@@ -282,7 +314,7 @@ where
&mut self,
schema: &SchemaRef,
opts: &WriteOptions,
) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
) -> Result<&mut AsyncArrowWriter<OpenDalWriter>> {
if let Some(ref mut w) = self.writer {
Ok(w)
} else {
@@ -300,10 +332,17 @@ where
let writer_props = props_builder.build();
let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
let writer = SizeAwareWriter::new(
self.writer_factory.create(&sst_file_path).await?,
self.bytes_written.clone(),
);
// let writer = SizeAwareWriter::new(
// self.writer_factory.create(&sst_file_path).await?,
// self.bytes_written.clone(),
// );
let create_start = Instant::now();
let mut writer = self
.writer_factory
.create_opendal(&sst_file_path, self.bytes_written.clone())
.await?;
self.opendal_metrics.lock().unwrap().create_cost += create_start.elapsed();
writer = writer.with_metrics(self.opendal_metrics.clone());
let arrow_writer =
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
@@ -343,6 +382,78 @@ impl SourceStats {
}
}
#[derive(Default, Debug, Clone)]
pub(crate) struct OpenDalMetrics {
pub(crate) create_cost: Duration,
pub(crate) num_writes: usize,
pub(crate) write_cost: Duration,
pub(crate) complete_cost: Duration,
}
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
pub struct OpenDalWriter {
inner: Writer,
size: Arc<AtomicUsize>,
metrics: Option<Arc<Mutex<OpenDalMetrics>>>,
}
impl OpenDalWriter {
fn new(inner: Writer, size: Arc<AtomicUsize>) -> Self {
Self {
inner,
size: size.clone(),
metrics: None,
}
}
fn with_metrics(mut self, metrics: Arc<Mutex<OpenDalMetrics>>) -> Self {
self.metrics = Some(metrics);
self
}
}
impl AsyncFileWriter for OpenDalWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ParquetError>> {
let write_start = Instant::now();
let size = self.size.clone();
let metrics = self.metrics.clone();
Box::pin(async move {
let bytes_written = bs.len();
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
size.fetch_add(bytes_written, Ordering::Relaxed);
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.num_writes += 1;
m.write_cost += write_start.elapsed();
}
Ok(())
})
}
fn complete(&mut self) -> BoxFuture<'_, Result<(), ParquetError>> {
let complete_start = Instant::now();
let metrics = self.metrics.clone();
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))?;
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.complete_cost += complete_start.elapsed();
}
Ok(())
})
}
}
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
struct SizeAwareWriter<W> {