fix: account for decoded sst metadata cache weight

This commit is contained in:
Ruihang Xia
2026-03-14 04:02:25 +08:00
parent ddb34fec2e
commit a337587433
5 changed files with 171 additions and 10 deletions

View File

@@ -16,8 +16,8 @@ mod column_schema;
pub mod constraint;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::{fmt, mem};
use arrow::datatypes::{Field, Schema as ArrowSchema};
use datafusion_common::DFSchemaRef;
@@ -177,6 +177,26 @@ impl Schema {
&self.arrow_schema.metadata
}
/// Returns the estimated memory footprint of this schema.
pub fn estimated_size(&self) -> usize {
mem::size_of_val(self)
+ mem::size_of::<ColumnSchema>() * self.column_schemas.capacity()
+ self
.column_schemas
.iter()
.map(|column_schema| {
column_schema.estimated_size() - mem::size_of::<ColumnSchema>()
})
.sum::<usize>()
+ mem::size_of::<(String, usize)>() * self.name_to_index.capacity()
+ self
.name_to_index
.keys()
.map(|name| name.capacity())
.sum::<usize>()
+ arrow_schema_size(self.arrow_schema.as_ref())
}
/// Generate a new projected schema
///
/// # Panic
@@ -213,6 +233,17 @@ impl Schema {
}
}
fn arrow_schema_size(schema: &ArrowSchema) -> usize {
mem::size_of_val(schema)
+ schema.fields.size()
+ mem::size_of::<(String, String)>() * schema.metadata.capacity()
+ schema
.metadata
.iter()
.map(|(key, value)| key.capacity() + value.capacity())
.sum::<usize>()
}
#[derive(Default)]
pub struct SchemaBuilder {
column_schemas: Vec<ColumnSchema>,

View File

@@ -13,8 +13,8 @@
// limitations under the License.
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::{fmt, mem};
use arrow::datatypes::Field;
use arrow_schema::extension::{
@@ -178,6 +178,19 @@ impl ColumnSchema {
self
}
/// Returns the estimated memory footprint of this schema.
pub fn estimated_size(&self) -> usize {
mem::size_of_val(self) - mem::size_of_val(&self.data_type)
+ self.data_type.as_arrow_type().size()
+ self.name.capacity()
+ self
.default_constraint
.as_ref()
.map(column_default_constraint_size)
.unwrap_or_default()
+ metadata_size(&self.metadata)
}
/// Set the inverted index for the column.
/// Similar to [with_inverted_index] but don't take the ownership.
///
@@ -493,6 +506,21 @@ impl ColumnSchema {
}
}
fn metadata_size(metadata: &Metadata) -> usize {
mem::size_of::<(String, String)>() * metadata.capacity()
+ metadata
.iter()
.map(|(key, value)| key.capacity() + value.capacity())
.sum::<usize>()
}
fn column_default_constraint_size(default_constraint: &ColumnDefaultConstraint) -> usize {
match default_constraint {
ColumnDefaultConstraint::Function(expr) => expr.capacity(),
ColumnDefaultConstraint::Value(value) => value.as_value_ref().data_size(),
}
}
/// Column extended type set in column schema's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnExtType {

View File

@@ -78,12 +78,12 @@ const SELECTOR_RESULT_TYPE: &str = "selector_result";
pub(crate) struct CachedSstMeta {
parquet_metadata: Arc<ParquetMetaData>,
region_metadata: RegionMetadataRef,
region_metadata_size_hint: usize,
region_metadata_weight: usize,
}
impl CachedSstMeta {
pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
let (region_metadata, region_metadata_size_hint) = {
let (region_metadata, region_metadata_weight) = {
let file_metadata = parquet_metadata.file_metadata();
let key_values = file_metadata
.key_value_metadata()
@@ -109,14 +109,18 @@ impl CachedSstMeta {
store_api::metadata::RegionMetadata::from_json(json)
.context(InvalidMetadataSnafu)?,
);
(region_metadata, json.len())
// Keep the previous JSON-byte floor and charge the decoded structures as well.
(
region_metadata.clone(),
region_metadata.estimated_size().max(json.len()),
)
};
let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
Ok(Self {
parquet_metadata,
region_metadata,
region_metadata_size_hint,
region_metadata_weight,
})
}
@@ -877,8 +881,7 @@ impl CacheManagerBuilder {
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_size_hint)
as u32
(k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
}
fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
@@ -1055,15 +1058,20 @@ type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int64Vector;
use puffin::file_metadata::FileMetadata;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::ColumnId;
use super::*;
use crate::cache::index::bloom_filter_index::Tag;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::{parquet_meta, sst_parquet_meta};
use crate::cache::test_util::{
parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
};
use crate::sst::parquet::row_selection::RowGroupSelection;
#[tokio::test]
@@ -1141,6 +1149,25 @@ mod tests {
);
}
#[test]
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
let region_metadata = Arc::new(wide_region_metadata(128));
let json_len = region_metadata.to_json().unwrap().len();
let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
let cached = Arc::new(
CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
);
let key = SstMetaKey(region_metadata.region_id, FileId::random());
assert!(cached.region_metadata_weight > json_len);
assert_eq!(
meta_cache_weight(&key, &cached) as usize,
key.estimated_size()
+ parquet_meta_size(&cached.parquet_metadata)
+ cached.region_metadata_weight
);
}
#[test]
fn test_repeated_vector_cache() {
let cache = CacheManager::builder().vector_cache_size(4096).build();
@@ -1280,4 +1307,45 @@ mod tests {
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
}
fn wide_region_metadata(column_count: u32) -> RegionMetadata {
let region_id = RegionId::new(1024, 7);
let mut builder = RegionMetadataBuilder::new(region_id);
let mut primary_key = Vec::new();
for column_id in 0..column_count {
let semantic_type = if column_id < 32 {
primary_key.push(column_id);
SemanticType::Tag
} else {
SemanticType::Field
};
let mut column_schema = ColumnSchema::new(
format!("wide_column_{column_id}"),
ConcreteDataType::string_datatype(),
true,
);
column_schema
.mut_metadata()
.insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
builder.push_column_metadata(ColumnMetadata {
column_schema,
semantic_type,
column_id,
});
}
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: column_count,
});
builder.primary_key(primary_key);
builder.build().unwrap()
}
}

View File

@@ -46,6 +46,15 @@ pub(crate) fn sst_parquet_meta() -> (Arc<ParquetMetaData>, RegionMetadataRef) {
(builder.metadata().clone(), region_metadata)
}
/// Returns parquet metadata for an SST parquet file with custom region metadata.
pub(crate) fn sst_parquet_meta_with_region_metadata(
region_metadata: RegionMetadataRef,
) -> Arc<ParquetMetaData> {
let file_data = parquet_file_data_with_region_metadata(&region_metadata);
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap();
builder.metadata().clone()
}
/// Write a test parquet file to a buffer
fn parquet_file_data() -> Vec<u8> {
parquet_file_data_inner(None)

View File

@@ -18,8 +18,8 @@
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::Arc;
use std::{fmt, mem};
use api::v1::SemanticType;
use api::v1::column_def::try_as_column_schema;
@@ -99,6 +99,12 @@ impl ColumnMetadata {
pub fn is_same_datatype(&self, other: &Self) -> bool {
self.column_schema.data_type == other.column_schema.data_type
}
/// Returns the estimated memory footprint of this metadata.
pub fn estimated_size(&self) -> usize {
mem::size_of_val(self) - mem::size_of_val(&self.column_schema)
+ self.column_schema.estimated_size()
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -226,6 +232,25 @@ impl RegionMetadata {
serde_json::from_str(s).context(SerdeJsonSnafu)
}
/// Returns the estimated memory footprint of this metadata.
pub fn estimated_size(&self) -> usize {
mem::size_of_val(self)
+ mem::size_of::<ColumnMetadata>() * self.column_metadatas.capacity()
+ self
.column_metadatas
.iter()
.map(|column| column.estimated_size() - mem::size_of::<ColumnMetadata>())
.sum::<usize>()
+ mem::size_of::<ColumnId>() * self.primary_key.capacity()
+ mem::size_of::<(ColumnId, usize)>() * self.id_to_index.capacity()
+ self.schema.estimated_size()
+ self
.partition_expr
.as_ref()
.map(|expr| expr.capacity())
.unwrap_or_default()
}
/// Encode the metadata to a JSON string.
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).context(SerdeJsonSnafu)