diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 9070e2babe..50f2dba270 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -16,8 +16,8 @@ mod column_schema; pub mod constraint; use std::collections::HashMap; -use std::fmt; use std::sync::Arc; +use std::{fmt, mem}; use arrow::datatypes::{Field, Schema as ArrowSchema}; use datafusion_common::DFSchemaRef; @@ -177,6 +177,26 @@ impl Schema { &self.arrow_schema.metadata } + /// Returns the estimated memory footprint of this schema. + pub fn estimated_size(&self) -> usize { + mem::size_of_val(self) + + mem::size_of::() * self.column_schemas.capacity() + + self + .column_schemas + .iter() + .map(|column_schema| { + column_schema.estimated_size() - mem::size_of::() + }) + .sum::() + + mem::size_of::<(String, usize)>() * self.name_to_index.capacity() + + self + .name_to_index + .keys() + .map(|name| name.capacity()) + .sum::() + + arrow_schema_size(self.arrow_schema.as_ref()) + } + /// Generate a new projected schema /// /// # Panic @@ -213,6 +233,17 @@ impl Schema { } } +fn arrow_schema_size(schema: &ArrowSchema) -> usize { + mem::size_of_val(schema) + + schema.fields.size() + + mem::size_of::<(String, String)>() * schema.metadata.capacity() + + schema + .metadata + .iter() + .map(|(key, value)| key.capacity() + value.capacity()) + .sum::() +} + #[derive(Default)] pub struct SchemaBuilder { column_schemas: Vec, diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 183cf05da8..2479f4fc41 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -13,8 +13,8 @@ // limitations under the License. use std::collections::HashMap; -use std::fmt; use std::str::FromStr; +use std::{fmt, mem}; use arrow::datatypes::Field; use arrow_schema::extension::{ @@ -178,6 +178,19 @@ impl ColumnSchema { self } + /// Returns the estimated memory footprint of this schema. + pub fn estimated_size(&self) -> usize { + mem::size_of_val(self) - mem::size_of_val(&self.data_type) + + self.data_type.as_arrow_type().size() + + self.name.capacity() + + self + .default_constraint + .as_ref() + .map(column_default_constraint_size) + .unwrap_or_default() + + metadata_size(&self.metadata) + } + /// Set the inverted index for the column. /// Similar to [with_inverted_index] but don't take the ownership. /// @@ -493,6 +506,21 @@ impl ColumnSchema { } } +fn metadata_size(metadata: &Metadata) -> usize { + mem::size_of::<(String, String)>() * metadata.capacity() + + metadata + .iter() + .map(|(key, value)| key.capacity() + value.capacity()) + .sum::() +} + +fn column_default_constraint_size(default_constraint: &ColumnDefaultConstraint) -> usize { + match default_constraint { + ColumnDefaultConstraint::Function(expr) => expr.capacity(), + ColumnDefaultConstraint::Value(value) => value.as_value_ref().data_size(), + } +} + /// Column extended type set in column schema's metadata. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ColumnExtType { diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 57fb11a8d1..9e1887e126 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -78,12 +78,12 @@ const SELECTOR_RESULT_TYPE: &str = "selector_result"; pub(crate) struct CachedSstMeta { parquet_metadata: Arc, region_metadata: RegionMetadataRef, - region_metadata_size_hint: usize, + region_metadata_weight: usize, } impl CachedSstMeta { pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result { - let (region_metadata, region_metadata_size_hint) = { + let (region_metadata, region_metadata_weight) = { let file_metadata = parquet_metadata.file_metadata(); let key_values = file_metadata .key_value_metadata() @@ -109,14 +109,18 @@ impl CachedSstMeta { store_api::metadata::RegionMetadata::from_json(json) .context(InvalidMetadataSnafu)?, ); - (region_metadata, json.len()) + // Keep the previous JSON-byte floor and charge the decoded structures as well. + ( + region_metadata.clone(), + region_metadata.estimated_size().max(json.len()), + ) }; let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata)); Ok(Self { parquet_metadata, region_metadata, - region_metadata_size_hint, + region_metadata_weight, }) } @@ -877,8 +881,7 @@ impl CacheManagerBuilder { fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { // We ignore the size of `Arc`. - (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_size_hint) - as u32 + (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32 } fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 { @@ -1055,15 +1058,20 @@ type SelectorResultCache = Cache>; mod tests { use std::sync::Arc; + use api::v1::SemanticType; use api::v1::index::{BloomFilterMeta, InvertedIndexMetas}; + use datatypes::schema::ColumnSchema; use datatypes::vectors::Int64Vector; use puffin::file_metadata::FileMetadata; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::storage::ColumnId; use super::*; use crate::cache::index::bloom_filter_index::Tag; use crate::cache::index::result_cache::PredicateKey; - use crate::cache::test_util::{parquet_meta, sst_parquet_meta}; + use crate::cache::test_util::{ + parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata, + }; use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] @@ -1141,6 +1149,25 @@ mod tests { ); } + #[test] + fn test_meta_cache_weight_accounts_for_decoded_region_metadata() { + let region_metadata = Arc::new(wide_region_metadata(128)); + let json_len = region_metadata.to_json().unwrap().len(); + let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone()); + let cached = Arc::new( + CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(), + ); + let key = SstMetaKey(region_metadata.region_id, FileId::random()); + + assert!(cached.region_metadata_weight > json_len); + assert_eq!( + meta_cache_weight(&key, &cached) as usize, + key.estimated_size() + + parquet_meta_size(&cached.parquet_metadata) + + cached.region_metadata_weight + ); + } + #[test] fn test_repeated_vector_cache() { let cache = CacheManager::builder().vector_cache_size(4096).build(); @@ -1280,4 +1307,45 @@ mod tests { assert!(result_cache.get(&predicate, index_id.file_id()).is_none()); assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none()); } + + fn wide_region_metadata(column_count: u32) -> RegionMetadata { + let region_id = RegionId::new(1024, 7); + let mut builder = RegionMetadataBuilder::new(region_id); + let mut primary_key = Vec::new(); + + for column_id in 0..column_count { + let semantic_type = if column_id < 32 { + primary_key.push(column_id); + SemanticType::Tag + } else { + SemanticType::Field + }; + let mut column_schema = ColumnSchema::new( + format!("wide_column_{column_id}"), + ConcreteDataType::string_datatype(), + true, + ); + column_schema + .mut_metadata() + .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4)); + builder.push_column_metadata(ColumnMetadata { + column_schema, + semantic_type, + column_id, + }); + } + + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: column_count, + }); + builder.primary_key(primary_key); + + builder.build().unwrap() + } } diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs index e3de745d5d..ef3d8e9315 100644 --- a/src/mito2/src/cache/test_util.rs +++ b/src/mito2/src/cache/test_util.rs @@ -46,6 +46,15 @@ pub(crate) fn sst_parquet_meta() -> (Arc, RegionMetadataRef) { (builder.metadata().clone(), region_metadata) } +/// Returns parquet metadata for an SST parquet file with custom region metadata. +pub(crate) fn sst_parquet_meta_with_region_metadata( + region_metadata: RegionMetadataRef, +) -> Arc { + let file_data = parquet_file_data_with_region_metadata(®ion_metadata); + let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap(); + builder.metadata().clone() +} + /// Write a test parquet file to a buffer fn parquet_file_data() -> Vec { parquet_file_data_inner(None) diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index d571a5392f..0c663bccc0 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -18,8 +18,8 @@ use std::any::Any; use std::collections::{HashMap, HashSet}; -use std::fmt; use std::sync::Arc; +use std::{fmt, mem}; use api::v1::SemanticType; use api::v1::column_def::try_as_column_schema; @@ -99,6 +99,12 @@ impl ColumnMetadata { pub fn is_same_datatype(&self, other: &Self) -> bool { self.column_schema.data_type == other.column_schema.data_type } + + /// Returns the estimated memory footprint of this metadata. + pub fn estimated_size(&self) -> usize { + mem::size_of_val(self) - mem::size_of_val(&self.column_schema) + + self.column_schema.estimated_size() + } } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -226,6 +232,25 @@ impl RegionMetadata { serde_json::from_str(s).context(SerdeJsonSnafu) } + /// Returns the estimated memory footprint of this metadata. + pub fn estimated_size(&self) -> usize { + mem::size_of_val(self) + + mem::size_of::() * self.column_metadatas.capacity() + + self + .column_metadatas + .iter() + .map(|column| column.estimated_size() - mem::size_of::()) + .sum::() + + mem::size_of::() * self.primary_key.capacity() + + mem::size_of::<(ColumnId, usize)>() * self.id_to_index.capacity() + + self.schema.estimated_size() + + self + .partition_expr + .as_ref() + .map(|expr| expr.capacity()) + .unwrap_or_default() + } + /// Encode the metadata to a JSON string. pub fn to_json(&self) -> Result { serde_json::to_string(&self).context(SerdeJsonSnafu)