Mirror of https://github.com/GreptimeTeam/greptimedb.git
chore: refine metrics tracking the flush/compaction cost time (#6630)
chore: refine metrics tracking the per-stage cost time during flush and compaction

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
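In short: `write_sst` and `write_and_upload_sst` now return a per-stage `Metrics` value alongside the SST info. The parquet writer accumulates the `iter_source`, `write_batch`, and `update_index` durations, the write cache adds `upload_parquet` / `upload_puffin`, and the caller merges the metrics from every SST it writes before reporting them to either `FLUSH_ELAPSED` or `COMPACTION_STAGE_ELAPSED`, keyed by `WriteType`. A minimal usage sketch (not part of the diff; `access_layer`, `write_requests`, and `write_opts` are hypothetical placeholders):

    // Accumulate per-stage timings across all SSTs written by one flush, then report once.
    let mut flush_metrics = Metrics::new(WriteType::Flush);
    for request in write_requests {
        // write_sst now returns the per-stage timings alongside the SST info.
        let (_ssts, metrics) = access_layer
            .write_sst(request, &write_opts, WriteType::Flush)
            .await?;
        flush_metrics = flush_metrics.merge(metrics);
    }
    // Observes the iter_source/write_batch/update_index/upload_parquet/upload_puffin histograms.
    flush_metrics.observe();
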
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::time::Duration;

 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
@@ -28,6 +29,7 @@ use crate::cache::write_cache::SstUploadRequest;
 use crate::cache::CacheManagerRef;
 use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
 use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
+use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
 use crate::read::Source;
 use crate::region::options::IndexOptions;
 use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId};
@@ -43,6 +45,87 @@ pub type AccessLayerRef = Arc<AccessLayer>;
 /// SST write results.
 pub type SstInfoArray = SmallVec<[SstInfo; 2]>;

+/// Write operation type.
+#[derive(Eq, PartialEq, Debug)]
+pub enum WriteType {
+    /// Writes from flush
+    Flush,
+    /// Writes from compaction.
+    Compaction,
+}
+
+#[derive(Debug)]
+pub struct Metrics {
+    pub(crate) write_type: WriteType,
+    pub(crate) iter_source: Duration,
+    pub(crate) write_batch: Duration,
+    pub(crate) update_index: Duration,
+    pub(crate) upload_parquet: Duration,
+    pub(crate) upload_puffin: Duration,
+}
+
+impl Metrics {
+    pub(crate) fn new(write_type: WriteType) -> Self {
+        Self {
+            write_type,
+            iter_source: Default::default(),
+            write_batch: Default::default(),
+            update_index: Default::default(),
+            upload_parquet: Default::default(),
+            upload_puffin: Default::default(),
+        }
+    }
+
+    pub(crate) fn merge(mut self, other: Self) -> Self {
+        assert_eq!(self.write_type, other.write_type);
+        self.iter_source += other.iter_source;
+        self.write_batch += other.write_batch;
+        self.update_index += other.update_index;
+        self.upload_parquet += other.upload_parquet;
+        self.upload_puffin += other.upload_puffin;
+        self
+    }
+
+    pub(crate) fn observe(self) {
+        match self.write_type {
+            WriteType::Flush => {
+                FLUSH_ELAPSED
+                    .with_label_values(&["iter_source"])
+                    .observe(self.iter_source.as_secs_f64());
+                FLUSH_ELAPSED
+                    .with_label_values(&["write_batch"])
+                    .observe(self.write_batch.as_secs_f64());
+                FLUSH_ELAPSED
+                    .with_label_values(&["update_index"])
+                    .observe(self.update_index.as_secs_f64());
+                FLUSH_ELAPSED
+                    .with_label_values(&["upload_parquet"])
+                    .observe(self.upload_parquet.as_secs_f64());
+                FLUSH_ELAPSED
+                    .with_label_values(&["upload_puffin"])
+                    .observe(self.upload_puffin.as_secs_f64());
+            }
+            WriteType::Compaction => {
+                COMPACTION_STAGE_ELAPSED
+                    .with_label_values(&["iter_source"])
+                    .observe(self.iter_source.as_secs_f64());
+                COMPACTION_STAGE_ELAPSED
+                    .with_label_values(&["write_batch"])
+                    .observe(self.write_batch.as_secs_f64());
+                COMPACTION_STAGE_ELAPSED
+                    .with_label_values(&["update_index"])
+                    .observe(self.update_index.as_secs_f64());
+                COMPACTION_STAGE_ELAPSED
+                    .with_label_values(&["upload_parquet"])
+                    .observe(self.upload_parquet.as_secs_f64());
+                COMPACTION_STAGE_ELAPSED
+                    .with_label_values(&["upload_puffin"])
+                    .observe(self.upload_puffin.as_secs_f64());
+            }
+        };
+    }
+}
+
 /// A layer to access SST files under the same directory.
 pub struct AccessLayer {
     table_dir: String,
@@ -145,11 +228,12 @@ impl AccessLayer {
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
-    ) -> Result<SstInfoArray> {
+        write_type: WriteType,
+    ) -> Result<(SstInfoArray, Metrics)> {
         let region_id = request.metadata.region_id;
         let cache_manager = request.cache_manager.clone();

-        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
+        let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
             // Write to the write cache.
             write_cache
                 .write_and_upload_sst(
@@ -162,6 +246,7 @@ impl AccessLayer {
                         remote_store: self.object_store.clone(),
                     },
                     write_opts,
+                    write_type,
                 )
                 .await?
         } else {
@@ -190,12 +275,15 @@ impl AccessLayer {
                 request.metadata,
                 indexer_builder,
                 path_provider,
+                Metrics::new(write_type),
             )
             .await
             .with_file_cleaner(cleaner);
-            writer
+            let ssts = writer
                 .write_all(request.source, request.max_sequence, write_opts)
-                .await?
+                .await?;
+            let metrics = writer.into_metrics();
+            (ssts, metrics)
         };

         // Put parquet metadata to cache manager.
@@ -210,7 +298,7 @@ impl AccessLayer {
             }
         }

-        Ok(sst_info)
+        Ok((sst_info, metrics))
     }
 }

src/mito2/src/cache/write_cache.rs

@@ -15,7 +15,7 @@
 //! A write-through cache for remote object stores.

 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, info};
@@ -25,14 +25,13 @@ use snafu::ResultExt;
 use store_api::storage::RegionId;

 use crate::access_layer::{
-    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
-    TempFileCleaner, WriteCachePathProvider,
+    new_fs_cache_store, FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray,
+    SstWriteRequest, TempFileCleaner, WriteCachePathProvider, WriteType,
 };
 use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
 use crate::error::{self, Result};
 use crate::metrics::{
-    FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
-    WRITE_CACHE_DOWNLOAD_ELAPSED,
+    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
 };
 use crate::sst::file::RegionFileId;
 use crate::sst::index::intermediate::IntermediateManager;
@@ -108,11 +107,8 @@ impl WriteCache {
         write_request: SstWriteRequest,
         upload_request: SstUploadRequest,
         write_opts: &WriteOptions,
-    ) -> Result<SstInfoArray> {
-        let timer = FLUSH_ELAPSED
-            .with_label_values(&["write_sst"])
-            .start_timer();
-
+        write_type: WriteType,
+    ) -> Result<(SstInfoArray, Metrics)> {
         let region_id = write_request.metadata.region_id;

         let store = self.file_cache.local_store();
@@ -138,6 +134,7 @@ impl WriteCache {
             write_request.metadata,
             indexer,
             path_provider.clone(),
+            Metrics::new(write_type),
         )
         .await
         .with_file_cleaner(cleaner);
@@ -145,12 +142,11 @@ impl WriteCache {
         let sst_info = writer
             .write_all(write_request.source, write_request.max_sequence, write_opts)
             .await?;
-
-        timer.stop_and_record();
+        let mut metrics = writer.into_metrics();

         // Upload sst file to remote object store.
         if sst_info.is_empty() {
-            return Ok(sst_info);
+            return Ok((sst_info, metrics));
         }

         let mut upload_tracker = UploadTracker::new(region_id);
@@ -161,10 +157,12 @@ impl WriteCache {
             let parquet_path = upload_request
                 .dest_path_provider
                 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
+            let start = Instant::now();
             if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                 err = Some(e);
                 break;
             }
+            metrics.upload_parquet += start.elapsed();
             upload_tracker.push_uploaded_file(parquet_path);

             if sst.index_metadata.file_size > 0 {
@@ -172,10 +170,12 @@ impl WriteCache {
                 let puffin_path = upload_request
                     .dest_path_provider
                     .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
+                let start = Instant::now();
                 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                     err = Some(e);
                     break;
                 }
+                metrics.upload_puffin += start.elapsed();
                 upload_tracker.push_uploaded_file(puffin_path);
             }
         }
@@ -188,7 +188,7 @@ impl WriteCache {
             return Err(err);
         }

-        Ok(sst_info)
+        Ok((sst_info, metrics))
     }

     /// Removes a file from the cache by `index_key`.
@@ -298,13 +298,7 @@ impl WriteCache {
         let file_type = index_key.file_type;
         let cache_path = self.file_cache.cache_file_path(index_key);

-        let timer = FLUSH_ELAPSED
-            .with_label_values(&[match file_type {
-                FileType::Parquet => "upload_parquet",
-                FileType::Puffin => "upload_puffin",
-            }])
-            .start_timer();
-
+        let start = Instant::now();
         let cached_value = self
             .file_cache
             .local_store()
@@ -348,11 +342,11 @@ impl WriteCache {
         UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

         debug!(
-            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
+            "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
             region_id,
             file_id,
             upload_path,
-            timer.stop_and_record()
+            start.elapsed(),
         );

         let index_value = IndexValue {
@@ -496,11 +490,11 @@ mod tests {
         };

         // Write to cache and upload sst to mock remote store
-        let sst_info = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts)
+        let (mut sst_infos, _) = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
             .await
-            .unwrap()
-            .remove(0); //todo(hl): we assume it only creates one file.
+            .unwrap();
+        let sst_info = sst_infos.remove(0);

         let file_id = sst_info.file_id;
         let sst_upload_path =
@@ -591,11 +585,11 @@ mod tests {
             remote_store: mock_store.clone(),
         };

-        let sst_info = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts)
+        let (mut sst_infos, _) = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
             .await
-            .unwrap()
-            .remove(0);
+            .unwrap();
+        let sst_info = sst_infos.remove(0);
         let write_parquet_metadata = sst_info.file_metadata.unwrap();

         // Read metadata from write cache
@@ -671,7 +665,7 @@ mod tests {
         };

         write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts)
+            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
             .await
             .unwrap_err();
         let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);

@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_request::PathType;
 use store_api::storage::RegionId;

-use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::picker::{new_picker, PickerOutput};
 use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
@@ -352,7 +352,7 @@ impl Compactor for DefaultCompactor {
                 }
                 .build_sst_reader()
                 .await?;
-                let output_files = sst_layer
+                let (sst_infos, metrics) = sst_layer
                     .write_sst(
                         SstWriteRequest {
                             op_type: OperationType::Compact,
@@ -367,8 +367,10 @@ impl Compactor for DefaultCompactor {
                             bloom_filter_index_config,
                         },
                         &write_opts,
+                        WriteType::Compaction,
                     )
-                    .await?
+                    .await?;
+                let output_files = sst_infos
                     .into_iter()
                     .map(|sst_info| FileMeta {
                         region_id,
@@ -386,9 +388,10 @@ impl Compactor for DefaultCompactor {
                 let output_file_names =
                     output_files.iter().map(|f| f.file_id.to_string()).join(",");
                 info!(
-                    "Region {} compaction inputs: [{}], outputs: [{}]",
-                    region_id, input_file_names, output_file_names
+                    "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
+                    region_id, input_file_names, output_file_names, metrics
                 );
+                metrics.observe();
                 Ok(output_files)
             });
         }

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
 use strum::IntoStaticStr;
 use tokio::sync::{mpsc, watch};

-use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{
@@ -345,6 +345,7 @@ impl RegionFlushTask {
         let mut file_metas = Vec::with_capacity(memtables.len());
         let mut flushed_bytes = 0;
         let mut series_count = 0;
+        let mut flush_metrics = Metrics::new(WriteType::Flush);
         for mem in memtables {
             if mem.is_empty() {
                 // Skip empty memtables.
@@ -399,14 +400,15 @@ impl RegionFlushTask {
                 bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
             };

-            let ssts_written = self
+            let (ssts_written, metrics) = self
                 .access_layer
-                .write_sst(write_request, &write_opts)
+                .write_sst(write_request, &write_opts, WriteType::Flush)
                 .await?;
             if ssts_written.is_empty() {
                 // No data written.
                 continue;
             }
+            flush_metrics = flush_metrics.merge(metrics);

             file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                 flushed_bytes += sst_info.file_size;
@@ -431,13 +433,15 @@ impl RegionFlushTask {

         let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
         info!(
-            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}s",
+            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
             self.region_id,
             self.reason.as_str(),
             file_ids,
             series_count,
             timer.stop_and_record(),
+            flush_metrics,
         );
+        flush_metrics.observe();

         let edit = RegionEdit {
             files_to_add: file_metas,

@@ -119,6 +119,14 @@ lazy_static! {

     // Compaction metrics
     /// Timer of different stages in compaction.
+    /// - pick
+    /// - merge (in parallel)
+    /// - iter_source
+    /// - write_batch
+    /// - update_index
+    /// - upload_parquet
+    /// - upload puffin
+    /// - write_manifest
     pub static ref COMPACTION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
         "greptime_mito_compaction_stage_elapsed",
         "mito compaction stage elapsed",

@@ -106,7 +106,9 @@ mod tests {
     use tokio_util::compat::FuturesAsyncWriteCompatExt;

     use super::*;
-    use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory};
+    use crate::access_layer::{
+        FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
+    };
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
     use crate::read::{BatchBuilder, BatchReader};
     use crate::region::options::{IndexOptions, InvertedIndexOptions};
@@ -177,6 +179,7 @@ mod tests {
             metadata.clone(),
             NoopIndexBuilder,
             file_path,
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -239,6 +242,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -318,6 +322,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -365,6 +370,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
+            Metrics::new(WriteType::Flush),
         )
         .await;
         writer
@@ -422,6 +428,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
+            Metrics::new(WriteType::Flush),
         )
         .await;
         writer
@@ -464,6 +471,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -617,6 +625,7 @@ mod tests {
             metadata.clone(),
             NoopIndexBuilder,
             path_provider,
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -692,6 +701,7 @@ mod tests {
             metadata.clone(),
             indexer_builder,
             file_path.clone(),
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -979,6 +989,7 @@ mod tests {
             metadata.clone(),
             NoopIndexBuilder,
             file_path,
+            Metrics::new(WriteType::Flush),
         )
         .await;

@@ -20,6 +20,7 @@ use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Instant;

 use common_telemetry::debug;
 use common_time::Timestamp;
@@ -38,7 +39,7 @@ use store_api::storage::SequenceNumber;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

-use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner};
+use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::file::{FileId, RegionFileId};
@@ -65,6 +66,8 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     bytes_written: Arc<AtomicUsize>,
     /// Cleaner to remove temp files on failure.
     file_cleaner: Option<TempFileCleaner>,
+    /// Write metrics
+    metrics: Metrics,
 }

 pub trait WriterFactory {
@@ -100,12 +103,14 @@ where
         metadata: RegionMetadataRef,
         indexer_builder: I,
         path_provider: P,
+        metrics: Metrics,
     ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
         ParquetWriter::new(
             ObjectStoreWriterFactory { object_store },
             metadata,
             indexer_builder,
             path_provider,
+            metrics,
         )
         .await
     }
@@ -128,6 +133,7 @@ where
         metadata: RegionMetadataRef,
         indexer_builder: I,
         path_provider: P,
+        metrics: Metrics,
     ) -> ParquetWriter<F, I, P> {
         let init_file = FileId::random();
         let indexer = indexer_builder.build(init_file).await;
@@ -142,6 +148,7 @@ where
             current_indexer: Some(indexer),
             bytes_written: Arc::new(AtomicUsize::new(0)),
             file_cleaner: None,
+            metrics,
         }
     }

@@ -234,12 +241,14 @@ where
             match res {
                 Ok(mut batch) => {
                     stats.update(&batch);
+                    let start = Instant::now();
                     // safety: self.current_indexer must be set when first batch has been written.
                     self.current_indexer
                         .as_mut()
                         .unwrap()
                         .update(&mut batch)
                         .await;
+                    self.metrics.update_index += start.elapsed();
                     if let Some(max_file_size) = opts.max_file_size
                         && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                     {
@@ -286,16 +295,21 @@ where
         write_format: &WriteFormat,
         opts: &WriteOptions,
     ) -> Result<Option<Batch>> {
+        let start = Instant::now();
         let Some(batch) = source.next_batch().await? else {
             return Ok(None);
         };
+        self.metrics.iter_source += start.elapsed();

         let arrow_batch = write_format.convert_batch(&batch)?;

+        let start = Instant::now();
         self.maybe_init_writer(write_format.arrow_schema(), opts)
             .await?
             .write(&arrow_batch)
             .await
             .context(WriteParquetSnafu)?;
+        self.metrics.write_batch += start.elapsed();
         Ok(Some(batch))
     }
@@ -340,6 +354,11 @@ where
             Ok(self.writer.as_mut().unwrap())
         }
     }
+
+    /// Consumes write and return the collected metrics.
+    pub fn into_metrics(self) -> Metrics {
+        self.metrics
+    }
 }

 #[derive(Default)]