feat(inverted_index): implement apply for SstIndexApplier (#3088)

* feat(inverted_index): implement apply for SstIndexApplier

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename metrics

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-01-04 15:33:03 +08:00
committed by GitHub
parent 284a496f54
commit d973cf81f0
15 changed files with 598 additions and 33 deletions

2
Cargo.lock generated
View File

@@ -4986,8 +4986,10 @@ dependencies = [
"object-store",
"parquet",
"paste",
"pin-project",
"prometheus",
"prost 0.12.3",
"puffin",
"regex",
"serde",
"serde_json",

View File

@@ -180,6 +180,7 @@ operator = { path = "src/operator" }
partition = { path = "src/partition" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }
query = { path = "src/query" }
script = { path = "src/script" }
servers = { path = "src/servers" }

View File

@@ -14,6 +14,8 @@
mod predicates_apply;
use std::collections::BTreeSet;
use async_trait::async_trait;
pub use predicates_apply::PredicatesIndexApplier;
@@ -24,15 +26,16 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
pub trait IndexApplier {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>>;
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>>;
/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::mem::size_of;
use async_trait::async_trait;
@@ -43,11 +44,11 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>> {
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>> {
let metadata = reader.metadata().await?;
let mut bitmap = Self::bitmap_full_range(&metadata);
@@ -60,7 +61,7 @@ impl IndexApplier for PredicatesIndexApplier {
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
return Ok(vec![]);
return Ok(BTreeSet::default());
}
IndexNotFoundStrategy::Ignore => {
continue;
@@ -209,7 +210,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 2, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6]));
// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
@@ -263,7 +264,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 4, 6]));
}
#[tokio::test]
@@ -281,7 +282,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan
}
#[tokio::test]
@@ -353,7 +354,7 @@ mod tests {
)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]);
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7]));
}
#[test]

View File

@@ -48,8 +48,10 @@ num_cpus = "1.13"
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true

View File

@@ -14,13 +14,14 @@
use std::sync::Arc;
use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use crate::error::{DeleteSstSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
@@ -61,7 +62,7 @@ impl AccessLayer {
/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
self.object_store
.delete(&path)
.await
@@ -81,12 +82,7 @@ impl AccessLayer {
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}
/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
}
}

View File

@@ -440,6 +440,33 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},
#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},
#[snafu(display("Failed to read puffin blob"))]
PuffinReadBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},
#[snafu(display("Blob type not found, blob_type: {blob_type}"))]
PuffinBlobTypeNotFound {
blob_type: String,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -477,6 +504,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
@@ -522,8 +550,11 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
BuildIndexApplier { source, .. } => source.status_code(),
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
}
}

View File

@@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage";
pub const TYPE_LABEL: &str = "type";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";
lazy_static! {
/// Global write buffer size in bytes.
@@ -143,4 +145,50 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
// ------- End of cache metrics.
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!(
"index_apply_elapsed",
"index apply elapsed",
)
.unwrap();
/// Gauge of index apply memory usage.
pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
"index_apply_memory_usage",
"index apply memory usage",
)
.unwrap();
/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_bytes_total",
"index io bytes total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_op_total",
"index io op total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read operations on puffin files.
pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of seek operations on puffin files.
pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "puffin"]);
/// Counter of write operations on puffin files.
pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
// ------- End of index metrics.
}

View File

@@ -16,6 +16,7 @@
pub mod file;
pub mod file_purger;
mod index;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

View File

@@ -20,13 +20,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_time::Timestamp;
use object_store::util::join_path;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;
/// Type to store SST level.
pub type Level = u8;
@@ -57,6 +57,11 @@ impl FileId {
pub fn as_parquet(&self) -> String {
format!("{}{}", self, ".parquet")
}
/// Append `.puffin` to file id to make a complete file name
pub fn as_puffin(&self) -> String {
format!("{}{}", self, ".puffin")
}
}
impl fmt::Display for FileId {
@@ -131,7 +136,7 @@ impl FileHandle {
/// Returns the complete file path of the file.
pub fn file_path(&self, file_dir: &str) -> String {
join_path(file_dir, &self.file_id().as_parquet())
location::sst_file_path(file_dir, self.file_id())
}
/// Returns the time range of the file.

View File

@@ -102,12 +102,13 @@ impl FilePurger for LocalFilePurger {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};
use crate::sst::location;
#[tokio::test]
async fn test_file_purge() {
@@ -119,7 +120,7 @@ mod tests {
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = util::join_path(sst_dir, &sst_file_id.as_parquet());
let path = location::sst_file_path(sst_dir, sst_file_id);
object_store.write(&path, vec![0; 4096]).await.unwrap();
@@ -145,9 +146,6 @@ mod tests {
scheduler.stop(true).await.unwrap();
assert!(!object_store
.is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet()))
.await
.unwrap());
assert!(!object_store.is_exist(&path).await.unwrap());
}
}

View File

@@ -16,3 +16,6 @@
pub mod applier;
mod codec;
mod store;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

View File

@@ -14,8 +14,29 @@
pub mod builder;
use index::inverted_index::search::index_apply::IndexApplier;
use std::collections::BTreeSet;
use futures::{AsyncRead, AsyncSeek};
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};
use crate::error::{
ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu,
Result,
};
use crate::metrics::{
INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;
/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
@@ -23,8 +44,8 @@ pub struct SstIndexApplier {
/// The root directory of the region.
region_dir: String,
/// Object store responsible for accessing SST files.
object_store: ObjectStore,
/// Store responsible for accessing SST files.
store: InstrumentedStore,
/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
@@ -38,10 +59,149 @@ impl SstIndexApplier {
object_store: ObjectStore,
index_applier: Box<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
Self {
region_dir,
object_store,
store: InstrumentedStore::new(object_store),
index_applier,
}
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<usize>> {
let _timer = INDEX_APPLY_ELAPSED.start_timer();
let mut puffin_reader = self.puffin_reader(file_id).await?;
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
}
/// Helper function to create a [`PuffinFileReader`] for the provided SST file id.
async fn puffin_reader(
&self,
file_id: FileId,
) -> Result<PuffinFileReader<impl AsyncRead + AsyncSeek>> {
let file_path = location::index_file_path(&self.region_dir, file_id);
let file_reader = self
.store
.reader(
&file_path,
&INDEX_PUFFIN_READ_BYTES_TOTAL,
&INDEX_PUFFIN_READ_OP_TOTAL,
&INDEX_PUFFIN_SEEK_OP_TOTAL,
)
.await?;
Ok(PuffinFileReader::new(file_reader))
}
/// Helper function to create a [`PuffinBlobReader`] for the index blob of the provided index file reader.
async fn index_blob_reader(
puffin_reader: &mut PuffinFileReader<impl AsyncRead + AsyncSeek + Unpin + Send>,
) -> Result<impl AsyncRead + AsyncSeek + '_> {
let file_meta = puffin_reader
.metadata()
.await
.context(PuffinReadMetadataSnafu)?;
let blob_meta = file_meta
.blobs
.iter()
.find(|blob| blob.blob_type == INDEX_BLOB_TYPE)
.context(PuffinBlobTypeNotFoundSnafu {
blob_type: INDEX_BLOB_TYPE,
})?;
puffin_reader
.blob_reader(blob_meta)
.context(PuffinReadBlobSnafu)
}
}
impl Drop for SstIndexApplier {
fn drop(&mut self) {
INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
}
}
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use index::inverted_index::search::index_apply::MockIndexApplier;
use object_store::services::Memory;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use super::*;
use crate::error::Error;
#[tokio::test]
async fn test_index_applier_apply_basic() {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
data: Cursor::new(vec![]),
properties: Default::default(),
})
.await
.unwrap();
puffin_writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier
.expect_apply()
.returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3])));
let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
object_store,
Box::new(mock_index_applier),
);
let ids = sst_index_applier.apply(file_id).await.unwrap();
assert_eq!(ids, BTreeSet::from_iter([1, 2, 3]));
}
#[tokio::test]
async fn test_index_applier_apply_invalid_blob_type() {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap());
puffin_writer
.add_blob(Blob {
blob_type: "invalid_blob_type".to_string(),
data: Cursor::new(vec![]),
properties: Default::default(),
})
.await
.unwrap();
puffin_writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().never();
let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
object_store,
Box::new(mock_index_applier),
);
let res = sst_index_applier.apply(file_id).await;
assert!(matches!(res, Err(Error::PuffinBlobTypeNotFound { .. })));
}
}

View File

@@ -0,0 +1,261 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use object_store::ObjectStore;
use pin_project::pin_project;
use prometheus::IntCounter;
use snafu::ResultExt;
use crate::error::{OpenDalSnafu, Result};
/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring
/// metrics such as bytes read, bytes written, and the number of seek operations.
#[derive(Clone)]
pub(crate) struct InstrumentedStore {
/// The underlying object store.
object_store: ObjectStore,
}
impl InstrumentedStore {
/// Create a new `InstrumentedStore`.
pub fn new(object_store: ObjectStore) -> Self {
Self { object_store }
}
/// Returns an [`InstrumentedAsyncRead`] for the given path.
/// Metrics like the number of bytes read, read and seek operations
/// are recorded using the provided `IntCounter`s.
pub async fn reader<'a>(
&self,
path: &str,
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
seek_count: &'a IntCounter,
) -> Result<InstrumentedAsyncRead<'a, object_store::Reader>> {
let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(
reader,
read_byte_count,
read_count,
seek_count,
))
}
/// Returns an [`InstrumentedAsyncWrite`] for the given path.
/// Metrics like the number of bytes written, write and flush operations
/// are recorded using the provided `IntCounter`s.
pub async fn writer<'a>(
&self,
path: &str,
write_byte_count: &'a IntCounter,
write_count: &'a IntCounter,
flush_count: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncWrite::new(
writer,
write_byte_count,
write_count,
flush_count,
))
}
}
/// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring
#[pin_project]
pub(crate) struct InstrumentedAsyncRead<'a, R> {
#[pin]
inner: R,
read_byte_count: CounterGuard<'a>,
read_count: CounterGuard<'a>,
seek_count: CounterGuard<'a>,
}
impl<'a, R> InstrumentedAsyncRead<'a, R> {
/// Create a new `InstrumentedAsyncRead`.
fn new(
inner: R,
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
seek_count: &'a IntCounter,
) -> Self {
Self {
inner,
read_byte_count: CounterGuard::new(read_byte_count),
read_count: CounterGuard::new(read_count),
seek_count: CounterGuard::new(seek_count),
}
}
}
impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let poll = self.as_mut().project().inner.poll_read(cx, buf);
if let Poll::Ready(Ok(n)) = &poll {
self.read_count.inc_by(1);
self.read_byte_count.inc_by(*n);
}
poll
}
}
impl<'a, R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'a, R> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
let poll = self.as_mut().project().inner.poll_seek(cx, pos);
if let Poll::Ready(Ok(_)) = &poll {
self.seek_count.inc_by(1);
}
poll
}
}
/// A wrapper around [`AsyncWrite`] that adds instrumentation for monitoring
#[pin_project]
pub(crate) struct InstrumentedAsyncWrite<'a, W> {
#[pin]
inner: W,
write_byte_count: CounterGuard<'a>,
write_count: CounterGuard<'a>,
flush_count: CounterGuard<'a>,
}
impl<'a, W> InstrumentedAsyncWrite<'a, W> {
/// Create a new `InstrumentedAsyncWrite`.
fn new(
inner: W,
write_byte_count: &'a IntCounter,
write_count: &'a IntCounter,
flush_count: &'a IntCounter,
) -> Self {
Self {
inner,
write_byte_count: CounterGuard::new(write_byte_count),
write_count: CounterGuard::new(write_count),
flush_count: CounterGuard::new(flush_count),
}
}
}
impl<'a, W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'a, W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let poll = self.as_mut().project().inner.poll_write(cx, buf);
if let Poll::Ready(Ok(n)) = &poll {
self.write_count.inc_by(1);
self.write_byte_count.inc_by(*n);
}
poll
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let poll = self.as_mut().project().inner.poll_flush(cx);
if let Poll::Ready(Ok(())) = &poll {
self.flush_count.inc_by(1);
}
poll
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}
/// A guard that increments a counter when dropped.
struct CounterGuard<'a> {
count: usize,
counter: &'a IntCounter,
}
impl<'a> CounterGuard<'a> {
/// Create a new `CounterGuard`.
fn new(counter: &'a IntCounter) -> Self {
Self { count: 0, counter }
}
/// Increment the counter by `n`.
fn inc_by(&mut self, n: usize) {
self.count += n;
}
}
impl<'a> Drop for CounterGuard<'a> {
fn drop(&mut self) {
if self.count > 0 {
self.counter.inc_by(self.count as _);
}
}
}
#[cfg(test)]
mod tests {
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use object_store::services::Memory;
use super::*;
#[tokio::test]
async fn test_instrumented_store_read_write() {
let instrumented_store =
InstrumentedStore::new(ObjectStore::new(Memory::default()).unwrap().finish());
let read_byte_count = IntCounter::new("read_byte_count", "read_byte_count").unwrap();
let read_count = IntCounter::new("read_count", "read_count").unwrap();
let seek_count = IntCounter::new("seek_count", "seek_count").unwrap();
let write_byte_count = IntCounter::new("write_byte_count", "write_byte_count").unwrap();
let write_count = IntCounter::new("write_count", "write_count").unwrap();
let flush_count = IntCounter::new("flush_count", "flush_count").unwrap();
let mut writer = instrumented_store
.writer("my_file", &write_byte_count, &write_count, &flush_count)
.await
.unwrap();
writer.write_all(b"hello").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
drop(writer);
let mut reader = instrumented_store
.reader("my_file", &read_byte_count, &read_count, &seek_count)
.await
.unwrap();
let mut buf = vec![0; 5];
reader.read_exact(&mut buf).await.unwrap();
reader.seek(io::SeekFrom::Start(0)).await.unwrap();
reader.read_exact(&mut buf).await.unwrap();
drop(reader);
assert_eq!(read_byte_count.get(), 10);
assert_eq!(read_count.get(), 2);
assert_eq!(seek_count.get(), 1);
assert_eq!(write_byte_count.get(), 5);
assert_eq!(write_count.get(), 1);
assert_eq!(flush_count.get(), 1);
}
}

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use object_store::util;
use crate::sst::file::FileId;
/// Returns the path of the SST file in the object store:
/// `{region_dir}/{sst_file_id}.parquet`
pub fn sst_file_path(region_dir: &str, sst_file_id: FileId) -> String {
util::join_path(region_dir, &sst_file_id.as_parquet())
}
/// Returns the path of the index file in the object store:
/// `{region_dir}/index/{sst_file_id}.puffin`
pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String {
let dir = util::join_dir(region_dir, "index");
util::join_path(&dir, &sst_file_id.as_puffin())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sst_file_path() {
let file_id = FileId::random();
assert_eq!(
sst_file_path("region_dir", file_id),
format!("region_dir/{file_id}.parquet")
);
}
#[test]
fn test_index_file_path() {
let file_id = FileId::random();
assert_eq!(
index_file_path("region_dir", file_id),
format!("region_dir/index/{file_id}.puffin")
);
}
}