fix: add data type to vector cache key (#3876)

* test: test for null tag

* test: sqlness test

* fix: add type to vector cache key

* test: update sqlness test
This commit is contained in:
Yingwen
2024-05-08 14:30:28 +08:00
committed by GitHub
parent c07a1babd5
commit 9d8f72d611
5 changed files with 228 additions and 16 deletions

View File

@@ -29,7 +29,7 @@ use datatypes::vectors::VectorRef;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;
use store_api::storage::{ConcreteDataType, RegionId};
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
@@ -123,16 +123,21 @@ impl CacheManager {
}
/// Gets a vector with repeated value for specific `key`.
pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
pub fn get_repeated_vector(
&self,
data_type: &ConcreteDataType,
value: &Value,
) -> Option<VectorRef> {
self.vector_cache.as_ref().and_then(|vector_cache| {
let value = vector_cache.get(key);
let value = vector_cache.get(&(data_type.clone(), value.clone()));
update_hit_miss(value, VECTOR_TYPE)
})
}
/// Puts a vector with repeated value into the cache.
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
if let Some(cache) = &self.vector_cache {
let key = (vector.data_type(), value);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.add(vector_cache_weight(&key, &vector).into());
@@ -249,9 +254,9 @@ fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
(k.estimated_size() + parquet_meta_size(v)) as u32
}
fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 {
fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
(mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
}
fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
@@ -323,7 +328,7 @@ type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<Value, VectorRef>;
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
@@ -353,7 +358,9 @@ mod tests {
let value = Value::Int64(10);
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
assert!(cache.get_repeated_vector(&value).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());
let key = PageKey {
region_id,
@@ -394,10 +401,14 @@ mod tests {
fn test_repeated_vector_cache() {
let cache = CacheManager::builder().vector_cache_size(4096).build();
let value = Value::Int64(10);
assert!(cache.get_repeated_vector(&value).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
let cached = cache.get_repeated_vector(&value).unwrap();
let cached = cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.unwrap();
assert_eq!(vector, cached);
}

View File

@@ -17,13 +17,15 @@
use std::collections::HashMap;
use api::v1::value::ValueData;
use api::v1::Rows;
use api::v1::{Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest};
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;
use super::*;
@@ -598,3 +600,102 @@ async fn test_engine_with_write_cache() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_cache_null_primary_key() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
vector_cache_size: ReadableSize::mb(32),
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let column_metadatas = vec![
ColumnMetadata {
column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 1,
},
ColumnMetadata {
column_schema: ColumnSchema::new("tag_1", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 2,
},
ColumnMetadata {
column_schema: ColumnSchema::new("field_0", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
},
ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 4,
},
];
let request = RegionCreateRequest {
engine: MITO_ENGINE_NAME.to_string(),
column_metadatas,
primary_key: vec![1, 2],
options: HashMap::new(),
region_dir: "test".to_string(),
};
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: vec![
api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::StringValue("1".to_string())),
},
api::v1::Value { value_data: None },
api::v1::Value {
value_data: Some(ValueData::F64Value(10.0)),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(1000)),
},
],
},
api::v1::Row {
values: vec![
api::v1::Value { value_data: None },
api::v1::Value {
value_data: Some(ValueData::I64Value(200)),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(20.0)),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(2000)),
},
],
},
],
};
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_0 | tag_1 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 200 | 20.0 | 1970-01-01T00:00:02 |
| 1 | | 10.0 | 1970-01-01T00:00:01 |
+-------+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -241,7 +241,7 @@ fn repeated_vector_with_cache(
num_rows: usize,
cache_manager: &CacheManager,
) -> common_recordbatch::error::Result<VectorRef> {
if let Some(vector) = cache_manager.get_repeated_vector(value) {
if let Some(vector) = cache_manager.get_repeated_vector(data_type, value) {
// Tries to get the vector from cache manager. If the vector doesn't
// have enough length, creates a new one.
match vector.len().cmp(&num_rows) {
@@ -366,9 +366,15 @@ mod tests {
+---------------------+----+----+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
assert!(cache.get_repeated_vector(&Value::Int64(1)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(2)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(3)).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
.is_some());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
.is_some());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}

View File

@@ -0,0 +1,54 @@
CREATE TABLE `esT`(
`eT` TIMESTAMP(3) TIME INDEX,
`eAque` BOOLEAN,
`DoLOruM` INT,
`repudiAndae` STRING,
`ULLaM` BOOLEAN,
`COnSECTeTuR` SMALLINT DEFAULT -31852,
`DOLOrIBUS` FLOAT NOT NULL,
`QUiS` SMALLINT NULL,
`consEquatuR` BOOLEAN NOT NULL,
`vERO` BOOLEAN,
PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`)
);
Affected Rows: 0
INSERT INTO `esT` (
`consEquatuR`,
`eAque`,
`eT`,
`repudiAndae`,
`DOLOrIBUS`
)
VALUES
(
false,
false,
'+234049-06-04 01:11:41.163+0000',
'hello',
0.97377783
),
(
false,
true,
'-19578-12-20 11:45:59.875+0000',
NULL,
0.3535998
);
Affected Rows: 2
SELECT * FROM `esT` order by `eT` desc;
+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+
| eT | eAque | DoLOruM | repudiAndae | ULLaM | COnSECTeTuR | DOLOrIBUS | QUiS | consEquatuR | vERO |
+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+
| +234049-06-04T01:11:41.163 | false | | hello | | -31852 | 0.97377783 | | false | |
| -19578-12-20T11:45:59.875 | true | | | | -31852 | 0.3535998 | | false | |
+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+
DROP TABLE `esT`;
Affected Rows: 0

View File

@@ -0,0 +1,40 @@
CREATE TABLE `esT`(
`eT` TIMESTAMP(3) TIME INDEX,
`eAque` BOOLEAN,
`DoLOruM` INT,
`repudiAndae` STRING,
`ULLaM` BOOLEAN,
`COnSECTeTuR` SMALLINT DEFAULT -31852,
`DOLOrIBUS` FLOAT NOT NULL,
`QUiS` SMALLINT NULL,
`consEquatuR` BOOLEAN NOT NULL,
`vERO` BOOLEAN,
PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`)
);
INSERT INTO `esT` (
`consEquatuR`,
`eAque`,
`eT`,
`repudiAndae`,
`DOLOrIBUS`
)
VALUES
(
false,
false,
'+234049-06-04 01:11:41.163+0000',
'hello',
0.97377783
),
(
false,
true,
'-19578-12-20 11:45:59.875+0000',
NULL,
0.3535998
);
SELECT * FROM `esT` order by `eT` desc;
DROP TABLE `esT`;