feat: read metadata from write cache (#3224)
* feat: read meta from write cache
* test: add case
* chore: cr comment
* chore: clippy
* chore: code style
* feat: put metadata to sst cache
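The change turns CacheManager::get_parquet_meta_data into a two-tier, read-through lookup: check the in-memory sst meta cache first, then fall back to parsing metadata from the SST file kept by the write cache, promoting a hit back into the meta cache; only a full miss leaves the caller to load from remote object storage. A minimal, self-contained sketch of that lookup order (the types below are stand-ins, not the GreptimeDB source):

use std::collections::HashMap;
use std::sync::Arc;

type Key = (u64, u64); // stand-in for (RegionId, FileId)
type Meta = String; // stand-in for ParquetMetaData

struct TwoTierCache {
    // In-memory metadata cache (the sst meta cache).
    meta_cache: HashMap<Key, Arc<Meta>>,
    // Metadata recoverable from files the write cache keeps on local disk.
    file_cache: HashMap<Key, Meta>,
}

impl TwoTierCache {
    fn get(&mut self, key: Key) -> Option<Arc<Meta>> {
        // 1. Fast path: metadata already cached in memory.
        if let Some(meta) = self.meta_cache.get(&key) {
            return Some(meta.clone());
        }
        // 2. Second tier: recover metadata from the locally cached file
        //    and promote it into the in-memory cache for the next read.
        if let Some(raw) = self.file_cache.get(&key).cloned() {
            let meta = Arc::new(raw);
            self.meta_cache.insert(key, meta.clone());
            return Some(meta);
        }
        // 3. Full miss: the caller falls back to the remote object store.
        None
    }
}

The promotion step in the second tier is what makes repeated reads cheap: the parquet footer is parsed from the local file once, then served from memory.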
@@ -32,6 +32,7 @@ use parquet::file::metadata::ParquetMetaData;
 use store_api::storage::RegionId;
 
 use crate::cache::cache_size::parquet_meta_size;
+use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::write_cache::WriteCacheRef;
 use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
 use crate::sst::file::FileId;
@@ -69,15 +70,33 @@ impl CacheManager {
     }
 
     /// Gets cached [ParquetMetaData].
-    pub fn get_parquet_meta_data(
+    pub async fn get_parquet_meta_data(
         &self,
         region_id: RegionId,
         file_id: FileId,
     ) -> Option<Arc<ParquetMetaData>> {
-        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
+        // Try to get metadata from sst meta cache
+        let metadata = self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
             let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
             update_hit_miss(value, SST_META_TYPE)
-        })
+        });
+
+        if metadata.is_some() {
+            return metadata;
+        }
+
+        // Try to get metadata from write cache
+        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
+        if let Some(write_cache) = &self.write_cache {
+            if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
+                let metadata = Arc::new(metadata);
+                // Put metadata into sst meta cache
+                self.put_parquet_meta_data(region_id, file_id, metadata.clone());
+                return Some(metadata);
+            }
+        };
+
+        None
     }
 
     /// Puts [ParquetMetaData] into the cache.
@@ -315,8 +334,8 @@ mod tests {
     use super::*;
     use crate::cache::test_util::parquet_meta;
 
-    #[test]
-    fn test_disable_cache() {
+    #[tokio::test]
+    async fn test_disable_cache() {
         let cache = CacheManager::default();
         assert!(cache.sst_meta_cache.is_none());
         assert!(cache.vector_cache.is_none());
@@ -326,7 +345,10 @@ mod tests {
         let file_id = FileId::random();
         let metadata = parquet_meta();
         cache.put_parquet_meta_data(region_id, file_id, metadata);
-        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
+        assert!(cache
+            .get_parquet_meta_data(region_id, file_id)
+            .await
+            .is_none());
 
         let value = Value::Int64(10);
         let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
@@ -346,17 +368,26 @@ mod tests {
         assert!(cache.write_cache().is_none());
     }
 
-    #[test]
-    fn test_parquet_meta_cache() {
+    #[tokio::test]
+    async fn test_parquet_meta_cache() {
         let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
         let region_id = RegionId::new(1, 1);
         let file_id = FileId::random();
-        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
+        assert!(cache
+            .get_parquet_meta_data(region_id, file_id)
+            .await
+            .is_none());
         let metadata = parquet_meta();
         cache.put_parquet_meta_data(region_id, file_id, metadata);
-        assert!(cache.get_parquet_meta_data(region_id, file_id).is_some());
+        assert!(cache
+            .get_parquet_meta_data(region_id, file_id)
+            .await
+            .is_some());
         cache.remove_parquet_meta_data(region_id, file_id);
-        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
+        assert!(cache
+            .get_parquet_meta_data(region_id, file_id)
+            .await
+            .is_none());
     }
 
     #[test]
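The update_hit_miss helper used above passes the cached value through while bumping a hit or miss counter. A sketch of that accounting pattern, assuming the prometheus and once_cell crates; the metric names and labels here are illustrative, not the crate's actual definitions:

use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

static CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!("cache_hit", "Cache hit counter", &["type"]).unwrap()
});
static CACHE_MISS: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!("cache_miss", "Cache miss counter", &["type"]).unwrap()
});

/// Increments the hit or miss counter for `label` and passes the value through.
fn update_hit_miss<T>(value: Option<T>, label: &str) -> Option<T> {
    if value.is_some() {
        CACHE_HIT.with_label_values(&[label]).inc();
    } else {
        CACHE_MISS.with_label_values(&[label]).inc();
    }
    value
}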
src/mito2/src/cache/file_cache.rs
@@ -26,6 +26,7 @@ use moka::future::Cache;
 use moka::notification::RemovalCause;
 use object_store::util::join_path;
 use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
+use parquet::file::metadata::ParquetMetaData;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 
@@ -34,6 +35,7 @@ use crate::error::{OpenDalSnafu, Result};
 use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
 use crate::sst::file::FileId;
 use crate::sst::parquet::helper::fetch_byte_ranges;
+use crate::sst::parquet::metadata::MetadataLoader;
 
 /// Subdirectory of cached files.
 const FILE_DIR: &str = "files/";
@@ -120,7 +122,7 @@ impl FileCache {
             }
             Err(e) => {
                 if e.kind() != ErrorKind::NotFound {
-                    warn!("Failed to get file for key {:?}, err: {}", key, e);
+                    warn!(e; "Failed to get file for key {:?}", key);
                 }
             }
             Ok(None) => {}
@@ -154,7 +156,7 @@ impl FileCache {
             }
             Err(e) => {
                 if e.kind() != ErrorKind::NotFound {
-                    warn!("Failed to get file for key {:?}, err: {}", key, e);
+                    warn!(e; "Failed to get file for key {:?}", key);
                 }
 
                 // We remove the file from the index.
@@ -226,6 +228,41 @@ impl FileCache {
         self.local_store.clone()
     }
 
+    /// Gets the parquet metadata in the file cache.
+    /// Returns `None` if the file is not in the cache or the metadata fails to load.
+    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
+        // Check if the file cache contains the key
+        if let Some(index_value) = self.memory_index.get(&key).await {
+            // Load metadata from the file cache
+            let local_store = self.local_store();
+            let file_path = self.cache_file_path(key);
+            let file_size = index_value.file_size as u64;
+            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
+
+            match metadata_loader.load().await {
+                Ok(metadata) => {
+                    CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
+                    Some(metadata)
+                }
+                Err(e) => {
+                    if !e.is_object_not_found() {
+                        warn!(
+                            e; "Failed to get parquet metadata for key {:?}",
+                            key
+                        );
+                    }
+                    // We remove the file from the index.
+                    self.memory_index.remove(&key).await;
+                    CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
+                    None
+                }
+            }
+        } else {
+            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
+            None
+        }
+    }
+
     async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
         if self.local_store.is_exist(file_path).await? {
             Ok(Some(self.local_store.reader(file_path).await?))
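FileCache::get_parquet_meta_data relies on MetadataLoader to decode the parquet footer from the locally cached file, so a metadata read never touches remote storage when the file is cached. Conceptually it is just a footer read; a sketch using the parquet crate's synchronous reader (the real loader reads asynchronously through object_store, and load_parquet_meta here is a hypothetical helper):

use std::fs::File;

use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Decode the footer metadata of a parquet file on local disk.
fn load_parquet_meta(path: &str) -> Result<()> {
    let file = File::open(path).expect("open cached file");
    let reader = SerializedFileReader::new(file)?;
    // ParquetMetaData: file schema, row-group layout, column statistics.
    let meta = reader.metadata();
    println!("row groups: {}", meta.num_row_groups());
    Ok(())
}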
src/mito2/src/cache/write_cache.rs
@@ -229,15 +229,18 @@ pub struct SstUploadRequest {
 #[cfg(test)]
 mod tests {
     use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_temp_dir;
-    use object_store::util::join_dir;
 
     use super::*;
     use crate::cache::test_util::new_fs_store;
     use crate::cache::CacheManager;
     use crate::sst::file::FileId;
     use crate::sst::location::{index_file_path, sst_file_path};
-    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
+    use crate::sst::parquet::reader::ParquetReaderBuilder;
+    use crate::test_util::sst_util::{
+        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
+        sst_region_metadata,
+    };
     use crate::test_util::TestEnv;
 
     #[tokio::test]
@@ -245,27 +248,17 @@ mod tests {
         // TODO(QuenKar): maybe find a way to create some object server for testing,
        // and now just use local file system to mock.
         let mut env = TestEnv::new();
         let data_home = env.data_home().display().to_string();
         let mock_store = env.init_object_store_manager();
         let file_id = FileId::random();
         let upload_path = sst_file_path("test", file_id);
         let index_upload_path = index_file_path("test", file_id);
-        let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
-            .await
-            .unwrap();
-
         // Create WriteCache
         let local_dir = create_temp_dir("");
         let local_store = new_fs_store(local_dir.path().to_str().unwrap());
-        let object_store_manager = env.get_object_store_manager().unwrap();
-        let write_cache = WriteCache::new(
-            local_store.clone(),
-            object_store_manager,
-            ReadableSize::mb(10),
-            intm_mgr,
-        )
-        .await
-        .unwrap();
+        let write_cache = env
+            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
+            .await;
 
         // Create Source
         let metadata = Arc::new(sst_region_metadata());
@@ -328,4 +321,73 @@ mod tests {
             .unwrap();
         assert_eq!(remote_index_data, cache_index_data);
     }
+
+    #[tokio::test]
+    async fn test_read_metadata_from_write_cache() {
+        let mut env = TestEnv::new();
+        let data_home = env.data_home().display().to_string();
+        let mock_store = env.init_object_store_manager();
+
+        let local_dir = create_temp_dir("");
+        let local_path = local_dir.path().to_str().unwrap();
+        let local_store = new_fs_store(local_path);
+
+        // Create a cache manager using only write cache
+        let write_cache = env
+            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
+            .await;
+        let cache_manager = Arc::new(
+            CacheManager::builder()
+                .write_cache(Some(write_cache.clone()))
+                .build(),
+        );
+
+        // Create source
+        let metadata = Arc::new(sst_region_metadata());
+        let handle = sst_file_handle(0, 1000);
+        let file_id = handle.file_id();
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+
+        // Write to local cache and upload sst to mock remote store
+        let write_request = SstWriteRequest {
+            file_id,
+            metadata,
+            source,
+            storage: None,
+            create_inverted_index: false,
+            mem_threshold_index_create: None,
+            index_write_buffer_size: None,
+            cache_manager: cache_manager.clone(),
+        };
+        let write_opts = WriteOptions {
+            row_group_size: 512,
+            ..Default::default()
+        };
+        let upload_path = sst_file_path(&data_home, file_id);
+        let index_upload_path = index_file_path(&data_home, file_id);
+        let upload_request = SstUploadRequest {
+            upload_path: upload_path.clone(),
+            index_upload_path: index_upload_path.clone(),
+            remote_store: mock_store.clone(),
+        };
+
+        let sst_info = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts)
+            .await
+            .unwrap()
+            .unwrap();
+        let write_parquet_metadata = sst_info.file_metadata.unwrap();
+
+        // Read metadata from write cache
+        let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
+            .cache(Some(cache_manager.clone()));
+        let reader = builder.build().await.unwrap();
+
+        // Check parquet metadata
+        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
+    }
 }
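The new test wires up a CacheManager with only the write cache enabled, using the builder shown above. A self-contained sketch of that builder wiring; the struct bodies are stand-ins, not the real types:

use std::sync::Arc;

#[derive(Default)]
struct WriteCache; // placeholder for the real write cache

#[derive(Default)]
struct CacheManager {
    write_cache: Option<Arc<WriteCache>>,
}

#[derive(Default)]
struct CacheManagerBuilder {
    write_cache: Option<Arc<WriteCache>>,
}

impl CacheManager {
    fn builder() -> CacheManagerBuilder {
        CacheManagerBuilder::default()
    }
}

impl CacheManagerBuilder {
    // Optional components take Option so any cache tier can stay disabled.
    fn write_cache(mut self, cache: Option<Arc<WriteCache>>) -> Self {
        self.write_cache = cache;
        self
    }

    fn build(self) -> CacheManager {
        CacheManager {
            write_cache: self.write_cache,
        }
    }
}

fn main() {
    // A manager with only the write cache enabled, as in the test.
    let write_cache = Arc::new(WriteCache);
    let _manager = CacheManager::builder().write_cache(Some(write_cache)).build();
}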
@@ -16,7 +16,7 @@
 
 mod format;
 pub(crate) mod helper;
-mod metadata;
+pub(crate) mod metadata;
 mod page_reader;
 pub mod reader;
 pub mod row_group;
@@ -88,7 +88,8 @@ mod tests {
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
     use crate::test_util::sst_util::{
-        new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
+        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
+        sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -258,34 +259,7 @@ mod tests {
         let reader = builder.build().await.unwrap();
         let reader_metadata = reader.parquet_metadata();
 
-        // Because ParquetMetaData doesn't implement PartialEq,
-        // check all fields manually
-        macro_rules! assert_metadata {
-            ( $writer:expr, $reader:expr, $($method:ident,)+ ) => {
-                $(
-                    assert_eq!($writer.$method(), $reader.$method());
-                )+
-            }
-        }
-
-        assert_metadata!(
-            writer_metadata.file_metadata(),
-            reader_metadata.file_metadata(),
-            version,
-            num_rows,
-            created_by,
-            key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
-
-        assert_metadata!(
-            writer_metadata,
-            reader_metadata,
-            row_groups,
-            column_index,
-            offset_index,
-        );
+        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
     }
 
     #[tokio::test]
@@ -243,15 +243,15 @@ impl ParquetReaderBuilder {
         file_path: &str,
         file_size: u64,
     ) -> Result<Arc<ParquetMetaData>> {
+        let region_id = self.file_handle.region_id();
+        let file_id = self.file_handle.file_id();
         // Tries to get from global cache.
-        if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
-            cache.get_parquet_meta_data(self.file_handle.region_id(), self.file_handle.file_id())
-        }) {
-            return Ok(metadata);
+        if let Some(manager) = &self.cache_manager {
+            if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
+                return Ok(metadata);
+            }
         }
 
-        // TODO(QuenKar): should also check write cache to get parquet metadata.
-
         // Cache miss, load metadata directly.
         let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
         let metadata = metadata_loader.load().await?;
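A note on why this hunk drops the and_then closure: get_parquet_meta_data is now async, and .await cannot appear inside a plain closure, so the lookup is rewritten with nested if let. A minimal illustration with a hypothetical Cache type:

use std::collections::HashMap;

struct Cache(HashMap<u64, String>);

impl Cache {
    // Async getter standing in for CacheManager::get_parquet_meta_data.
    async fn get(&self, key: u64) -> Option<String> {
        self.0.get(&key).cloned()
    }
}

// `Option::and_then` takes a synchronous closure, so an `.await` inside it
// does not compile; nested `if let` keeps the await at the async fn level.
async fn lookup(cache: Option<&Cache>, key: u64) -> Option<String> {
    if let Some(c) = cache {
        if let Some(v) = c.get(key).await {
            return Some(v);
        }
    }
    None
}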
@@ -29,6 +29,7 @@ use api::greptime_proto::v1;
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::value::ValueData;
 use api::v1::{OpType, Row, Rows, SemanticType};
+use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
@@ -38,6 +39,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
 use log_store::test_util::log_store_util;
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::services::Fs;
+use object_store::util::join_dir;
 use object_store::ObjectStore;
 use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
 use store_api::region_engine::RegionEngine;
@@ -47,6 +49,7 @@ use store_api::region_request::{
 };
 use store_api::storage::{ColumnId, RegionId};
 
+use crate::cache::write_cache::{WriteCache, WriteCacheRef};
 use crate::config::MitoConfig;
 use crate::engine::listener::EventListenerRef;
 use crate::engine::{MitoEngine, MITO_ENGINE_NAME};
@@ -55,6 +58,7 @@ use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::read::{Batch, BatchBuilder, BatchReader};
 use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
+use crate::sst::index::intermediate::IntermediateManager;
 use crate::worker::WorkerGroup;
 
 #[derive(Debug)]
@@ -325,6 +329,26 @@ impl TestEnv {
         RegionManifestManager::open(manifest_opts).await
     }
 
+    /// Creates a write cache with local store.
+    pub async fn create_write_cache(
+        &self,
+        local_store: ObjectStore,
+        capacity: ReadableSize,
+    ) -> WriteCacheRef {
+        let data_home = self.data_home().display().to_string();
+
+        let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
+            .await
+            .unwrap();
+
+        let object_store_manager = self.get_object_store_manager().unwrap();
+        let write_cache = WriteCache::new(local_store, object_store_manager, capacity, intm_mgr)
+            .await
+            .unwrap();
+
+        Arc::new(write_cache)
+    }
 }
 
 /// Builder to mock a [RegionCreateRequest].
@@ -14,11 +14,14 @@
 
 //! Utilities for testing SSTs.
 
+use std::sync::Arc;
+
 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
+use parquet::file::metadata::ParquetMetaData;
 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
 use store_api::storage::RegionId;
@@ -124,3 +127,27 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .build()
         .unwrap()
 }
+
+/// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
+pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
+    macro_rules! assert_metadata {
+        ( $a:expr, $b:expr, $($method:ident,)+ ) => {
+            $(
+                assert_eq!($a.$method(), $b.$method());
+            )+
+        }
+    }
+
+    assert_metadata!(
+        a.file_metadata(),
+        b.file_metadata(),
+        version,
+        num_rows,
+        created_by,
+        key_value_metadata,
+        schema_descr,
+        column_orders,
+    );
+
+    assert_metadata!(a, b, row_groups, column_index, offset_index,);
+}