feat(metric-engine): support to write rows with sparse primary key encoding (#5424)

* feat: support to write rows with sparse primary key encoding

* feat: cache decoded primary key

* chore: remove unused code

* feat: create physical table based on the engine config

* chore: log primary key encoding info

* fix: correct sqlness test

* chore: correct config.md

* chore: apply suggestions from CR

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2025-01-24 15:56:09 +09:00
Committed by: GitHub
Parent commit: 2802c8bf28
Commit: b107384cc6
32 changed files with 494 additions and 112 deletions

View File

@@ -309,8 +309,6 @@ struct MetricEngineInner {
metadata_region: MetadataRegion,
data_region: DataRegion,
state: RwLock<MetricEngineState>,
/// TODO(weny): remove it after the config is used.
#[allow(unused)]
config: EngineConfig,
row_modifier: RowModifier,
}

View File

@@ -77,7 +77,12 @@ impl MetricEngineInner {
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)?;
self.recover_states(region_id, physical_region_options)
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
self.recover_states(region_id, primary_key_encoding, physical_region_options)
.await?;
Ok(0)
}

View File

@@ -32,7 +32,9 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, TTL_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
@@ -122,14 +124,20 @@ impl MetricEngineInner {
.with_context(|_| CreateMitoRegionSnafu {
region_type: DATA_REGION_SUBDIR,
})?;
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
info!("Created physical metric region {region_id}");
info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
PHYSICAL_REGION_COUNT.inc();
// remember this table
self.state.write().unwrap().add_physical_region(
data_region_id,
physical_columns,
primary_key_encoding,
physical_region_options,
);
@@ -516,7 +524,10 @@ impl MetricEngineInner {
data_region_request.primary_key = primary_key;
// set data region options
set_data_region_options(&mut data_region_request.options);
set_data_region_options(
&mut data_region_request.options,
self.config.experimental_sparse_primary_key_encoding,
);
data_region_request
}
@@ -555,6 +566,8 @@ pub(crate) fn region_options_for_metadata_region(
) -> HashMap<String, String> {
// TODO(ruihang, weny): add whitelist for metric engine options.
original.remove(APPEND_MODE_KEY);
// Don't allow setting the primary key encoding for the metadata region.
original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
original.insert(TTL_KEY.to_string(), FOREVER.to_string());
original
}

View File

@@ -17,7 +17,8 @@
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
@@ -26,7 +27,7 @@ use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::error::{OpenMitoRegionSnafu, Result};
use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
@@ -49,7 +50,13 @@ impl MetricEngineInner {
// open physical region and recover states
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id, physical_region_options)
let data_region_id = utils::to_data_region_id(region_id);
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
self.recover_states(region_id, primary_key_encoding, physical_region_options)
.await?;
Ok(0)
@@ -80,7 +87,10 @@ impl MetricEngineInner {
};
let mut data_region_options = request.options;
set_data_region_options(&mut data_region_options);
set_data_region_options(
&mut data_region_options,
self.config.experimental_sparse_primary_key_encoding,
);
let open_data_region_request = RegionOpenRequest {
region_dir: data_region_dir,
options: data_region_options,
@@ -125,6 +135,7 @@ impl MetricEngineInner {
pub(crate) async fn recover_states(
&self,
physical_region_id: RegionId,
primary_key_encoding: PrimaryKeyEncoding,
physical_region_options: PhysicalRegionOptions,
) -> Result<()> {
// load logical regions and physical column names
@@ -148,6 +159,7 @@ impl MetricEngineInner {
state.add_physical_region(
physical_region_id,
physical_columns,
primary_key_encoding,
physical_region_options,
);
// recover logical regions

View File

@@ -20,6 +20,7 @@ use store_api::metric_engine_consts::{
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT, METRIC_ENGINE_INDEX_TYPE_OPTION,
};
use store_api::mito_engine_options::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
use crate::error::{Error, ParseRegionOptionsSnafu, Result};
@@ -46,7 +47,10 @@ pub enum IndexOptions {
}
/// Sets data region specific options.
pub fn set_data_region_options(options: &mut HashMap<String, String>) {
pub fn set_data_region_options(
options: &mut HashMap<String, String>,
sparse_primary_key_encoding_if_absent: bool,
) {
options.remove(METRIC_ENGINE_INDEX_TYPE_OPTION);
options.remove(METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION);
options.insert(
@@ -55,6 +59,14 @@ pub fn set_data_region_options(options: &mut HashMap<String, String>) {
);
// Set memtable options for the data region.
options.insert("memtable.type".to_string(), "partition_tree".to_string());
if sparse_primary_key_encoding_if_absent
&& !options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING)
{
options.insert(
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
"sparse".to_string(),
);
}
}
impl TryFrom<&HashMap<String, String>> for PhysicalRegionOptions {
@@ -108,7 +120,7 @@ mod tests {
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION.to_string(),
"102400".to_string(),
);
set_data_region_options(&mut options);
set_data_region_options(&mut options, false);
for key in [
METRIC_ENGINE_INDEX_TYPE_OPTION,

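The hunk above is the core of the feature toggle: set_data_region_options only injects the sparse encoding when the engine config asks for it and the caller has not set the key itself. Below is a minimal standalone sketch of that if-absent rule; inject_sparse_encoding and the main driver are illustrative names, not part of this patch:

use std::collections::HashMap;

// Key mirrored from store_api::mito_engine_options (see the hunk above).
const MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING: &str =
    "memtable.partition_tree.primary_key_encoding";

// Illustrative helper mirroring the if-absent branch of `set_data_region_options`.
fn inject_sparse_encoding(options: &mut HashMap<String, String>, sparse_if_absent: bool) {
    if sparse_if_absent && !options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING) {
        options.insert(
            MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
            "sparse".to_string(),
        );
    }
}

fn main() {
    // Engine config enables sparse encoding and the caller set nothing: the key is injected.
    let mut opts = HashMap::new();
    inject_sparse_encoding(&mut opts, true);
    assert_eq!(opts[MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING], "sparse");

    // An explicit per-table setting always wins over the engine-wide default.
    let mut opts = HashMap::from([(
        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
        "dense".to_string(),
    )]);
    inject_sparse_encoding(&mut opts, true);
    assert_eq!(opts[MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING], "dense");
}

An explicit per-table option (as in the sqlness test further down) therefore overrides the engine-wide default.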
View File

@@ -59,33 +59,38 @@ impl MetricEngineInner {
.with_label_values(&["put"])
.start_timer();
let physical_region_id = *self
.state
.read()
.unwrap()
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?;
let data_region_id = to_data_region_id(physical_region_id);
let (physical_region_id, data_region_id, primary_key_encoding) = {
let state = self.state.read().unwrap();
let physical_region_id = *state
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?;
let data_region_id = to_data_region_id(physical_region_id);
let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
(physical_region_id, data_region_id, primary_key_encoding)
};
self.verify_put_request(logical_region_id, physical_region_id, &request)
.await?;
// write to data region
// TODO(weny): retrieve the encoding from the metadata region.
let encoding = PrimaryKeyEncoding::Dense;
// TODO: retrieve table name
self.modify_rows(
physical_region_id,
logical_region_id.table_id(),
&mut request.rows,
encoding,
primary_key_encoding,
)?;
if encoding == PrimaryKeyEncoding::Sparse {
if primary_key_encoding == PrimaryKeyEncoding::Sparse {
request.hint = Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
});

View File

@@ -17,6 +17,7 @@
use std::collections::{HashMap, HashSet};
use snafu::OptionExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::storage::{ColumnId, RegionId};
@@ -28,17 +29,20 @@ use crate::utils::to_data_region_id;
pub struct PhysicalRegionState {
logical_regions: HashSet<RegionId>,
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
}
impl PhysicalRegionState {
pub fn new(
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
) -> Self {
Self {
logical_regions: HashSet::new(),
physical_columns,
primary_key_encoding,
options,
}
}
@@ -83,12 +87,13 @@ impl MetricEngineState {
&mut self,
physical_region_id: RegionId,
physical_columns: HashMap<String, ColumnId>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
) {
let physical_region_id = to_data_region_id(physical_region_id);
self.physical_regions.insert(
physical_region_id,
PhysicalRegionState::new(physical_columns, options),
PhysicalRegionState::new(physical_columns, primary_key_encoding, options),
);
}
@@ -148,6 +153,15 @@ impl MetricEngineState {
self.physical_regions.contains_key(&physical_region_id)
}
pub fn get_primary_key_encoding(
&self,
physical_region_id: RegionId,
) -> Option<PrimaryKeyEncoding> {
self.physical_regions
.get(&physical_region_id)
.map(|state| state.primary_key_encoding)
}
pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
&self.logical_regions
}

View File

@@ -73,6 +73,7 @@ use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
@@ -151,6 +152,14 @@ impl MitoEngine {
.map(|region| region.region_statistic())
}
/// Returns primary key encoding of the region.
pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
self.inner
.workers
.get_region(region_id)
.map(|r| r.primary_key_encoding())
}
/// Handle substrait query and return a stream of record batches
///
/// Notice that the output stream's ordering is not guaranteed. If order

View File

@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
EncodeSparsePrimaryKey {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
region_id, expected_last_entry_id, replayed_last_entry_id
@@ -1024,7 +1031,7 @@ impl ErrorExt for Error {
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } => StatusCode::Unexpected,
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),

View File

@@ -15,10 +15,15 @@
use std::collections::HashMap;
use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use memcomparable::Deserializer;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::RegionMetadata;
use store_api::storage::SequenceNumber;
use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
/// Key value view of a mutation.
#[derive(Debug)]
pub struct KeyValues {
@@ -29,6 +34,8 @@ pub struct KeyValues {
pub(crate) mutation: Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
/// Primary key encoding hint.
primary_key_encoding: PrimaryKeyEncoding,
}
impl KeyValues {
@@ -37,9 +44,15 @@ impl KeyValues {
/// Returns `None` if `rows` of the `mutation` is `None`.
pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
let rows = mutation.rows.as_ref()?;
let helper = SparseReadRowHelper::new(metadata, rows);
let primary_key_encoding =
infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
Some(KeyValues { mutation, helper })
Some(KeyValues {
mutation,
helper,
primary_key_encoding,
})
}
/// Returns a key value iterator.
@@ -54,6 +67,7 @@ impl KeyValues {
sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
// Safety: This is a valid mutation.
op_type: OpType::try_from(self.mutation.op_type).unwrap(),
primary_key_encoding: self.primary_key_encoding,
}
})
}
@@ -94,6 +108,8 @@ pub struct KeyValuesRef<'a> {
mutation: &'a Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
/// Primary key encoding hint.
primary_key_encoding: PrimaryKeyEncoding,
}
impl<'a> KeyValuesRef<'a> {
@@ -102,9 +118,15 @@ impl<'a> KeyValuesRef<'a> {
/// Returns `None` if `rows` of the `mutation` is `None`.
pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
let rows = mutation.rows.as_ref()?;
let helper = SparseReadRowHelper::new(metadata, rows);
let primary_key_encoding =
infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
Some(KeyValuesRef { mutation, helper })
Some(KeyValuesRef {
mutation,
helper,
primary_key_encoding,
})
}
/// Returns a key value iterator.
@@ -119,6 +141,7 @@ impl<'a> KeyValuesRef<'a> {
sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
// Safety: This is a valid mutation.
op_type: OpType::try_from(self.mutation.op_type).unwrap(),
primary_key_encoding: self.primary_key_encoding,
}
})
}
@@ -143,9 +166,38 @@ pub struct KeyValue<'a> {
helper: &'a SparseReadRowHelper,
sequence: SequenceNumber,
op_type: OpType,
primary_key_encoding: PrimaryKeyEncoding,
}
impl KeyValue<'_> {
/// Returns primary key encoding.
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
self.primary_key_encoding
}
/// Returns the partition key.
pub fn partition_key(&self) -> u32 {
// TODO(yingwen): refactor this code
if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
let Some(primary_key) = self.primary_keys().next() else {
return 0;
};
let key = primary_key.as_binary().unwrap().unwrap();
let mut deserializer = Deserializer::new(key);
deserializer.advance(COLUMN_ID_ENCODE_SIZE);
let field = SortField::new(ConcreteDataType::uint32_datatype());
let table_id = field.deserialize(&mut deserializer).unwrap();
table_id.as_value_ref().as_u32().unwrap().unwrap()
} else {
let Some(value) = self.primary_keys().next() else {
return 0;
};
value.as_u32().unwrap().unwrap()
}
}
/// Get primary key columns.
pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
self.helper.indices[..self.helper.num_primary_key_column]
@@ -220,7 +272,25 @@ impl SparseReadRowHelper {
///
/// # Panics
/// Time index column must exist.
fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper {
fn new(
metadata: &RegionMetadata,
rows: &Rows,
primary_key_encoding: PrimaryKeyEncoding,
) -> SparseReadRowHelper {
if primary_key_encoding == PrimaryKeyEncoding::Sparse {
// We can skip building the indices for sparse primary key encoding.
// The order of the columns is encoded primary key, timestamp, field columns.
let indices = rows
.schema
.iter()
.enumerate()
.map(|(index, _)| Some(index))
.collect();
return SparseReadRowHelper {
indices,
num_primary_key_column: 1,
};
}
// Build a name to index mapping for rows.
let name_to_index: HashMap<_, _> = rows
.schema

View File

@@ -19,8 +19,6 @@ mod dedup;
mod dict;
mod merger;
mod partition;
// TODO(weny): remove this
#[allow(unused)]
mod primary_key_filter;
mod shard;
mod shard_builder;

View File

@@ -101,8 +101,14 @@ impl Partition {
inner.pk_to_pk_id.insert(sparse_key, pk_id);
}
PrimaryKeyEncoding::Sparse => {
// TODO(weny): support sparse primary key.
todo!()
let sparse_key = primary_key.clone();
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
}
}
} else {
@@ -287,11 +293,7 @@ impl Partition {
return PartitionKey::default();
}
let Some(value) = key_value.primary_keys().next() else {
return PartitionKey::default();
};
value.as_u32().unwrap().unwrap()
key_value.partition_key()
}
/// Returns true if the region can be partitioned.

View File

@@ -34,7 +34,6 @@ struct PrimaryKeyFilterInner {
impl PrimaryKeyFilterInner {
fn evaluate_filters(
&self,
pk: &[u8],
mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
) -> bool {
if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
@@ -101,7 +100,7 @@ impl DensePrimaryKeyFilter {
impl PrimaryKeyFilter for DensePrimaryKeyFilter {
fn matches(&mut self, pk: &[u8]) -> bool {
self.offsets_buf.clear();
self.inner.evaluate_filters(pk, |column_id, metadata| {
self.inner.evaluate_filters(|column_id, metadata| {
// index of tag column in primary key
// Safety: A tag column is always in primary key.
let index = metadata.primary_key_index(column_id).unwrap();
@@ -135,7 +134,7 @@ impl SparsePrimaryKeyFilter {
impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
fn matches(&mut self, pk: &[u8]) -> bool {
self.offsets_map.clear();
self.inner.evaluate_filters(pk, |column_id, _| {
self.inner.evaluate_filters(|column_id, _| {
if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
self.codec.decode_value_at(pk, offset, column_id)
} else {
@@ -150,22 +149,13 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{col, lit, BinaryExpr};
use datafusion::physical_expr::create_physical_expr;
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion::logical_expr::BinaryExpr;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{Expr, Operator};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::{OrderedFloat, Value, ValueRef};
use datatypes::value::ValueRef;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, RegionId};
use super::*;

View File

@@ -26,11 +26,14 @@ use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
use crate::error::{
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
@@ -111,6 +114,26 @@ impl PartitionTree {
Ok(())
}
/// Encodes the given key value into a sparse primary key.
fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
// If the primary key encoding is sparse and already encoded in the metric engine,
// we only need to copy the encoded primary key into the destination buffer.
let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
return EncodeSparsePrimaryKeySnafu {
reason: "sparse primary key is not binary".to_string(),
}
.fail();
};
buffer.extend_from_slice(primary_key);
} else {
// For compatibility, use the sparse encoder for dense primary key.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), buffer)?;
}
Ok(())
}
// TODO(yingwen): The size computed from values is inaccurate.
/// Write key-values into the tree.
///
@@ -141,9 +164,7 @@ impl PartitionTree {
// Encode primary key.
pk_buffer.clear();
if self.is_partitioned {
// Use sparse encoder for metric engine.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}
@@ -185,9 +206,7 @@ impl PartitionTree {
// Encode primary key.
pk_buffer.clear();
if self.is_partitioned {
// Use sparse encoder for metric engine.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}

View File

@@ -26,6 +26,7 @@ use std::sync::{Arc, RwLock};
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{ensure, OptionExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
@@ -146,6 +147,12 @@ impl MitoRegion {
version_data.version.metadata.clone()
}
/// Returns primary key encoding of the region.
pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
let version_data = self.version_control.current();
version_data.version.metadata.primary_key_encoding
}
/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
let version_data = self.version_control.current();

View File

@@ -544,7 +544,12 @@ where
.as_ref()
.map(|rows| rows.rows.len())
.unwrap_or(0);
region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none());
region_write_ctx.push_mutation(
mutation.op_type,
mutation.rows,
mutation.write_hint,
OptionOutputTx::none(),
);
}
// set next_entry_id and write to memtable.

View File

@@ -15,7 +15,7 @@
use std::mem;
use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry};
use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
use snafu::ResultExt;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
@@ -131,13 +131,19 @@ impl RegionWriteCtx {
}
/// Push mutation to the context.
pub(crate) fn push_mutation(&mut self, op_type: i32, rows: Option<Rows>, tx: OptionOutputTx) {
pub(crate) fn push_mutation(
&mut self,
op_type: i32,
rows: Option<Rows>,
write_hint: Option<WriteHint>,
tx: OptionOutputTx,
) {
let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
self.wal_entry.mutations.push(Mutation {
op_type,
sequence: self.next_sequence,
rows,
write_hint: None,
write_hint,
});
let notify = WriteNotify::new(tx, num_rows);

View File

@@ -23,13 +23,14 @@ use api::helper::{
ColumnDataTypeWrapper,
};
use api::v1::column_def::options_from_column_schema;
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
use common_telemetry::info;
use datatypes::prelude::DataType;
use prometheus::HistogramTimer;
use prost::Message;
use smallvec::SmallVec;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
@@ -63,6 +64,8 @@ pub struct WriteRequest {
name_to_index: HashMap<String, usize>,
/// Whether each column has null.
has_null: Vec<bool>,
/// Write hint.
pub hint: Option<WriteHint>,
}
impl WriteRequest {
@@ -112,9 +115,21 @@ impl WriteRequest {
rows,
name_to_index,
has_null,
hint: None,
})
}
/// Sets the write hint.
pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
self.hint = hint;
self
}
/// Returns the encoding hint.
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
infer_primary_key_encoding_from_hint(self.hint.as_ref())
}
/// Returns estimated size of the request.
pub(crate) fn estimated_size(&self) -> usize {
let row_size = self
@@ -548,7 +563,8 @@ impl WorkerRequest {
let (sender, receiver) = oneshot::channel();
let worker_request = match value {
RegionRequest::Put(v) => {
let write_request = WriteRequest::new(region_id, OpType::Put, v.rows)?;
let write_request =
WriteRequest::new(region_id, OpType::Put, v.rows)?.with_hint(v.hint);
WorkerRequest::Write(SenderWriteRequest {
sender: sender.into(),
request: write_request,

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

View File

@@ -87,7 +87,7 @@ const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
/// The column id of the table id.
const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
/// The size of the column id in the encoded sparse row.
const COLUMN_ID_ENCODE_SIZE: usize = 4;
pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
impl SparsePrimaryKeyCodec {
/// Creates a new [`SparsePrimaryKeyCodec`] instance.

View File

@@ -117,7 +117,7 @@ pub struct Indexer {
impl Indexer {
/// Updates the index with the given batch.
pub async fn update(&mut self, batch: &Batch) {
pub async fn update(&mut self, batch: &mut Batch) {
self.do_update(batch).await;
self.flush_mem_metrics();

View File

@@ -455,10 +455,10 @@ mod tests {
.unwrap();
// push 20 rows
let batch = new_batch("tag1", 0..10);
indexer.update(&batch).await.unwrap();
let batch = new_batch("tag2", 10..20);
indexer.update(&batch).await.unwrap();
let mut batch = new_batch("tag1", 0..10);
indexer.update(&mut batch).await.unwrap();
let mut batch = new_batch("tag2", 10..20);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());

View File

@@ -127,7 +127,7 @@ impl BloomFilterIndexer {
/// Garbage will be cleaned up if failed to update.
///
/// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if self.creators.is_empty() {
@@ -189,14 +189,20 @@ impl BloomFilterIndexer {
self.do_cleanup().await
}
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
let mut guard = self.stats.record_update();
let n = batch.num_rows();
guard.inc_row_count(n);
// TODO(weny, zhenchi): lazy decode
let values = self.codec.decode(batch.primary_key())?;
if batch.pk_values().is_none() {
let values = self.codec.decode(batch.primary_key())?;
batch.set_pk_values(values);
}
// Safety: the primary key is decoded
let values = batch.pk_values().unwrap();
// Tags
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
let Some(creator) = self.creators.get_mut(col_id) else {
@@ -475,11 +481,11 @@ pub(crate) mod tests {
.unwrap();
// push 20 rows
let batch = new_batch("tag1", 0..10);
indexer.update(&batch).await.unwrap();
let mut batch = new_batch("tag1", 0..10);
indexer.update(&mut batch).await.unwrap();
let batch = new_batch("tag2", 10..20);
indexer.update(&batch).await.unwrap();
let mut batch = new_batch("tag2", 10..20);
indexer.update(&mut batch).await.unwrap();
let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let puffin_manager = factory.build(object_store);

View File

@@ -110,7 +110,7 @@ impl FulltextIndexer {
}
/// Updates the index with the given batch.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if let Err(update_err) = self.do_update(batch).await {
@@ -170,7 +170,7 @@ impl FulltextIndexer {
}
impl FulltextIndexer {
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
let mut guard = self.stats.record_update();
guard.inc_row_count(batch.num_rows());
@@ -217,7 +217,7 @@ struct SingleCreator {
}
impl SingleCreator {
async fn update(&mut self, batch: &Batch) -> Result<()> {
async fn update(&mut self, batch: &mut Batch) -> Result<()> {
let text_column = batch
.fields()
.iter()
@@ -511,8 +511,8 @@ mod tests {
.unwrap()
.unwrap();
let batch = new_batch(rows);
indexer.update(&batch).await.unwrap();
let mut batch = new_batch(rows);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();

View File

@@ -18,7 +18,7 @@ use crate::read::Batch;
use crate::sst::index::Indexer;
impl Indexer {
pub(crate) async fn do_update(&mut self, batch: &Batch) {
pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
if batch.is_empty() {
return;
}
@@ -35,7 +35,7 @@ impl Indexer {
}
/// Returns false if the update failed.
async fn do_update_inverted_index(&mut self, batch: &Batch) -> bool {
async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
let Some(creator) = self.inverted_indexer.as_mut() else {
return true;
};
@@ -60,7 +60,7 @@ impl Indexer {
}
/// Returns false if the update failed.
async fn do_update_fulltext_index(&mut self, batch: &Batch) -> bool {
async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
let Some(creator) = self.fulltext_indexer.as_mut() else {
return true;
};
@@ -85,7 +85,7 @@ impl Indexer {
}
/// Returns false if the update failed.
async fn do_update_bloom_filter(&mut self, batch: &Batch) -> bool {
async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
let Some(creator) = self.bloom_filter_indexer.as_mut() else {
return true;
};

View File

@@ -119,7 +119,7 @@ impl InvertedIndexer {
/// Updates index with a batch of rows.
/// Garbage will be cleaned up if failed to update.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if batch.is_empty() {
@@ -177,14 +177,20 @@ impl InvertedIndexer {
self.do_cleanup().await
}
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
let mut guard = self.stats.record_update();
let n = batch.num_rows();
guard.inc_row_count(n);
// TODO(weny, zhenchi): lazy decode
let values = self.codec.decode(batch.primary_key())?;
if batch.pk_values().is_none() {
let values = self.codec.decode(batch.primary_key())?;
batch.set_pk_values(values);
}
// Safety: the primary key is decoded
let values = batch.pk_values().unwrap();
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
if !self.indexed_column_ids.contains(col_id) {
continue;
@@ -455,8 +461,8 @@ mod tests {
);
for (str_tag, i32_tag, u64_field) in &rows {
let batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
creator.update(&batch).await.unwrap();
let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
creator.update(&mut batch).await.unwrap();
}
let puffin_manager = factory.build(object_store.clone());

View File

@@ -126,9 +126,9 @@ where
.transpose()
{
match res {
Ok(batch) => {
Ok(mut batch) => {
stats.update(&batch);
self.indexer.update(&batch).await;
self.indexer.update(&mut batch).await;
}
Err(e) => {
self.indexer.abort().await;

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use api::v1::OpType;
use common_telemetry::debug;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
@@ -231,19 +232,24 @@ impl<S> RegionWorkerLoop<S> {
continue;
}
// Checks whether request schema is compatible with region schema.
if let Err(e) =
maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata)
{
sender_req.sender.send(Err(e));
// If the primary key is dense, we need to fill missing columns.
if sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense {
// Checks whether request schema is compatible with region schema.
if let Err(e) = maybe_fill_missing_columns(
&mut sender_req.request,
&region_ctx.version().metadata,
) {
sender_req.sender.send(Err(e));
continue;
continue;
}
}
// Collect requests by region.
region_ctx.push_mutation(
sender_req.request.op_type as i32,
Some(sender_req.request.rows),
sender_req.request.hint,
sender_req.sender,
);
}

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::WriteHint;
use serde::{Deserialize, Serialize};
use strum::Display;
/// Primary key encoding mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum PrimaryKeyEncoding {
#[default]
@@ -24,3 +26,18 @@ pub enum PrimaryKeyEncoding {
/// Sparse primary key encoding.
Sparse,
}
impl From<api::v1::PrimaryKeyEncoding> for PrimaryKeyEncoding {
fn from(value: api::v1::PrimaryKeyEncoding) -> Self {
match value {
api::v1::PrimaryKeyEncoding::Dense => PrimaryKeyEncoding::Dense,
api::v1::PrimaryKeyEncoding::Sparse => PrimaryKeyEncoding::Sparse,
}
}
}
/// Infer primary key encoding from hint.
pub fn infer_primary_key_encoding_from_hint(hint: Option<&WriteHint>) -> PrimaryKeyEncoding {
hint.map(|hint| PrimaryKeyEncoding::from(hint.primary_key_encoding()))
.unwrap_or(PrimaryKeyEncoding::Dense)
}
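The metric engine is the component that sets this hint (see the put path above); on the mito side the encoding is inferred per mutation. A small sketch of that inference, assuming the greptime api and store_api crates are in scope; the WriteHint construction mirrors the put path shown earlier:

use api::v1::WriteHint;
use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};

fn main() {
    // No hint on the mutation: fall back to the default dense encoding.
    assert_eq!(
        infer_primary_key_encoding_from_hint(None),
        PrimaryKeyEncoding::Dense
    );

    // Hint set by the metric engine before forwarding the write to mito.
    let hint = WriteHint {
        primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
    };
    assert_eq!(
        infer_primary_key_encoding_from_hint(Some(&hint)),
        PrimaryKeyEncoding::Sparse
    );
}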

View File

@@ -43,6 +43,20 @@ pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction";
/// Option key for twcs fallback to local.
pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";
/// Option key for memtable type.
pub const MEMTABLE_TYPE: &str = "memtable.type";
/// Option key for memtable partition tree primary key encoding.
pub const MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING: &str =
"memtable.partition_tree.primary_key_encoding";
/// Option key for memtable partition tree index max keys per shard.
pub const MEMTABLE_PARTITION_TREE_INDEX_MAX_KEYS_PER_SHARD: &str =
"memtable.partition_tree.index_max_keys_per_shard";
/// Option key for memtable partition tree data freeze threshold.
pub const MEMTABLE_PARTITION_TREE_DATA_FREEZE_THRESHOLD: &str =
"memtable.partition_tree.data_freeze_threshold";
/// Option key for memtable partition tree fork dictionary bytes.
pub const MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES: &str =
"memtable.partition_tree.fork_dictionary_bytes";
/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
@@ -61,10 +75,11 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",
WAL_OPTIONS_KEY,
"memtable.type",
"memtable.partition_tree.index_max_keys_per_shard",
"memtable.partition_tree.data_freeze_threshold",
"memtable.partition_tree.fork_dictionary_bytes",
MEMTABLE_TYPE,
MEMTABLE_PARTITION_TREE_INDEX_MAX_KEYS_PER_SHARD,
MEMTABLE_PARTITION_TREE_DATA_FREEZE_THRESHOLD,
MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES,
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
APPEND_MODE_KEY,
MERGE_MODE_KEY,
]
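Adding the constant to this whitelist is what lets the per-table override used in the sqlness test below pass option validation. A tiny check, assuming the store_api crate is in scope:

use store_api::mito_engine_options::{
    is_mito_engine_option_key, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
};

fn main() {
    // The new key is now recognized as a valid mito engine option.
    assert!(is_mito_engine_option_key(
        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING
    ));
}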

View File

@@ -77,3 +77,107 @@ DROP TABLE phy;
Affected Rows: 0
CREATE TABLE phy (
ts timestamp time index,
val double
) engine = metric with (
"physical_metric_table" = "",
"memtable.type" = "partition_tree",
"memtable.partition_tree.primary_key_encoding" = "sparse"
);
Affected Rows: 0
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
Affected Rows: 2
SELECT * from t1;
+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 1.0 |
| host1 | 1970-01-01T00:00:00 | 0.0 |
+-------+-------------------------+-----+
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
SELECT * from t2;
++
++
INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
Affected Rows: 2
SELECT * from t2;
+------+-------------------------+-----+
| job | ts | val |
+------+-------------------------+-----+
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00 | 0.0 |
+------+-------------------------+-----+
ADMIN flush_table("phy");
Error: 1004(InvalidArguments), Failed to build admin function args: unsupported function arg "phy"
-- SQLNESS ARG restart=true
INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1);
Affected Rows: 2
SELECT * from t1;
+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 1.0 |
| host1 | 1970-01-01T00:00:00 | 0.0 |
+-------+-------------------------+-----+
SELECT * from t2;
+------+-------------------------+-----+
| job | ts | val |
+------+-------------------------+-----+
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job3 | 1970-01-01T00:00:00 | 0.0 |
| job4 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00 | 0.0 |
+------+-------------------------+-----+
DROP TABLE t1;
Affected Rows: 0
DROP TABLE t2;
Affected Rows: 0
DESC TABLE phy;
+------------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+------------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
| __table_id | UInt32 | PRI | NO | | TAG |
| __tsid | UInt64 | PRI | NO | | TAG |
| host | String | PRI | YES | | TAG |
| job | String | PRI | YES | | TAG |
+------------+----------------------+-----+------+---------+---------------+
DROP TABLE phy;
Affected Rows: 0

View File

@@ -23,3 +23,43 @@ DESC TABLE phy;
SELECT ts, val, __tsid, host, job FROM phy;
DROP TABLE phy;
CREATE TABLE phy (
ts timestamp time index,
val double
) engine = metric with (
"physical_metric_table" = "",
"memtable.type" = "partition_tree",
"memtable.partition_tree.primary_key_encoding" = "sparse"
);
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
SELECT * from t1;
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT * from t2;
INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
SELECT * from t2;
ADMIN flush_table("phy");
-- SQLNESS ARG restart=true
INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1);
SELECT * from t1;
SELECT * from t2;
DROP TABLE t1;
DROP TABLE t2;
DESC TABLE phy;
DROP TABLE phy;