feat(mito): Cache repeated vector for tags (#2523)

* feat: add vector_cache to CacheManager

* feat: cache repeated vectors

* feat: skip decoding pk if output doesn't contain tags

* test: add TestRegionMetadataBuilder

* test: test ProjectionMapper

* test: test vector cache

* test: test projection mapper convert

* style: fix clippy

* feat: do not cache vector if it is too large

* docs: update comment
Author: Yingwen
Date: 2023-10-07 19:36:00 +08:00
Committed by: GitHub
Parent: 0ad3fb6040
Commit: 657542c0b8
9 changed files with 379 additions and 69 deletions
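Taken together, these changes add a second cache to `CacheManager`: it is keyed by a tag's `Value` and holds a vector that repeats that value. A minimal sketch of the resulting API, mirroring the tests in this diff (the in-crate import path and the `demo` wrapper are illustrative, not part of the commit):

use std::sync::Arc;

use datatypes::value::Value;
use datatypes::vectors::{Int64Vector, VectorRef};

// Assumes this sketch lives inside the mito2 crate.
use crate::cache::CacheManager;

fn demo() {
    // SST meta cache disabled (0 bytes); 4 KiB budget for repeated vectors.
    let cache = CacheManager::new(0, 4096);
    let tag = Value::Int64(10);
    let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
    cache.put_repeated_vector(tag.clone(), vector.clone());
    // A later read of the same tag value reuses the cached vector.
    assert!(cache.get_repeated_vector(&tag).is_some());
}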

View File

@@ -21,6 +21,8 @@ pub(crate) mod test_util;
use std::mem;
use std::sync::Arc;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::sync::Cache;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;
@@ -32,13 +34,15 @@ use crate::sst::file::FileId;
pub struct CacheManager {
/// Cache for SST metadata.
sst_meta_cache: Option<SstMetaCache>,
/// Cache for vectors.
vector_cache: Option<VectorCache>,
}
pub type CacheManagerRef = Arc<CacheManager>;
impl CacheManager {
/// Creates a new manager with specific cache sizes in bytes.
pub fn new(sst_meta_cache_size: u64) -> CacheManager {
pub fn new(sst_meta_cache_size: u64, vector_cache_size: u64) -> CacheManager {
let sst_meta_cache = if sst_meta_cache_size == 0 {
None
} else {
@@ -51,8 +55,23 @@ impl CacheManager {
.build();
Some(cache)
};
let vector_cache = if vector_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(vector_cache_size)
.weigher(|_k, v: &VectorRef| {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
})
.build();
Some(cache)
};
CacheManager { sst_meta_cache }
CacheManager {
sst_meta_cache,
vector_cache,
}
}
/// Gets cached [ParquetMetaData].
@@ -84,9 +103,23 @@ impl CacheManager {
cache.remove(&SstMetaKey(region_id, file_id));
}
}
/// Gets a vector with the repeated value for the given `key`.
pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
self.vector_cache
.as_ref()
.and_then(|vector_cache| vector_cache.get(key))
}
/// Puts a vector with the repeated value into the cache.
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
if let Some(cache) = &self.vector_cache {
cache.insert(key, vector);
}
}
}
/// Cache key for SST meta.
/// Cache key (region id, file id) for SST meta.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
@@ -97,16 +130,23 @@ impl SstMetaKey {
}
}
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<Value, VectorRef>;
#[cfg(test)]
mod tests {
use datatypes::vectors::Int64Vector;
use super::*;
use crate::cache::test_util::parquet_meta;
#[test]
fn test_disable_meta_cache() {
let cache = CacheManager::new(0);
fn test_disable_cache() {
let cache = CacheManager::new(0, 0);
assert!(cache.sst_meta_cache.is_none());
let region_id = RegionId::new(1, 1);
@@ -114,11 +154,16 @@ mod tests {
let metadata = parquet_meta();
cache.put_parquet_meta_data(region_id, file_id, metadata);
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
let value = Value::Int64(10);
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
assert!(cache.get_repeated_vector(&value).is_none());
}
#[test]
fn test_parquet_meta_cache() {
let cache = CacheManager::new(2000);
let cache = CacheManager::new(2000, 0);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
@@ -128,4 +173,15 @@ mod tests {
cache.remove_parquet_meta_data(region_id, file_id);
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
}
#[test]
fn test_repeated_vector_cache() {
let cache = CacheManager::new(0, 4096);
let value = Value::Int64(10);
assert!(cache.get_repeated_vector(&value).is_none());
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
let cached = cache.get_repeated_vector(&value).unwrap();
assert_eq!(vector, cached);
}
}
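The weigher above charges each entry its approximate in-memory size, so `max_capacity` behaves as a byte budget rather than an entry count. A self-contained sketch of the same pattern with plain strings (only the `moka` builder API also appears in the diff; the rest is illustrative):

use moka::sync::Cache;

fn build_byte_bounded_cache(capacity_bytes: u64) -> Cache<String, String> {
    Cache::builder()
        .max_capacity(capacity_bytes)
        // Weight = approximate entry size in bytes; moka evicts entries
        // once the summed weights exceed `max_capacity`.
        .weigher(|k: &String, v: &String| (k.len() + v.len()) as u32)
        .build()
}

fn demo() {
    let cache = build_byte_bounded_cache(64);
    cache.insert("tag".to_string(), "tag tag tag".to_string());
    assert!(cache.get("tag").is_some());
}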

View File

@@ -58,8 +58,10 @@ pub struct MitoConfig {
pub global_write_buffer_reject_size: ReadableSize,
// Cache configs:
/// Cache size for SST metadata (default 128MB). Setting it to 0 to disable cache.
/// Cache size for SST metadata (default 128MB). Set it to 0 to disable the cache.
pub sst_meta_cache_size: ReadableSize,
/// Cache size for vectors and arrow arrays (default 512MB). Set it to 0 to disable the cache.
pub vector_cache_size: ReadableSize,
}
impl Default for MitoConfig {
@@ -75,6 +77,7 @@ impl Default for MitoConfig {
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
}
}
}
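Both knobs are `ReadableSize` values, so a caller can override one and keep the other default. A short sketch (the `common_base` path for `ReadableSize` is assumed):

use common_base::readable_size::ReadableSize;

fn demo() {
    // Halve the vector cache; keep the remaining defaults.
    let config = MitoConfig {
        vector_cache_size: ReadableSize::mb(256),
        ..Default::default()
    };
    assert_eq!(ReadableSize::mb(128), config.sst_meta_cache_size);
}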

View File

@@ -188,59 +188,21 @@ impl ReadRowHelper {
mod tests {
use api::v1;
use api::v1::ColumnDataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::test_util::i64_value;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;
/// Creates a region: `ts, k0, k1, ..., v0, v1, ...`
fn new_region_metadata(num_tag: usize, num_field: usize) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
let mut column_id = 0;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
TS_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id,
});
// For simplicity, we use the same data type for tag/field columns.
let mut primary_key = Vec::with_capacity(num_tag);
for i in 0..num_tag {
column_id += 1;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("k{i}"),
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id,
});
primary_key.push(i as u32 + 1);
}
for i in 0..num_field {
column_id += 1;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("v{i}"),
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id,
});
}
builder.primary_key(primary_key);
builder.build().unwrap()
fn new_region_metadata(num_tags: usize, num_fields: usize) -> RegionMetadata {
TestRegionMetadataBuilder::default()
.ts_name(TS_NAME)
.num_tags(num_tags)
.num_fields(num_fields)
.build()
}
/// Creates rows `[ 0, 1, ..., n ] x num_rows`

View File

@@ -14,6 +14,7 @@
//! Utilities for projection.
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;
@@ -23,22 +24,28 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::ValueRef;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::cache::CacheManager;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Only cache a vector when its length is `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
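// For example, with i64 tags this caps one cached vector at about
// 16384 * 8 bytes = 128 KiB of data.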
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub struct ProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Maps column in [RecordBatch] to index in [Batch].
batch_indices: Vec<BatchIndex>,
/// Whether the output [RecordBatch] contains tags.
has_tags: bool,
/// Decoder for primary key.
codec: McmpRowCodec,
/// Schema for converted [RecordBatch].
@@ -92,6 +99,7 @@ impl ProjectionMapper {
.collect();
// For each projected column, compute its index in batches.
let mut batch_indices = Vec::with_capacity(projection.len());
let mut has_tags = false;
for idx in &projection {
// Safety: idx is valid.
let column = &metadata.column_metadatas[*idx];
@@ -100,6 +108,8 @@ impl ProjectionMapper {
SemanticType::Tag => {
// Safety: It is a primary key column.
let index = metadata.primary_key_index(column.column_id).unwrap();
// We need to output a tag.
has_tags = true;
// We always read all primary key columns, so the column always exists and the tag
// index is always valid.
BatchIndex::Tag(index)
@@ -117,6 +127,7 @@ impl ProjectionMapper {
Ok(ProjectionMapper {
metadata: metadata.clone(),
batch_indices,
has_tags,
codec,
output_schema,
column_ids,
@@ -152,7 +163,11 @@ impl ProjectionMapper {
/// Converts a [Batch] to a [RecordBatch].
///
/// The batch must match the `projection` used to build the mapper.
pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result<RecordBatch> {
pub(crate) fn convert(
&self,
batch: &Batch,
cache_manager: Option<&CacheManager>,
) -> common_recordbatch::error::Result<RecordBatch> {
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
.batch_fields
@@ -160,11 +175,15 @@ impl ProjectionMapper {
.zip(batch.fields())
.all(|(id, batch_col)| *id == batch_col.column_id));
let pk_values = self
.codec
.decode(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// Skips decoding pk if we don't need to output it.
let pk_values = if self.has_tags {
self.codec
.decode(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?
} else {
Vec::new()
};
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
let num_rows = batch.num_rows();
@@ -175,8 +194,16 @@ impl ProjectionMapper {
{
match index {
BatchIndex::Tag(idx) => {
let value = pk_values[*idx].as_value_ref();
let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?;
let value = &pk_values[*idx];
let vector = match cache_manager {
Some(cache) => repeated_vector_with_cache(
&column_schema.data_type,
value,
num_rows,
cache,
)?,
None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
};
columns.push(vector);
}
BatchIndex::Timestamp => {
@@ -203,21 +230,161 @@ enum BatchIndex {
Field(usize),
}
/// Gets a vector of repeated values from the cache, or creates a new one.
fn repeated_vector_with_cache(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
cache_manager: &CacheManager,
) -> common_recordbatch::error::Result<VectorRef> {
if let Some(vector) = cache_manager.get_repeated_vector(value) {
// Tries to reuse the cached vector. If the cached vector isn't long
// enough, creates a new one below.
match vector.len().cmp(&num_rows) {
Ordering::Less => (),
Ordering::Equal => return Ok(vector),
Ordering::Greater => return Ok(vector.slice(0, num_rows)),
}
}
// Creates a new one.
let vector = new_repeated_vector(data_type, value, num_rows)?;
// Updates cache.
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
cache_manager.put_repeated_vector(value.clone(), vector.clone());
}
Ok(vector)
}
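// Example: after a 3-row batch caches `[v, v, v]`, a 2-row batch reuses it
// via `slice(0, 2)`, while a 5-row batch builds a fresh 5-row vector and
// re-caches it (as long as it stays within `MAX_VECTOR_LENGTH_TO_CACHE`).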
/// Returns a vector with repeated values.
fn new_repeated_vector(
data_type: &ConcreteDataType,
value: ValueRef,
value: &Value,
num_rows: usize,
) -> common_recordbatch::error::Result<VectorRef> {
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector
.try_push_value_ref(value)
.try_push_value_ref(value.as_value_ref())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// This requires an additional allocation.
// TODO(yingwen): Add a way to create repeated vector to data type.
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
}
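`new_repeated_vector` builds a one-element vector and then expands it: `replicate(&[num_rows])` repeats element 0 `num_rows` times. A standalone sketch of the trick, with the offset semantics of `replicate` assumed from the `datatypes` crate:

use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;

fn demo() {
    let data_type = ConcreteDataType::int64_datatype();
    // One-element base vector holding the value to repeat.
    let mut mutable = data_type.create_mutable_vector(1);
    mutable
        .try_push_value_ref(Value::Int64(7).as_value_ref())
        .unwrap();
    // Expand element 0 to three rows: [7, 7, 7].
    let repeated = mutable.to_vector().replicate(&[3]);
    assert_eq!(3, repeated.len());
}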
// TODO(yingwen): Add tests for mapper.
#[cfg(test)]
mod tests {
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use super::*;
use crate::read::BatchBuilder;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
ts_start: i64,
tags: &[i64],
fields: &[(ColumnId, i64)],
num_rows: usize,
) -> Batch {
let converter = McmpRowCodec::new(
(0..tags.len())
.map(|_| SortField::new(ConcreteDataType::int64_datatype()))
.collect(),
);
let primary_key = converter
.encode(tags.iter().map(|v| ValueRef::Int64(*v)))
.unwrap();
let mut builder = BatchBuilder::new(primary_key);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
(0..num_rows).map(|i| ts_start + i as i64 * 1000),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
(0..num_rows).map(|_| OpType::Put as u8),
)))
.unwrap();
for (column_id, field) in fields {
builder
.push_field_array(
*column_id,
Arc::new(Int64Array::from_iter_values(
std::iter::repeat(*field).take(num_rows),
)),
)
.unwrap();
}
builder.build().unwrap()
}
fn print_record_batch(record_batch: RecordBatch) -> String {
pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
.unwrap()
.to_string()
}
#[test]
fn test_projection_mapper_all() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
let mapper = ProjectionMapper::all(&metadata).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!([3, 4], mapper.batch_fields());
let cache = CacheManager::new(0, 1024);
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1 | 2 | 3 | 4 |
| 1970-01-01T00:00:01 | 1 | 2 | 3 | 4 |
| 1970-01-01T00:00:02 | 1 | 2 | 3 | 4 |
+---------------------+----+----+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
assert!(cache.get_repeated_vector(&Value::Int64(1)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(2)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(3)).is_none());
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_with_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
assert_eq!([4, 1], mapper.column_ids());
assert_eq!([4], mapper.batch_fields());
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
let record_batch = mapper.convert(&batch, None).unwrap();
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4 | 1 |
| 4 | 1 |
| 4 | 1 |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
}

View File

@@ -110,9 +110,16 @@ impl SeqScan {
// Creates a stream to poll the batch reader and convert batches into record batches.
let mapper = self.mapper.clone();
let cache_manager = self.cache_manager.clone();
let stream = try_stream! {
while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield mapper.convert(&batch)?;
let cache = cache_manager.as_ref().map(|cache| cache.as_ref());
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
yield mapper.convert(&batch, cache)?;
}
};
let stream = Box::pin(RecordBatchStreamAdaptor::new(
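`SeqScan` keeps an `Option<Arc<CacheManager>>`; the stream borrows it once into an `Option<&CacheManager>` so each `convert` call takes a cheap copy of the borrow. The same pattern in isolation (the types here are placeholders):

use std::sync::Arc;

fn demo() {
    let cache_manager: Option<Arc<Vec<u8>>> = Some(Arc::new(vec![1, 2, 3]));
    // Borrow the Arc's contents once, outside the loop.
    let cache: Option<&Vec<u8>> = cache_manager.as_ref().map(|c| c.as_ref());
    for _ in 0..3 {
        // `Option<&_>` is `Copy`, so each iteration passes it for free.
        assert_eq!(Some(3), cache.map(|v| v.len()));
    }
}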

View File

@@ -15,6 +15,7 @@
//! Utilities for testing.
pub mod memtable_util;
pub mod meta_util;
pub mod scheduler_util;
pub mod version_util;

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata).
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
/// Builder that builds a region metadata with schema `ts, k0, k1, ..., v0, v1, ...`.
///
/// All tags and fields have int64 type.
#[derive(Debug)]
pub struct TestRegionMetadataBuilder {
region_id: RegionId,
ts_name: String,
num_tags: usize,
num_fields: usize,
}
impl Default for TestRegionMetadataBuilder {
fn default() -> Self {
Self {
region_id: RegionId::new(1, 1),
ts_name: "ts".to_string(),
num_tags: 1,
num_fields: 1,
}
}
}
impl TestRegionMetadataBuilder {
/// Sets the timestamp column name.
pub fn ts_name(&mut self, value: &str) -> &mut Self {
self.ts_name = value.to_string();
self
}
/// Sets the number of tag columns.
pub fn num_tags(&mut self, value: usize) -> &mut Self {
self.num_tags = value;
self
}
/// Sets the number of field columns.
pub fn num_fields(&mut self, value: usize) -> &mut Self {
self.num_fields = value;
self
}
/// Builds the region metadata.
pub fn build(&self) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(self.region_id);
let mut column_id = 0;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
&self.ts_name,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id,
});
// For simplicity, we use the same data type for tag/field columns.
let mut primary_key = Vec::with_capacity(self.num_tags);
for i in 0..self.num_tags {
column_id += 1;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("k{i}"),
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id,
});
primary_key.push(i as u32 + 1);
}
for i in 0..self.num_fields {
column_id += 1;
builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
format!("v{i}"),
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id,
});
}
builder.primary_key(primary_key);
builder.build().unwrap()
}
}
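Usage is a chain of setters ending in `build()`. For instance (the public `primary_key` field on `RegionMetadata` is assumed here):

fn demo() {
    // Region `ts, k0, k1, v0` with int64 tags and fields.
    let metadata = TestRegionMetadataBuilder::default()
        .ts_name("ts")
        .num_tags(2)
        .num_fields(1)
        .build();
    // Tags k0 and k1 take column ids 1 and 2 and form the primary key.
    assert_eq!(vec![1, 2], metadata.primary_key);
}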

View File

@@ -120,7 +120,10 @@ impl WorkerGroup {
config.global_write_buffer_size.as_bytes() as usize,
));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));
let cache_manager = Arc::new(CacheManager::new(
config.sst_meta_cache_size.as_bytes(),
config.vector_cache_size.as_bytes(),
));
let workers = (0..config.num_workers)
.map(|id| {
@@ -215,7 +218,10 @@ impl WorkerGroup {
))
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));
let cache_manager = Arc::new(CacheManager::new(
config.sst_meta_cache_size.as_bytes(),
config.vector_cache_size.as_bytes(),
));
let workers = (0..config.num_workers)
.map(|id| {

View File

@@ -663,6 +663,7 @@ auto_flush_interval = "30m"
global_write_buffer_size = "1GiB"
global_write_buffer_reject_size = "2GiB"
sst_meta_cache_size = "128MiB"
vector_cache_size = "512MiB"
[[region_engine]]