diff --git a/Cargo.lock b/Cargo.lock index b042227a29..7a5dabdef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4986,8 +4986,10 @@ dependencies = [ "object-store", "parquet", "paste", + "pin-project", "prometheus", "prost 0.12.3", + "puffin", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index a3413aa9d4..e6ebf7c620 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,6 +180,7 @@ operator = { path = "src/operator" } partition = { path = "src/partition" } plugins = { path = "src/plugins" } promql = { path = "src/promql" } +puffin = { path = "src/puffin" } query = { path = "src/query" } script = { path = "src/script" } servers = { path = "src/servers" } diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index e87472bdeb..0dd2f7a547 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -14,6 +14,8 @@ mod predicates_apply; +use std::collections::BTreeSet; + use async_trait::async_trait; pub use predicates_apply::PredicatesIndexApplier; @@ -24,15 +26,16 @@ use crate::inverted_index::format::reader::InvertedIndexReader; /// /// Applier instances are reusable and work with various `InvertedIndexReader` instances, /// avoiding repeated compilation of fixed predicates such as regex patterns. +#[mockall::automock] #[async_trait] pub trait IndexApplier { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). - async fn apply( + async fn apply<'a>( &self, context: SearchContext, - reader: &mut dyn InvertedIndexReader, - ) -> Result>; + reader: &mut (dyn InvertedIndexReader + 'a), + ) -> Result>; /// Returns the memory usage of the applier. fn memory_usage(&self) -> usize; diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index addf1a0744..aba2f8c999 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeSet; use std::mem::size_of; use async_trait::async_trait; @@ -43,11 +44,11 @@ pub struct PredicatesIndexApplier { impl IndexApplier for PredicatesIndexApplier { /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual /// bitmaps obtained for each index to result in a final set of indices. - async fn apply( + async fn apply<'a>( &self, context: SearchContext, - reader: &mut dyn InvertedIndexReader, - ) -> Result> { + reader: &mut (dyn InvertedIndexReader + 'a), + ) -> Result> { let metadata = reader.metadata().await?; let mut bitmap = Self::bitmap_full_range(&metadata); @@ -60,7 +61,7 @@ impl IndexApplier for PredicatesIndexApplier { let Some(meta) = metadata.metas.get(name) else { match context.index_not_found_strategy { IndexNotFoundStrategy::ReturnEmpty => { - return Ok(vec![]); + return Ok(BTreeSet::default()); } IndexNotFoundStrategy::Ignore => { continue; @@ -209,7 +210,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 2, 4, 6]); + assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6])); // An index reader with a single tag "tag-0" but without value "tag-0_value-0" let mut mock_reader = MockInvertedIndexReader::new(); @@ -263,7 +264,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 4, 6]); + assert_eq!(indices, BTreeSet::from_iter([0, 4, 6])); } #[tokio::test] @@ -281,7 +282,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan + assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan } #[tokio::test] @@ -353,7 +354,7 @@ mod tests { ) .await .unwrap(); - assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); + assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index e4868b0475..917785fcff 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -48,8 +48,10 @@ num_cpus = "1.13" object-store.workspace = true parquet = { workspace = true, features = ["async"] } paste.workspace = true +pin-project.workspace = true prometheus.workspace = true prost.workspace = true +puffin.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 4c0e8bbde5..1f4fd731ef 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -14,13 +14,14 @@ use std::sync::Arc; -use object_store::{util, ObjectStore}; +use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use crate::error::{DeleteSstSnafu, Result}; use crate::read::Source; use crate::sst::file::{FileHandle, FileId}; +use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -61,7 +62,7 @@ impl AccessLayer { /// Deletes a SST file with given file id. pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> { - let path = self.sst_file_path(&file_id.as_parquet()); + let path = location::sst_file_path(&self.region_dir, file_id); self.object_store .delete(&path) .await @@ -81,12 +82,7 @@ impl AccessLayer { metadata: RegionMetadataRef, source: Source, ) -> ParquetWriter { - let path = self.sst_file_path(&file_id.as_parquet()); + let path = location::sst_file_path(&self.region_dir, file_id); ParquetWriter::new(path, metadata, source, self.object_store.clone()) } - - /// Returns the `file_path` for the `file_name` in the object store. - fn sst_file_path(&self, file_name: &str) -> String { - util::join_path(&self.region_dir, file_name) - } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 68a35123ea..044a4be584 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -440,6 +440,33 @@ pub enum Error { source: datatypes::error::Error, location: Location, }, + + #[snafu(display("Failed to apply index"))] + ApplyIndex { + #[snafu(source)] + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Failed to read puffin metadata"))] + PuffinReadMetadata { + #[snafu(source)] + source: puffin::error::Error, + location: Location, + }, + + #[snafu(display("Failed to read puffin blob"))] + PuffinReadBlob { + #[snafu(source)] + source: puffin::error::Error, + location: Location, + }, + + #[snafu(display("Blob type not found, blob_type: {blob_type}"))] + PuffinBlobTypeNotFound { + blob_type: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -477,6 +504,7 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } + | PuffinBlobTypeNotFound { .. } | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } @@ -522,8 +550,11 @@ impl ErrorExt for Error { JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, - BuildIndexApplier { source, .. } => source.status_code(), ConvertValue { source, .. } => source.status_code(), + BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(), + PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => { + source.status_code() + } } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index d53cbd495d..aa407bfdbc 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage"; pub const TYPE_LABEL: &str = "type"; /// Reason to flush. pub const FLUSH_REASON: &str = "reason"; +/// File type label. +pub const FILE_TYPE_LABEL: &str = "file_type"; lazy_static! { /// Global write buffer size in bytes. @@ -143,4 +145,50 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + // ------- End of cache metrics. + + // Index metrics. + /// Timer of index application. + pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!( + "index_apply_elapsed", + "index apply elapsed", + ) + .unwrap(); + /// Gauge of index apply memory usage. + pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!( + "index_apply_memory_usage", + "index apply memory usage", + ) + .unwrap(); + /// Counter of r/w bytes on index related IO operations. + pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "index_io_bytes_total", + "index io bytes total", + &[TYPE_LABEL, FILE_TYPE_LABEL] + ) + .unwrap(); + /// Counter of read bytes on puffin files. + pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL + .with_label_values(&["read", "puffin"]); + + /// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush. + pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!( + "index_io_op_total", + "index io op total", + &[TYPE_LABEL, FILE_TYPE_LABEL] + ) + .unwrap(); + /// Counter of read operations on puffin files. + pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["read", "puffin"]); + /// Counter of seek operations on puffin files. + pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["seek", "puffin"]); + /// Counter of write operations on puffin files. + pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["write", "puffin"]); + /// Counter of flush operations on puffin files. + pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["flush", "puffin"]); + // ------- End of index metrics. } diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 55939c2d24..94e0cb205b 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -16,6 +16,7 @@ pub mod file; pub mod file_purger; -mod index; +pub mod index; +pub mod location; pub mod parquet; pub(crate) mod version; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index a16987690d..d32133d1bc 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -20,13 +20,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use common_time::Timestamp; -use object_store::util::join_path; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use store_api::storage::RegionId; use uuid::Uuid; use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; +use crate::sst::location; /// Type to store SST level. pub type Level = u8; @@ -57,6 +57,11 @@ impl FileId { pub fn as_parquet(&self) -> String { format!("{}{}", self, ".parquet") } + + /// Append `.puffin` to file id to make a complete file name + pub fn as_puffin(&self) -> String { + format!("{}{}", self, ".puffin") + } } impl fmt::Display for FileId { @@ -131,7 +136,7 @@ impl FileHandle { /// Returns the complete file path of the file. pub fn file_path(&self, file_dir: &str) -> String { - join_path(file_dir, &self.file_id().as_parquet()) + location::sst_file_path(file_dir, self.file_id()) } /// Returns the time range of the file. diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 15c3df6cc7..059b1956d7 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -102,12 +102,13 @@ impl FilePurger for LocalFilePurger { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::{util, ObjectStore}; + use object_store::ObjectStore; use super::*; use crate::access_layer::AccessLayer; use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange}; + use crate::sst::location; #[tokio::test] async fn test_file_purge() { @@ -119,7 +120,7 @@ mod tests { let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_id = FileId::random(); let sst_dir = "table1"; - let path = util::join_path(sst_dir, &sst_file_id.as_parquet()); + let path = location::sst_file_path(sst_dir, sst_file_id); object_store.write(&path, vec![0; 4096]).await.unwrap(); @@ -145,9 +146,6 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store - .is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet())) - .await - .unwrap()); + assert!(!object_store.is_exist(&path).await.unwrap()); } } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index baffda27aa..1f89612dee 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -16,3 +16,6 @@ pub mod applier; mod codec; +mod store; + +const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 95ca25ba00..0efc3f8e6a 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -14,8 +14,29 @@ pub mod builder; -use index::inverted_index::search::index_apply::IndexApplier; +use std::collections::BTreeSet; + +use futures::{AsyncRead, AsyncSeek}; +use index::inverted_index::format::reader::InvertedIndexBlobReader; +use index::inverted_index::search::index_apply::{ + IndexApplier, IndexNotFoundStrategy, SearchContext, +}; use object_store::ObjectStore; +use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, + Result, +}; +use crate::metrics::{ + INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, + INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL, +}; +use crate::sst::file::FileId; +use crate::sst::index::store::InstrumentedStore; +use crate::sst::index::INDEX_BLOB_TYPE; +use crate::sst::location; /// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. @@ -23,8 +44,8 @@ pub struct SstIndexApplier { /// The root directory of the region. region_dir: String, - /// Object store responsible for accessing SST files. - object_store: ObjectStore, + /// Store responsible for accessing SST files. + store: InstrumentedStore, /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. @@ -38,10 +59,149 @@ impl SstIndexApplier { object_store: ObjectStore, index_applier: Box, ) -> Self { + INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); + Self { region_dir, - object_store, + store: InstrumentedStore::new(object_store), index_applier, } } + + /// Applies predicates to the provided SST file id and returns the relevant row group ids + pub async fn apply(&self, file_id: FileId) -> Result> { + let _timer = INDEX_APPLY_ELAPSED.start_timer(); + + let mut puffin_reader = self.puffin_reader(file_id).await?; + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + + let context = SearchContext { + // Encountering a non-existing column indicates that it doesn't match predicates. + index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, + }; + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + + /// Helper function to create a [`PuffinFileReader`] for the provided SST file id. + async fn puffin_reader( + &self, + file_id: FileId, + ) -> Result> { + let file_path = location::index_file_path(&self.region_dir, file_id); + let file_reader = self + .store + .reader( + &file_path, + &INDEX_PUFFIN_READ_BYTES_TOTAL, + &INDEX_PUFFIN_READ_OP_TOTAL, + &INDEX_PUFFIN_SEEK_OP_TOTAL, + ) + .await?; + Ok(PuffinFileReader::new(file_reader)) + } + + /// Helper function to create a [`PuffinBlobReader`] for the index blob of the provided index file reader. + async fn index_blob_reader( + puffin_reader: &mut PuffinFileReader, + ) -> Result { + let file_meta = puffin_reader + .metadata() + .await + .context(PuffinReadMetadataSnafu)?; + let blob_meta = file_meta + .blobs + .iter() + .find(|blob| blob.blob_type == INDEX_BLOB_TYPE) + .context(PuffinBlobTypeNotFoundSnafu { + blob_type: INDEX_BLOB_TYPE, + })?; + puffin_reader + .blob_reader(blob_meta) + .context(PuffinReadBlobSnafu) + } +} + +impl Drop for SstIndexApplier { + fn drop(&mut self) { + INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64); + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use index::inverted_index::search::index_apply::MockIndexApplier; + use object_store::services::Memory; + use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; + + use super::*; + use crate::error::Error; + + #[tokio::test] + async fn test_index_applier_apply_basic() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_id = FileId::random(); + let region_dir = "region_dir".to_string(); + let path = location::index_file_path(®ion_dir, file_id); + + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + puffin_writer + .add_blob(Blob { + blob_type: INDEX_BLOB_TYPE.to_string(), + data: Cursor::new(vec![]), + properties: Default::default(), + }) + .await + .unwrap(); + puffin_writer.finish().await.unwrap(); + + let mut mock_index_applier = MockIndexApplier::new(); + mock_index_applier.expect_memory_usage().returning(|| 100); + mock_index_applier + .expect_apply() + .returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3]))); + + let sst_index_applier = SstIndexApplier::new( + region_dir.clone(), + object_store, + Box::new(mock_index_applier), + ); + let ids = sst_index_applier.apply(file_id).await.unwrap(); + assert_eq!(ids, BTreeSet::from_iter([1, 2, 3])); + } + + #[tokio::test] + async fn test_index_applier_apply_invalid_blob_type() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_id = FileId::random(); + let region_dir = "region_dir".to_string(); + let path = location::index_file_path(®ion_dir, file_id); + + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + puffin_writer + .add_blob(Blob { + blob_type: "invalid_blob_type".to_string(), + data: Cursor::new(vec![]), + properties: Default::default(), + }) + .await + .unwrap(); + puffin_writer.finish().await.unwrap(); + + let mut mock_index_applier = MockIndexApplier::new(); + mock_index_applier.expect_memory_usage().returning(|| 100); + mock_index_applier.expect_apply().never(); + + let sst_index_applier = SstIndexApplier::new( + region_dir.clone(), + object_store, + Box::new(mock_index_applier), + ); + let res = sst_index_applier.apply(file_id).await; + assert!(matches!(res, Err(Error::PuffinBlobTypeNotFound { .. }))); + } } diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs new file mode 100644 index 0000000000..dbdbcbf780 --- /dev/null +++ b/src/mito2/src/sst/index/store.rs @@ -0,0 +1,261 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use object_store::ObjectStore; +use pin_project::pin_project; +use prometheus::IntCounter; +use snafu::ResultExt; + +use crate::error::{OpenDalSnafu, Result}; + +/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring +/// metrics such as bytes read, bytes written, and the number of seek operations. +#[derive(Clone)] +pub(crate) struct InstrumentedStore { + /// The underlying object store. + object_store: ObjectStore, +} + +impl InstrumentedStore { + /// Create a new `InstrumentedStore`. + pub fn new(object_store: ObjectStore) -> Self { + Self { object_store } + } + + /// Returns an [`InstrumentedAsyncRead`] for the given path. + /// Metrics like the number of bytes read, read and seek operations + /// are recorded using the provided `IntCounter`s. + pub async fn reader<'a>( + &self, + path: &str, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, + seek_count: &'a IntCounter, + ) -> Result> { + let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncRead::new( + reader, + read_byte_count, + read_count, + seek_count, + )) + } + + /// Returns an [`InstrumentedAsyncWrite`] for the given path. + /// Metrics like the number of bytes written, write and flush operations + /// are recorded using the provided `IntCounter`s. + pub async fn writer<'a>( + &self, + path: &str, + write_byte_count: &'a IntCounter, + write_count: &'a IntCounter, + flush_count: &'a IntCounter, + ) -> Result> { + let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncWrite::new( + writer, + write_byte_count, + write_count, + flush_count, + )) + } +} + +/// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring +#[pin_project] +pub(crate) struct InstrumentedAsyncRead<'a, R> { + #[pin] + inner: R, + read_byte_count: CounterGuard<'a>, + read_count: CounterGuard<'a>, + seek_count: CounterGuard<'a>, +} + +impl<'a, R> InstrumentedAsyncRead<'a, R> { + /// Create a new `InstrumentedAsyncRead`. + fn new( + inner: R, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, + seek_count: &'a IntCounter, + ) -> Self { + Self { + inner, + read_byte_count: CounterGuard::new(read_byte_count), + read_count: CounterGuard::new(read_count), + seek_count: CounterGuard::new(seek_count), + } + } +} + +impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_read(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.read_count.inc_by(1); + self.read_byte_count.inc_by(*n); + } + poll + } +} + +impl<'a, R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'a, R> { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: io::SeekFrom, + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_seek(cx, pos); + if let Poll::Ready(Ok(_)) = &poll { + self.seek_count.inc_by(1); + } + poll + } +} + +/// A wrapper around [`AsyncWrite`] that adds instrumentation for monitoring +#[pin_project] +pub(crate) struct InstrumentedAsyncWrite<'a, W> { + #[pin] + inner: W, + write_byte_count: CounterGuard<'a>, + write_count: CounterGuard<'a>, + flush_count: CounterGuard<'a>, +} + +impl<'a, W> InstrumentedAsyncWrite<'a, W> { + /// Create a new `InstrumentedAsyncWrite`. + fn new( + inner: W, + write_byte_count: &'a IntCounter, + write_count: &'a IntCounter, + flush_count: &'a IntCounter, + ) -> Self { + Self { + inner, + write_byte_count: CounterGuard::new(write_byte_count), + write_count: CounterGuard::new(write_count), + flush_count: CounterGuard::new(flush_count), + } + } +} + +impl<'a, W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'a, W> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_write(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.write_count.inc_by(1); + self.write_byte_count.inc_by(*n); + } + poll + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let poll = self.as_mut().project().inner.poll_flush(cx); + if let Poll::Ready(Ok(())) = &poll { + self.flush_count.inc_by(1); + } + poll + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +} + +/// A guard that increments a counter when dropped. +struct CounterGuard<'a> { + count: usize, + counter: &'a IntCounter, +} + +impl<'a> CounterGuard<'a> { + /// Create a new `CounterGuard`. + fn new(counter: &'a IntCounter) -> Self { + Self { count: 0, counter } + } + + /// Increment the counter by `n`. + fn inc_by(&mut self, n: usize) { + self.count += n; + } +} + +impl<'a> Drop for CounterGuard<'a> { + fn drop(&mut self) { + if self.count > 0 { + self.counter.inc_by(self.count as _); + } + } +} + +#[cfg(test)] +mod tests { + use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + use object_store::services::Memory; + + use super::*; + + #[tokio::test] + async fn test_instrumented_store_read_write() { + let instrumented_store = + InstrumentedStore::new(ObjectStore::new(Memory::default()).unwrap().finish()); + + let read_byte_count = IntCounter::new("read_byte_count", "read_byte_count").unwrap(); + let read_count = IntCounter::new("read_count", "read_count").unwrap(); + let seek_count = IntCounter::new("seek_count", "seek_count").unwrap(); + let write_byte_count = IntCounter::new("write_byte_count", "write_byte_count").unwrap(); + let write_count = IntCounter::new("write_count", "write_count").unwrap(); + let flush_count = IntCounter::new("flush_count", "flush_count").unwrap(); + + let mut writer = instrumented_store + .writer("my_file", &write_byte_count, &write_count, &flush_count) + .await + .unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + drop(writer); + + let mut reader = instrumented_store + .reader("my_file", &read_byte_count, &read_count, &seek_count) + .await + .unwrap(); + let mut buf = vec![0; 5]; + reader.read_exact(&mut buf).await.unwrap(); + reader.seek(io::SeekFrom::Start(0)).await.unwrap(); + reader.read_exact(&mut buf).await.unwrap(); + drop(reader); + + assert_eq!(read_byte_count.get(), 10); + assert_eq!(read_count.get(), 2); + assert_eq!(seek_count.get(), 1); + assert_eq!(write_byte_count.get(), 5); + assert_eq!(write_count.get(), 1); + assert_eq!(flush_count.get(), 1); + } +} diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs new file mode 100644 index 0000000000..179e9159c9 --- /dev/null +++ b/src/mito2/src/sst/location.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use object_store::util; + +use crate::sst::file::FileId; + +/// Returns the path of the SST file in the object store: +/// `{region_dir}/{sst_file_id}.parquet` +pub fn sst_file_path(region_dir: &str, sst_file_id: FileId) -> String { + util::join_path(region_dir, &sst_file_id.as_parquet()) +} + +/// Returns the path of the index file in the object store: +/// `{region_dir}/index/{sst_file_id}.puffin` +pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String { + let dir = util::join_dir(region_dir, "index"); + util::join_path(&dir, &sst_file_id.as_puffin()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sst_file_path() { + let file_id = FileId::random(); + assert_eq!( + sst_file_path("region_dir", file_id), + format!("region_dir/{file_id}.parquet") + ); + } + + #[test] + fn test_index_file_path() { + let file_id = FileId::random(); + assert_eq!( + index_file_path("region_dir", file_id), + format!("region_dir/index/{file_id}.puffin") + ); + } +}