diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs
index b4f22ccff9..3ada60f824 100644
--- a/src/metric-engine/src/engine.rs
+++ b/src/metric-engine/src/engine.rs
@@ -309,8 +309,6 @@ struct MetricEngineInner {
     metadata_region: MetadataRegion,
     data_region: DataRegion,
     state: RwLock<MetricEngineState>,
-    /// TODO(weny): remove it after the config is used.
-    #[allow(unused)]
     config: EngineConfig,
     row_modifier: RowModifier,
 }
diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs
index 78204e7225..44713f0bc4 100644
--- a/src/metric-engine/src/engine/catchup.rs
+++ b/src/metric-engine/src/engine/catchup.rs
@@ -77,7 +77,12 @@ impl MetricEngineInner {
             .context(MitoCatchupOperationSnafu)
             .map(|response| response.affected_rows)?;
 
-        self.recover_states(region_id, physical_region_options)
+        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
+            PhysicalRegionNotFoundSnafu {
+                region_id: data_region_id,
+            },
+        )?;
+        self.recover_states(region_id, primary_key_encoding, physical_region_options)
             .await?;
         Ok(0)
     }
diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs
index d3d3cc87ec..2e77a49e60 100644
--- a/src/metric-engine/src/engine/create.rs
+++ b/src/metric-engine/src/engine/create.rs
@@ -32,7 +32,9 @@ use store_api::metric_engine_consts::{
     METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
     METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
 };
-use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
+use store_api::mito_engine_options::{
+    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, TTL_KEY,
+};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
 use store_api::storage::consts::ReservedColumnId;
@@ -122,14 +124,20 @@ impl MetricEngineInner {
             .with_context(|_| CreateMitoRegionSnafu {
                 region_type: DATA_REGION_SUBDIR,
             })?;
+        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
+            PhysicalRegionNotFoundSnafu {
+                region_id: data_region_id,
+            },
+        )?;
 
-        info!("Created physical metric region {region_id}");
+        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
         PHYSICAL_REGION_COUNT.inc();
 
         // remember this table
         self.state.write().unwrap().add_physical_region(
             data_region_id,
             physical_columns,
+            primary_key_encoding,
             physical_region_options,
         );
 
@@ -516,7 +524,10 @@ impl MetricEngineInner {
         data_region_request.primary_key = primary_key;
 
         // set data region options
-        set_data_region_options(&mut data_region_request.options);
+        set_data_region_options(
+            &mut data_region_request.options,
+            self.config.experimental_sparse_primary_key_encoding,
+        );
 
         data_region_request
     }
@@ -555,6 +566,8 @@ pub(crate) fn region_options_for_metadata_region(
 ) -> HashMap<String, String> {
     // TODO(ruihang, weny): add whitelist for metric engine options.
     original.remove(APPEND_MODE_KEY);
+    // Don't allow setting the primary key encoding for the metadata region.
+    original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
     original.insert(TTL_KEY.to_string(), FOREVER.to_string());
     original
 }
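Note: the create, catchup, and open paths all follow the same pattern — create or open the mito data region, then read the primary key encoding back from mito and fail with `PhysicalRegionNotFound` when the region is missing. A self-contained sketch of that pattern; all types below are simplified stand-ins, not the engine's real ones:

```rust
// Illustrative sketch only (not part of the patch).
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

#[derive(Debug)]
struct PhysicalRegionNotFound(u64);

struct MockMito {
    // Maps a data region id to the encoding recorded in its metadata.
    regions: HashMap<u64, PrimaryKeyEncoding>,
}

impl MockMito {
    // Mirrors the shape of `MitoEngine::get_primary_key_encoding`:
    // `None` when the region is not opened on this node.
    fn get_primary_key_encoding(&self, region_id: u64) -> Option<PrimaryKeyEncoding> {
        self.regions.get(&region_id).copied()
    }
}

fn recover_encoding(
    mito: &MockMito,
    data_region_id: u64,
) -> Result<PrimaryKeyEncoding, PhysicalRegionNotFound> {
    mito.get_primary_key_encoding(data_region_id)
        .ok_or(PhysicalRegionNotFound(data_region_id))
}

fn main() {
    let mito = MockMito {
        regions: HashMap::from([(1, PrimaryKeyEncoding::Sparse)]),
    };
    assert_eq!(recover_encoding(&mito, 1).unwrap(), PrimaryKeyEncoding::Sparse);
    assert!(recover_encoding(&mito, 2).is_err());
}
```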
diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs
index 441ebfeafb..37c2f881af 100644
--- a/src/metric-engine/src/engine/open.rs
+++ b/src/metric-engine/src/engine/open.rs
@@ -17,7 +17,8 @@ use common_telemetry::info;
 use mito2::engine::MITO_ENGINE_NAME;
 use object_store::util::join_dir;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
@@ -26,7 +27,7 @@ use store_api::storage::RegionId;
 use super::MetricEngineInner;
 use crate::engine::create::region_options_for_metadata_region;
 use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
-use crate::error::{OpenMitoRegionSnafu, Result};
+use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
 use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
 use crate::utils;
@@ -49,7 +50,13 @@ impl MetricEngineInner {
         // open physical region and recover states
         let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
         self.open_physical_region(region_id, request).await?;
-        self.recover_states(region_id, physical_region_options)
+        let data_region_id = utils::to_data_region_id(region_id);
+        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
+            PhysicalRegionNotFoundSnafu {
+                region_id: data_region_id,
+            },
+        )?;
+        self.recover_states(region_id, primary_key_encoding, physical_region_options)
             .await?;
 
         Ok(0)
@@ -80,7 +87,10 @@ impl MetricEngineInner {
         };
 
         let mut data_region_options = request.options;
-        set_data_region_options(&mut data_region_options);
+        set_data_region_options(
+            &mut data_region_options,
+            self.config.experimental_sparse_primary_key_encoding,
+        );
         let open_data_region_request = RegionOpenRequest {
             region_dir: data_region_dir,
             options: data_region_options,
@@ -125,6 +135,7 @@ impl MetricEngineInner {
     pub(crate) async fn recover_states(
         &self,
         physical_region_id: RegionId,
+        primary_key_encoding: PrimaryKeyEncoding,
         physical_region_options: PhysicalRegionOptions,
     ) -> Result<()> {
         // load logical regions and physical column names
@@ -148,6 +159,7 @@ impl MetricEngineInner {
         state.add_physical_region(
             physical_region_id,
             physical_columns,
+            primary_key_encoding,
             physical_region_options,
         );
         // recover logical regions
diff --git a/src/metric-engine/src/engine/options.rs b/src/metric-engine/src/engine/options.rs
index d52272ea81..3d0fa56104 100644
--- a/src/metric-engine/src/engine/options.rs
+++ b/src/metric-engine/src/engine/options.rs
@@ -20,6 +20,7 @@ use store_api::metric_engine_consts::{
     METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
     METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT, METRIC_ENGINE_INDEX_TYPE_OPTION,
 };
+use store_api::mito_engine_options::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
 
 use crate::error::{Error, ParseRegionOptionsSnafu, Result};
@@ -46,7 +47,10 @@ pub enum IndexOptions {
 }
 
 /// Sets data region specific options.
-pub fn set_data_region_options(options: &mut HashMap<String, String>) {
+pub fn set_data_region_options(
+    options: &mut HashMap<String, String>,
+    sparse_primary_key_encoding_if_absent: bool,
+) {
     options.remove(METRIC_ENGINE_INDEX_TYPE_OPTION);
     options.remove(METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION);
     options.insert(
@@ -55,6 +59,14 @@ pub fn set_data_region_options(options: &mut HashMap<String, String>) {
     );
     // Set memtable options for the data region.
     options.insert("memtable.type".to_string(), "partition_tree".to_string());
+    if sparse_primary_key_encoding_if_absent
+        && !options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING)
+    {
+        options.insert(
+            MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
+            "sparse".to_string(),
+        );
+    }
 }
 
 impl TryFrom<&HashMap<String, String>> for PhysicalRegionOptions {
@@ -108,7 +120,7 @@ mod tests {
             METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION.to_string(),
             "102400".to_string(),
         );
-        set_data_region_options(&mut options);
+        set_data_region_options(&mut options, false);
 
         for key in [
             METRIC_ENGINE_INDEX_TYPE_OPTION,
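To make the semantics of the new `sparse_primary_key_encoding_if_absent` flag concrete: it only fills in a default and never overrides an explicit user setting. A runnable model with a stand-in constant, not the patch's code:

```rust
// Simplified model of `set_data_region_options`.
use std::collections::HashMap;

const PK_ENCODING_KEY: &str = "memtable.partition_tree.primary_key_encoding";

fn set_data_region_options(options: &mut HashMap<String, String>, sparse_if_absent: bool) {
    options.insert("memtable.type".to_string(), "partition_tree".to_string());
    // Only inject the default when the flag is on AND the user set nothing.
    if sparse_if_absent && !options.contains_key(PK_ENCODING_KEY) {
        options.insert(PK_ENCODING_KEY.to_string(), "sparse".to_string());
    }
}

fn main() {
    // Flag on, nothing set by the user: default to sparse.
    let mut opts = HashMap::new();
    set_data_region_options(&mut opts, true);
    assert_eq!(opts[PK_ENCODING_KEY], "sparse");

    // Flag on, but the user asked for dense: keep the user's choice.
    let mut opts = HashMap::from([(PK_ENCODING_KEY.to_string(), "dense".to_string())]);
    set_data_region_options(&mut opts, true);
    assert_eq!(opts[PK_ENCODING_KEY], "dense");

    // Flag off: no encoding option is injected.
    let mut opts = HashMap::new();
    set_data_region_options(&mut opts, false);
    assert!(!opts.contains_key(PK_ENCODING_KEY));
}
```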
diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs
index a2d53d0876..c7dc44be1e 100644
--- a/src/metric-engine/src/engine/put.rs
+++ b/src/metric-engine/src/engine/put.rs
@@ -59,33 +59,38 @@ impl MetricEngineInner {
             .with_label_values(&["put"])
             .start_timer();
 
-        let physical_region_id = *self
-            .state
-            .read()
-            .unwrap()
-            .logical_regions()
-            .get(&logical_region_id)
-            .with_context(|| LogicalRegionNotFoundSnafu {
-                region_id: logical_region_id,
-            })?;
-        let data_region_id = to_data_region_id(physical_region_id);
+        let (physical_region_id, data_region_id, primary_key_encoding) = {
+            let state = self.state.read().unwrap();
+            let physical_region_id = *state
+                .logical_regions()
+                .get(&logical_region_id)
+                .with_context(|| LogicalRegionNotFoundSnafu {
+                    region_id: logical_region_id,
+                })?;
+            let data_region_id = to_data_region_id(physical_region_id);
+
+            let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
+                PhysicalRegionNotFoundSnafu {
+                    region_id: data_region_id,
+                },
+            )?;
+
+            (physical_region_id, data_region_id, primary_key_encoding)
+        };
 
         self.verify_put_request(logical_region_id, physical_region_id, &request)
             .await?;
 
         // write to data region
-        // TODO(weny): retrieve the encoding from the metadata region.
-        let encoding = PrimaryKeyEncoding::Dense;
-
         // TODO: retrieve table name
         self.modify_rows(
             physical_region_id,
             logical_region_id.table_id(),
             &mut request.rows,
-            encoding,
+            primary_key_encoding,
         )?;
-        if encoding == PrimaryKeyEncoding::Sparse {
+        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
             request.hint = Some(WriteHint {
                 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
             });
diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs
index 7a21788583..0e22d436fa 100644
--- a/src/metric-engine/src/engine/state.rs
+++ b/src/metric-engine/src/engine/state.rs
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
 
 use snafu::OptionExt;
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::ColumnMetadata;
 use store_api::storage::{ColumnId, RegionId};
@@ -28,17 +29,20 @@ use crate::utils::to_data_region_id;
 pub struct PhysicalRegionState {
     logical_regions: HashSet<RegionId>,
     physical_columns: HashMap<String, ColumnId>,
+    primary_key_encoding: PrimaryKeyEncoding,
     options: PhysicalRegionOptions,
 }
 
 impl PhysicalRegionState {
     pub fn new(
         physical_columns: HashMap<String, ColumnId>,
+        primary_key_encoding: PrimaryKeyEncoding,
         options: PhysicalRegionOptions,
     ) -> Self {
         Self {
             logical_regions: HashSet::new(),
             physical_columns,
+            primary_key_encoding,
             options,
         }
     }
@@ -83,12 +87,13 @@ impl MetricEngineState {
         &mut self,
         physical_region_id: RegionId,
         physical_columns: HashMap<String, ColumnId>,
+        primary_key_encoding: PrimaryKeyEncoding,
         options: PhysicalRegionOptions,
     ) {
         let physical_region_id = to_data_region_id(physical_region_id);
         self.physical_regions.insert(
             physical_region_id,
-            PhysicalRegionState::new(physical_columns, options),
+            PhysicalRegionState::new(physical_columns, primary_key_encoding, options),
         );
     }
@@ -148,6 +153,15 @@ impl MetricEngineState {
         self.physical_regions.contains_key(&physical_region_id)
     }
 
+    pub fn get_primary_key_encoding(
+        &self,
+        physical_region_id: RegionId,
+    ) -> Option<PrimaryKeyEncoding> {
+        self.physical_regions
+            .get(&physical_region_id)
+            .map(|state| state.primary_key_encoding)
+    }
+
     pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
         &self.logical_regions
     }
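The put path now resolves the physical region, data region, and encoding inside one read-lock scope, then tags sparse requests with a `WriteHint` so downstream consumers know the primary key arrives pre-encoded. A reduced sketch with stand-in types:

```rust
// Stand-in types; the real code uses api::v1::WriteHint and RegionId.
#[derive(Clone, Copy, PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

struct WriteHint {
    primary_key_encoding: PrimaryKeyEncoding,
}

struct Request {
    hint: Option<WriteHint>,
}

// Mirrors the tail of `MetricEngineInner::put`: only sparse regions get a hint.
fn finish_request(mut request: Request, encoding: PrimaryKeyEncoding) -> Request {
    if encoding == PrimaryKeyEncoding::Sparse {
        request.hint = Some(WriteHint {
            primary_key_encoding: PrimaryKeyEncoding::Sparse,
        });
    }
    request
}

fn main() {
    let request = finish_request(Request { hint: None }, PrimaryKeyEncoding::Sparse);
    assert!(request.hint.is_some());
    let request = finish_request(Request { hint: None }, PrimaryKeyEncoding::Dense);
    assert!(request.hint.is_none());
}
```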
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 3809a6f2a3..77d33178c9 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -73,6 +73,7 @@ use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
 use futures::future::{join_all, try_join_all};
 use object_store::manager::ObjectStoreManagerRef;
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadataRef;
@@ -151,6 +152,14 @@ impl MitoEngine {
             .map(|region| region.region_statistic())
     }
 
+    /// Returns the primary key encoding of the region.
+    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
+        self.inner
+            .workers
+            .get_region(region_id)
+            .map(|r| r.primary_key_encoding())
+    }
+
     /// Handle substrait query and return a stream of record batches
     ///
     /// Notice that the output stream's ordering is not guaranteed. If order
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 235ed4ca0b..6860abc082 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
+    EncodeSparsePrimaryKey {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display(
         "Failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}",
         region_id, expected_last_entry_id, replayed_last_entry_id
@@ -1024,7 +1031,7 @@ impl ErrorExt for Error {
             WriteGroup { source, .. } => source.status_code(),
             FieldTypeMismatch { source, .. } => source.status_code(),
             NotSupportedField { .. } => StatusCode::Unsupported,
-            DeserializeField { .. } => StatusCode::Unexpected,
+            DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
             InvalidBatch { .. } => StatusCode::InvalidArguments,
             InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
             ConvertVector { source, .. } => source.status_code(),
diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs
index 1567a6b6d8..a6af39a306 100644
--- a/src/mito2/src/memtable/key_values.rs
+++ b/src/mito2/src/memtable/key_values.rs
@@ -15,10 +15,15 @@
 use std::collections::HashMap;
 
 use api::v1::{ColumnSchema, Mutation, OpType, Row, Rows};
+use datatypes::prelude::ConcreteDataType;
 use datatypes::value::ValueRef;
+use memcomparable::Deserializer;
+use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
 use store_api::metadata::RegionMetadata;
 use store_api::storage::SequenceNumber;
 
+use crate::row_converter::{SortField, COLUMN_ID_ENCODE_SIZE};
+
 /// Key value view of a mutation.
 #[derive(Debug)]
 pub struct KeyValues {
@@ -29,6 +34,8 @@ pub struct KeyValues {
     pub(crate) mutation: Mutation,
     /// Key value read helper.
     helper: SparseReadRowHelper,
+    /// Primary key encoding hint.
+    primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl KeyValues {
@@ -37,9 +44,15 @@ impl KeyValues {
     /// Returns `None` if `rows` of the `mutation` is `None`.
     pub fn new(metadata: &RegionMetadata, mutation: Mutation) -> Option<KeyValues> {
         let rows = mutation.rows.as_ref()?;
-        let helper = SparseReadRowHelper::new(metadata, rows);
+        let primary_key_encoding =
+            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
+        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
 
-        Some(KeyValues { mutation, helper })
+        Some(KeyValues {
+            mutation,
+            helper,
+            primary_key_encoding,
+        })
     }
 
     /// Returns a key value iterator.
@@ -54,6 +67,7 @@ impl KeyValues {
                 sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
                 // Safety: This is a valid mutation.
                 op_type: OpType::try_from(self.mutation.op_type).unwrap(),
+                primary_key_encoding: self.primary_key_encoding,
             }
         })
     }
@@ -94,6 +108,8 @@ pub struct KeyValuesRef<'a> {
     mutation: &'a Mutation,
     /// Key value read helper.
     helper: SparseReadRowHelper,
+    /// Primary key encoding hint.
+    primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl<'a> KeyValuesRef<'a> {
@@ -102,9 +118,15 @@ impl<'a> KeyValuesRef<'a> {
     /// Returns `None` if `rows` of the `mutation` is `None`.
     pub fn new(metadata: &RegionMetadata, mutation: &'a Mutation) -> Option<KeyValuesRef<'a>> {
         let rows = mutation.rows.as_ref()?;
-        let helper = SparseReadRowHelper::new(metadata, rows);
+        let primary_key_encoding =
+            infer_primary_key_encoding_from_hint(mutation.write_hint.as_ref());
+        let helper = SparseReadRowHelper::new(metadata, rows, primary_key_encoding);
 
-        Some(KeyValuesRef { mutation, helper })
+        Some(KeyValuesRef {
+            mutation,
+            helper,
+            primary_key_encoding,
+        })
     }
 
     /// Returns a key value iterator.
@@ -119,6 +141,7 @@ impl<'a> KeyValuesRef<'a> {
                 sequence: self.mutation.sequence + idx as u64, // Calculate sequence for each row.
                 // Safety: This is a valid mutation.
                 op_type: OpType::try_from(self.mutation.op_type).unwrap(),
+                primary_key_encoding: self.primary_key_encoding,
             }
         })
     }
@@ -143,9 +166,38 @@ pub struct KeyValue<'a> {
     helper: &'a SparseReadRowHelper,
     sequence: SequenceNumber,
     op_type: OpType,
+    primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl KeyValue<'_> {
+    /// Returns the primary key encoding.
+    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
+        self.primary_key_encoding
+    }
+
+    /// Returns the partition key.
+    pub fn partition_key(&self) -> u32 {
+        // TODO(yingwen): refactor this code
+        if self.primary_key_encoding == PrimaryKeyEncoding::Sparse {
+            let Some(primary_key) = self.primary_keys().next() else {
+                return 0;
+            };
+            let key = primary_key.as_binary().unwrap().unwrap();
+
+            let mut deserializer = Deserializer::new(key);
+            deserializer.advance(COLUMN_ID_ENCODE_SIZE);
+            let field = SortField::new(ConcreteDataType::uint32_datatype());
+            let table_id = field.deserialize(&mut deserializer).unwrap();
+            table_id.as_value_ref().as_u32().unwrap().unwrap()
+        } else {
+            let Some(value) = self.primary_keys().next() else {
+                return 0;
+            };
+
+            value.as_u32().unwrap().unwrap()
+        }
+    }
+
     /// Get primary key columns.
     pub fn primary_keys(&self) -> impl Iterator<Item = ValueRef> {
         self.helper.indices[..self.helper.num_primary_key_column]
@@ -220,7 +272,25 @@ impl SparseReadRowHelper {
     ///
     /// # Panics
     /// Time index column must exist.
-    fn new(metadata: &RegionMetadata, rows: &Rows) -> SparseReadRowHelper {
+    fn new(
+        metadata: &RegionMetadata,
+        rows: &Rows,
+        primary_key_encoding: PrimaryKeyEncoding,
+    ) -> SparseReadRowHelper {
+        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
+            // We can skip building the indices for sparse primary key encoding.
+            // The column order is: encoded primary key, timestamp, then field columns.
+            let indices = rows
+                .schema
+                .iter()
+                .enumerate()
+                .map(|(index, _)| Some(index))
+                .collect();
+            return SparseReadRowHelper {
+                indices,
+                num_primary_key_column: 1,
+            };
+        }
         // Build a name to index mapping for rows.
         let name_to_index: HashMap<_, _> = rows
             .schema
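`KeyValue::partition_key` relies on the sparse key starting with the encoded `__table_id` column id followed by the table id itself, so the partition (the table id) is recoverable from a fixed prefix without decoding the whole key. The sketch below uses plain big-endian integers as a stand-in for the real memcomparable encoding; the exact byte layout here is an assumption for illustration:

```rust
// Assumed simplified layout: [column id: 4 bytes][table id: 4 bytes]...
const COLUMN_ID_ENCODE_SIZE: usize = 4;

fn encode_prefix(column_id: u32, table_id: u32) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8);
    buf.extend_from_slice(&column_id.to_be_bytes());
    buf.extend_from_slice(&table_id.to_be_bytes());
    buf
}

// Mirrors the sparse branch of `KeyValue::partition_key`: skip the column id,
// then decode the table id that follows it.
fn partition_key(sparse_key: &[u8]) -> u32 {
    let bytes = &sparse_key[COLUMN_ID_ENCODE_SIZE..COLUMN_ID_ENCODE_SIZE + 4];
    u32::from_be_bytes(bytes.try_into().unwrap())
}

fn main() {
    let key = encode_prefix(0x8000_0001, 42); // hypothetical reserved column id
    assert_eq!(partition_key(&key), 42);
}
```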
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 78a8b7d847..d790b65f6b 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -19,8 +19,6 @@ mod dedup;
 mod dict;
 mod merger;
 mod partition;
-// TODO(weny): remove this
-#[allow(unused)]
 mod primary_key_filter;
 mod shard;
 mod shard_builder;
diff --git a/src/mito2/src/memtable/partition_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs
index 75af79f145..781a287eae 100644
--- a/src/mito2/src/memtable/partition_tree/partition.rs
+++ b/src/mito2/src/memtable/partition_tree/partition.rs
@@ -101,8 +101,14 @@ impl Partition {
                     inner.pk_to_pk_id.insert(sparse_key, pk_id);
                 }
                 PrimaryKeyEncoding::Sparse => {
-                    // TODO(weny): support sparse primary key.
-                    todo!()
+                    let sparse_key = primary_key.clone();
+                    let pk_id = inner.shard_builder.write_with_key(
+                        primary_key,
+                        Some(&sparse_key),
+                        &key_value,
+                        metrics,
+                    );
+                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                 }
             }
         } else {
@@ -287,11 +293,7 @@ impl Partition {
             return PartitionKey::default();
         }
 
-        let Some(value) = key_value.primary_keys().next() else {
-            return PartitionKey::default();
-        };
-
-        value.as_u32().unwrap().unwrap()
+        key_value.partition_key()
     }
 
     /// Returns true if the region can be partitioned.
diff --git a/src/mito2/src/memtable/partition_tree/primary_key_filter.rs b/src/mito2/src/memtable/partition_tree/primary_key_filter.rs
index ce4873ab48..3002df6d6a 100644
--- a/src/mito2/src/memtable/partition_tree/primary_key_filter.rs
+++ b/src/mito2/src/memtable/partition_tree/primary_key_filter.rs
@@ -34,7 +34,6 @@ struct PrimaryKeyFilterInner {
 impl PrimaryKeyFilterInner {
     fn evaluate_filters(
         &self,
-        pk: &[u8],
         mut decode_fn: impl FnMut(ColumnId, &RegionMetadataRef) -> Result<Value>,
     ) -> bool {
         if self.filters.is_empty() || self.metadata.primary_key.is_empty() {
@@ -101,7 +100,7 @@ impl DensePrimaryKeyFilter {
 impl PrimaryKeyFilter for DensePrimaryKeyFilter {
     fn matches(&mut self, pk: &[u8]) -> bool {
         self.offsets_buf.clear();
-        self.inner.evaluate_filters(pk, |column_id, metadata| {
+        self.inner.evaluate_filters(|column_id, metadata| {
             // index of tag column in primary key
             // Safety: A tag column is always in primary key.
             let index = metadata.primary_key_index(column_id).unwrap();
@@ -135,7 +134,7 @@ impl SparsePrimaryKeyFilter {
 impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
     fn matches(&mut self, pk: &[u8]) -> bool {
         self.offsets_map.clear();
-        self.inner.evaluate_filters(pk, |column_id, _| {
+        self.inner.evaluate_filters(|column_id, _| {
             if let Some(offset) = self.codec.has_column(pk, &mut self.offsets_map, column_id) {
                 self.codec.decode_value_at(pk, offset, column_id)
             } else {
@@ -150,22 +149,13 @@ mod tests {
     use std::sync::Arc;
 
     use api::v1::SemanticType;
-    use common_time::timestamp::TimeUnit;
-    use common_time::Timestamp;
-    use datafusion::execution::context::ExecutionProps;
-    use datafusion::logical_expr::{col, lit, BinaryExpr};
-    use datafusion::physical_expr::create_physical_expr;
-    use datafusion_common::{Column, DFSchema, ScalarValue};
+    use datafusion::logical_expr::BinaryExpr;
+    use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{Expr, Operator};
-    use datatypes::arrow::datatypes::{DataType, Field, Schema};
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::ColumnSchema;
-    use datatypes::value::{OrderedFloat, Value, ValueRef};
+    use datatypes::value::ValueRef;
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
-    use store_api::metric_engine_consts::{
-        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
-    };
-    use store_api::storage::consts::ReservedColumnId;
     use store_api::storage::{ColumnId, RegionId};
 
     use super::*;
diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs
index 4645ca7ab9..df50c8934d 100644
--- a/src/mito2/src/memtable/partition_tree/tree.rs
+++ b/src/mito2/src/memtable/partition_tree/tree.rs
@@ -26,11 +26,14 @@ use datatypes::prelude::ValueRef;
 use memcomparable::Serializer;
 use serde::Serialize;
 use snafu::{ensure, ResultExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, SequenceNumber};
 use table::predicate::Predicate;
 
-use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
+use crate::error::{
+    EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
+};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::partition::{
@@ -111,6 +114,26 @@ impl PartitionTree {
         Ok(())
     }
 
+    /// Encodes the given key value into a sparse primary key.
+    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
+        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
+            // If the primary key encoding is sparse and the key was already encoded
+            // by the metric engine, we only need to copy the encoded primary key
+            // into the destination buffer.
+            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
+                return EncodeSparsePrimaryKeySnafu {
+                    reason: "sparse primary key is not binary".to_string(),
+                }
+                .fail();
+            };
+            buffer.extend_from_slice(primary_key);
+        } else {
+            // For compatibility, use the sparse encoder for dense primary keys.
+            self.sparse_encoder
+                .encode_to_vec(kv.primary_keys(), buffer)?;
+        }
+        Ok(())
+    }
+
     // TODO(yingwen): The size computed from values is inaccurate.
     /// Write key-values into the tree.
     ///
@@ -141,9 +164,7 @@ impl PartitionTree {
             // Encode primary key.
             pk_buffer.clear();
             if self.is_partitioned {
-                // Use sparse encoder for metric engine.
-                self.sparse_encoder
-                    .encode_to_vec(kv.primary_keys(), pk_buffer)?;
+                self.encode_sparse_primary_key(&kv, pk_buffer)?;
             } else {
                 self.row_codec.encode_key_value(&kv, pk_buffer)?;
             }
@@ -185,9 +206,7 @@ impl PartitionTree {
         // Encode primary key.
         pk_buffer.clear();
         if self.is_partitioned {
-            // Use sparse encoder for metric engine.
-            self.sparse_encoder
-                .encode_to_vec(kv.primary_keys(), pk_buffer)?;
+            self.encode_sparse_primary_key(&kv, pk_buffer)?;
         } else {
             self.row_codec.encode_key_value(&kv, pk_buffer)?;
         }
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 0a246b393f..90d2f55e5e 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -26,6 +26,7 @@ use std::sync::{Arc, RwLock};
 use common_telemetry::{error, info, warn};
 use crossbeam_utils::atomic::AtomicCell;
 use snafu::{ensure, OptionExt};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::provider::Provider;
 use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
@@ -146,6 +147,12 @@ impl MitoRegion {
         version_data.version.metadata.clone()
     }
 
+    /// Returns the primary key encoding of the region.
+    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
+        let version_data = self.version_control.current();
+        version_data.version.metadata.primary_key_encoding
+    }
+
     /// Returns current version of the region.
     pub(crate) fn version(&self) -> VersionRef {
         let version_data = self.version_control.current();
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 085dd5ba08..1071a2ffb2 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -544,7 +544,12 @@ where
             .as_ref()
             .map(|rows| rows.rows.len())
             .unwrap_or(0);
-        region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none());
+        region_write_ctx.push_mutation(
+            mutation.op_type,
+            mutation.rows,
+            mutation.write_hint,
+            OptionOutputTx::none(),
+        );
     }
 
     // set next_entry_id and write to memtable.
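The memtable's `encode_sparse_primary_key` is a two-way dispatch: already-encoded sparse keys are copied verbatim, while dense key values still go through the sparse encoder. A simplified, self-contained model of that dispatch (stand-in types, not the real codec):

```rust
#[derive(Clone, Copy, PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

enum PrimaryKey<'a> {
    Encoded(&'a [u8]), // sparse: pre-encoded by the metric engine
    Values(&'a [u32]), // dense: still individual key values
}

fn encode(
    encoding: PrimaryKeyEncoding,
    key: PrimaryKey,
    buffer: &mut Vec<u8>,
) -> Result<(), String> {
    match (encoding, key) {
        (PrimaryKeyEncoding::Sparse, PrimaryKey::Encoded(bytes)) => {
            buffer.extend_from_slice(bytes); // just copy, no re-encoding
            Ok(())
        }
        (PrimaryKeyEncoding::Sparse, PrimaryKey::Values(_)) => {
            // Corresponds to the `EncodeSparsePrimaryKey` error in the patch.
            Err("sparse primary key is not binary".to_string())
        }
        (PrimaryKeyEncoding::Dense, PrimaryKey::Values(values)) => {
            for v in values {
                buffer.extend_from_slice(&v.to_be_bytes()); // stand-in for the sparse encoder
            }
            Ok(())
        }
        (PrimaryKeyEncoding::Dense, PrimaryKey::Encoded(_)) => unreachable!(),
    }
}

fn main() {
    let mut buf = Vec::new();
    encode(PrimaryKeyEncoding::Sparse, PrimaryKey::Encoded(&[1, 2, 3]), &mut buf).unwrap();
    assert_eq!(buf, [1, 2, 3]);
}
```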
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 4d1cef2340..0047822b4d 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -15,7 +15,7 @@
 use std::mem;
 use std::sync::Arc;
 
-use api::v1::{Mutation, OpType, Rows, WalEntry};
+use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
 use snafu::ResultExt;
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
@@ -131,13 +131,19 @@ impl RegionWriteCtx {
     }
 
     /// Push mutation to the context.
-    pub(crate) fn push_mutation(&mut self, op_type: i32, rows: Option<Rows>, tx: OptionOutputTx) {
+    pub(crate) fn push_mutation(
+        &mut self,
+        op_type: i32,
+        rows: Option<Rows>,
+        write_hint: Option<WriteHint>,
+        tx: OptionOutputTx,
+    ) {
         let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
         self.wal_entry.mutations.push(Mutation {
             op_type,
             sequence: self.next_sequence,
             rows,
-            write_hint: None,
+            write_hint,
         });
 
         let notify = WriteNotify::new(tx, num_rows);
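Threading `write_hint` through `push_mutation` matters for durability: the hint is persisted inside the WAL mutation, so replay (the opener change above) can restore the encoding instead of defaulting it to `None`. A minimal stand-in model:

```rust
// Stand-in types; the real ones are api::v1::{Mutation, WalEntry, WriteHint}.
#[derive(Clone, Copy, Debug, PartialEq)]
struct WriteHint {
    sparse: bool,
}

#[derive(Debug)]
struct Mutation {
    op_type: i32,
    sequence: u64,
    write_hint: Option<WriteHint>,
}

struct WalEntry {
    mutations: Vec<Mutation>,
}

fn push_mutation(entry: &mut WalEntry, op_type: i32, sequence: u64, write_hint: Option<WriteHint>) {
    entry.mutations.push(Mutation {
        op_type,
        sequence,
        write_hint, // previously hard-coded to `None`
    });
}

fn main() {
    let mut entry = WalEntry { mutations: Vec::new() };
    push_mutation(&mut entry, 1, 7, Some(WriteHint { sparse: true }));
    // On replay, the hint is read back from the mutation.
    assert_eq!(entry.mutations[0].write_hint, Some(WriteHint { sparse: true }));
}
```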
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 19add6b3db..574eb9e19a 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -23,13 +23,14 @@ use api::helper::{
     ColumnDataTypeWrapper,
 };
 use api::v1::column_def::options_from_column_schema;
-use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
+use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
 use common_telemetry::info;
 use datatypes::prelude::DataType;
 use prometheus::HistogramTimer;
 use prost::Message;
 use smallvec::SmallVec;
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
 use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
 use store_api::region_request::{
@@ -63,6 +64,8 @@ pub struct WriteRequest {
     name_to_index: HashMap<String, usize>,
     /// Whether each column has null.
     has_null: Vec<bool>,
+    /// Write hint.
+    pub hint: Option<WriteHint>,
 }
 
 impl WriteRequest {
@@ -112,9 +115,21 @@ impl WriteRequest {
             rows,
             name_to_index,
             has_null,
+            hint: None,
         })
     }
 
+    /// Sets the write hint.
+    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
+        self.hint = hint;
+        self
+    }
+
+    /// Returns the primary key encoding inferred from the write hint.
+    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
+        infer_primary_key_encoding_from_hint(self.hint.as_ref())
+    }
+
     /// Returns estimated size of the request.
     pub(crate) fn estimated_size(&self) -> usize {
         let row_size = self
@@ -548,7 +563,8 @@ impl WorkerRequest {
         let (sender, receiver) = oneshot::channel();
         let worker_request = match value {
             RegionRequest::Put(v) => {
-                let write_request = WriteRequest::new(region_id, OpType::Put, v.rows)?;
+                let write_request =
+                    WriteRequest::new(region_id, OpType::Put, v.rows)?.with_hint(v.hint);
                 WorkerRequest::Write(SenderWriteRequest {
                     sender: sender.into(),
                     request: write_request,
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index 4d0635d3cc..e9fd1a5539 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use common_recordbatch::filter::SimpleFilterEvaluator;
 use datatypes::value::{Value, ValueRef};
 pub use dense::{DensePrimaryKeyCodec, SortField};
-pub use sparse::{SparsePrimaryKeyCodec, SparseValues};
+pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;
diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito2/src/row_converter/sparse.rs
index 91a5623110..92c69acb9b 100644
--- a/src/mito2/src/row_converter/sparse.rs
+++ b/src/mito2/src/row_converter/sparse.rs
@@ -87,7 +87,7 @@ const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
 /// The column id of the table id.
 const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
 /// The size of the column id in the encoded sparse row.
-const COLUMN_ID_ENCODE_SIZE: usize = 4;
+pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
 
 impl SparsePrimaryKeyCodec {
     /// Creates a new [`SparsePrimaryKeyCodec`] instance.
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index 4d45e21a62..125a639d8a 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -117,7 +117,7 @@ pub struct Indexer {
 
 impl Indexer {
     /// Updates the index with the given batch.
-    pub async fn update(&mut self, batch: &Batch) {
+    pub async fn update(&mut self, batch: &mut Batch) {
         self.do_update(batch).await;
 
         self.flush_mem_metrics();
diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs
index 31b495c691..7807434592 100644
--- a/src/mito2/src/sst/index/bloom_filter/applier.rs
+++ b/src/mito2/src/sst/index/bloom_filter/applier.rs
@@ -455,10 +455,10 @@ mod tests {
             .unwrap();
 
         // push 20 rows
-        let batch = new_batch("tag1", 0..10);
-        indexer.update(&batch).await.unwrap();
-        let batch = new_batch("tag2", 10..20);
-        indexer.update(&batch).await.unwrap();
+        let mut batch = new_batch("tag1", 0..10);
+        indexer.update(&mut batch).await.unwrap();
+        let mut batch = new_batch("tag2", 10..20);
+        indexer.update(&mut batch).await.unwrap();
 
         let puffin_manager = factory.build(object_store.clone());
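Switching the index builders to `&mut Batch` enables decode-once caching of the primary key values, shared across the inverted and bloom filter indexers. A reduced model of the memoization (stand-in `Batch` and a trivial "codec"):

```rust
struct Batch {
    primary_key: Vec<u8>,
    pk_values: Option<Vec<u32>>, // decoded lazily, at most once
}

impl Batch {
    // Mirrors the `if batch.pk_values().is_none() { ... set_pk_values ... }`
    // pattern in the indexer `do_update` changes.
    fn pk_values_or_decode(&mut self) -> &[u32] {
        if self.pk_values.is_none() {
            // Stand-in for `self.codec.decode(batch.primary_key())`.
            let decoded = self.primary_key.iter().map(|b| *b as u32).collect();
            self.pk_values = Some(decoded);
        }
        self.pk_values.as_deref().unwrap()
    }
}

fn main() {
    let mut batch = Batch { primary_key: vec![1, 2], pk_values: None };
    assert_eq!(batch.pk_values_or_decode(), &[1, 2]);
    // A second indexer reuses the cached values; no second decode happens.
    assert_eq!(batch.pk_values_or_decode(), &[1, 2]);
}
```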
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index 3dfe15dfd5..9dde4d851f 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -127,7 +127,7 @@ impl BloomFilterIndexer {
     /// Garbage will be cleaned up if failed to update.
     ///
     /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
-    pub async fn update(&mut self, batch: &Batch) -> Result<()> {
+    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
         ensure!(!self.aborted, OperateAbortedIndexSnafu);
 
         if self.creators.is_empty() {
@@ -189,14 +189,20 @@ impl BloomFilterIndexer {
         self.do_cleanup().await
     }
 
-    async fn do_update(&mut self, batch: &Batch) -> Result<()> {
+    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
         let mut guard = self.stats.record_update();
 
         let n = batch.num_rows();
         guard.inc_row_count(n);
 
         // TODO(weny, zhenchi): lazy decode
-        let values = self.codec.decode(batch.primary_key())?;
+        if batch.pk_values().is_none() {
+            let values = self.codec.decode(batch.primary_key())?;
+            batch.set_pk_values(values);
+        }
+
+        // Safety: the primary key is decoded.
+        let values = batch.pk_values().unwrap();
 
         // Tags
         for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
             let Some(creator) = self.creators.get_mut(col_id) else {
@@ -475,11 +481,11 @@ pub(crate) mod tests {
             .unwrap();
 
         // push 20 rows
-        let batch = new_batch("tag1", 0..10);
-        indexer.update(&batch).await.unwrap();
+        let mut batch = new_batch("tag1", 0..10);
+        indexer.update(&mut batch).await.unwrap();
 
-        let batch = new_batch("tag2", 10..20);
-        indexer.update(&batch).await.unwrap();
+        let mut batch = new_batch("tag2", 10..20);
+        indexer.update(&mut batch).await.unwrap();
 
         let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
         let puffin_manager = factory.build(object_store);
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs
index 3275f00a14..28b77fdf44 100644
--- a/src/mito2/src/sst/index/fulltext_index/creator.rs
+++ b/src/mito2/src/sst/index/fulltext_index/creator.rs
@@ -110,7 +110,7 @@ impl FulltextIndexer {
     }
 
     /// Updates the index with the given batch.
-    pub async fn update(&mut self, batch: &Batch) -> Result<()> {
+    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
         ensure!(!self.aborted, OperateAbortedIndexSnafu);
 
         if let Err(update_err) = self.do_update(batch).await {
@@ -170,7 +170,7 @@ impl FulltextIndexer {
 }
 
 impl FulltextIndexer {
-    async fn do_update(&mut self, batch: &Batch) -> Result<()> {
+    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
         let mut guard = self.stats.record_update();
         guard.inc_row_count(batch.num_rows());
 
@@ -217,7 +217,7 @@ struct SingleCreator {
 }
 
 impl SingleCreator {
-    async fn update(&mut self, batch: &Batch) -> Result<()> {
+    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
         let text_column = batch
             .fields()
             .iter()
@@ -511,8 +511,8 @@ mod tests {
             .unwrap()
             .unwrap();
 
-        let batch = new_batch(rows);
-        indexer.update(&batch).await.unwrap();
+        let mut batch = new_batch(rows);
+        indexer.update(&mut batch).await.unwrap();
 
         let puffin_manager = factory.build(object_store.clone());
         let mut writer = puffin_manager.writer(&file_path).await.unwrap();
diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs
index c2ab33f0e1..42306673be 100644
--- a/src/mito2/src/sst/index/indexer/update.rs
+++ b/src/mito2/src/sst/index/indexer/update.rs
@@ -18,7 +18,7 @@ use crate::read::Batch;
 use crate::sst::index::Indexer;
 
 impl Indexer {
-    pub(crate) async fn do_update(&mut self, batch: &Batch) {
+    pub(crate) async fn do_update(&mut self, batch: &mut Batch) {
         if batch.is_empty() {
             return;
         }
@@ -35,7 +35,7 @@ impl Indexer {
     }
 
     /// Returns false if the update failed.
-    async fn do_update_inverted_index(&mut self, batch: &Batch) -> bool {
+    async fn do_update_inverted_index(&mut self, batch: &mut Batch) -> bool {
         let Some(creator) = self.inverted_indexer.as_mut() else {
             return true;
         };
@@ -60,7 +60,7 @@ impl Indexer {
     }
 
     /// Returns false if the update failed.
-    async fn do_update_fulltext_index(&mut self, batch: &Batch) -> bool {
+    async fn do_update_fulltext_index(&mut self, batch: &mut Batch) -> bool {
         let Some(creator) = self.fulltext_indexer.as_mut() else {
             return true;
         };
@@ -85,7 +85,7 @@ impl Indexer {
     }
 
     /// Returns false if the update failed.
-    async fn do_update_bloom_filter(&mut self, batch: &Batch) -> bool {
+    async fn do_update_bloom_filter(&mut self, batch: &mut Batch) -> bool {
         let Some(creator) = self.bloom_filter_indexer.as_mut() else {
             return true;
         };
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 7903f2a496..71edc56e07 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -119,7 +119,7 @@ impl InvertedIndexer {
     /// Updates index with a batch of rows.
     /// Garbage will be cleaned up if failed to update.
-    pub async fn update(&mut self, batch: &Batch) -> Result<()> {
+    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
         ensure!(!self.aborted, OperateAbortedIndexSnafu);
 
         if batch.is_empty() {
@@ -177,14 +177,20 @@ impl InvertedIndexer {
         self.do_cleanup().await
     }
 
-    async fn do_update(&mut self, batch: &Batch) -> Result<()> {
+    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
         let mut guard = self.stats.record_update();
 
         let n = batch.num_rows();
         guard.inc_row_count(n);
 
         // TODO(weny, zhenchi): lazy decode
-        let values = self.codec.decode(batch.primary_key())?;
+        if batch.pk_values().is_none() {
+            let values = self.codec.decode(batch.primary_key())?;
+            batch.set_pk_values(values);
+        }
+
+        // Safety: the primary key is decoded.
+        let values = batch.pk_values().unwrap();
         for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
             if !self.indexed_column_ids.contains(col_id) {
                 continue;
@@ -455,8 +461,8 @@ mod tests {
         );
 
         for (str_tag, i32_tag, u64_field) in &rows {
-            let batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
-            creator.update(&batch).await.unwrap();
+            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
+            creator.update(&mut batch).await.unwrap();
         }
 
         let puffin_manager = factory.build(object_store.clone());
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 13f7cfb3ec..01baf2df95 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -126,9 +126,9 @@ where
             .transpose()
         {
             match res {
-                Ok(batch) => {
+                Ok(mut batch) => {
                     stats.update(&batch);
-                    self.indexer.update(&batch).await;
+                    self.indexer.update(&mut batch).await;
                 }
                 Err(e) => {
                     self.indexer.abort().await;
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 7dccb6952a..5f9fdd698c 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use api::v1::OpType;
 use common_telemetry::debug;
 use snafu::ensure;
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;
@@ -231,19 +232,24 @@ impl RegionWorkerLoop {
                 continue;
             }
 
-            // Checks whether request schema is compatible with region schema.
-            if let Err(e) =
-                maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata)
-            {
-                sender_req.sender.send(Err(e));
+            // If the primary key is dense, we need to fill missing columns.
+            if sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense {
+                // Checks whether request schema is compatible with region schema.
+                if let Err(e) = maybe_fill_missing_columns(
+                    &mut sender_req.request,
+                    &region_ctx.version().metadata,
+                ) {
+                    sender_req.sender.send(Err(e));
 
-                continue;
+                    continue;
+                }
             }
 
             // Collect requests by region.
             region_ctx.push_mutation(
                 sender_req.request.op_type as i32,
                 Some(sender_req.request.rows),
+                sender_req.request.hint,
                 sender_req.sender,
             );
         }
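The worker now runs the schema compatibility check and `maybe_fill_missing_columns` only for dense requests; sparse rows already carry the fully encoded key and must not be rewritten. The branch, modeled with stand-in types:

```rust
#[derive(Clone, Copy, PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

struct WriteReq {
    encoding: PrimaryKeyEncoding,
    filled: bool,
}

fn prepare(req: &mut WriteReq) {
    if req.encoding == PrimaryKeyEncoding::Dense {
        req.filled = true; // stand-in for `maybe_fill_missing_columns`
    }
}

fn main() {
    let mut dense = WriteReq { encoding: PrimaryKeyEncoding::Dense, filled: false };
    let mut sparse = WriteReq { encoding: PrimaryKeyEncoding::Sparse, filled: false };
    prepare(&mut dense);
    prepare(&mut sparse);
    assert!(dense.filled && !sparse.filled);
}
```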
diff --git a/src/store-api/src/codec.rs b/src/store-api/src/codec.rs
index 6ddcb6d178..6b668ebae3 100644
--- a/src/store-api/src/codec.rs
+++ b/src/store-api/src/codec.rs
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use api::v1::WriteHint;
 use serde::{Deserialize, Serialize};
+use strum::Display;
 
 /// Primary key encoding mode.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, Display)]
 #[serde(rename_all = "snake_case")]
 pub enum PrimaryKeyEncoding {
     #[default]
@@ -24,3 +26,18 @@ pub enum PrimaryKeyEncoding {
     /// Sparse primary key encoding.
     Sparse,
 }
+
+impl From<api::v1::PrimaryKeyEncoding> for PrimaryKeyEncoding {
+    fn from(value: api::v1::PrimaryKeyEncoding) -> Self {
+        match value {
+            api::v1::PrimaryKeyEncoding::Dense => PrimaryKeyEncoding::Dense,
+            api::v1::PrimaryKeyEncoding::Sparse => PrimaryKeyEncoding::Sparse,
+        }
+    }
+}
+
+/// Infers the primary key encoding from the write hint.
+pub fn infer_primary_key_encoding_from_hint(hint: Option<&WriteHint>) -> PrimaryKeyEncoding {
+    hint.map(|hint| PrimaryKeyEncoding::from(hint.primary_key_encoding()))
+        .unwrap_or(PrimaryKeyEncoding::Dense)
+}
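`infer_primary_key_encoding_from_hint` defaults to dense when no hint is present, which keeps old WAL entries and clients that never set a hint working unchanged. Its behavior, modeled with stand-in types:

```rust
// Stand-in types mirroring store_api::codec.
#[derive(Clone, Copy, Debug, PartialEq, Default)]
enum PrimaryKeyEncoding {
    #[default]
    Dense,
    Sparse,
}

struct WriteHint {
    primary_key_encoding: PrimaryKeyEncoding,
}

fn infer(hint: Option<&WriteHint>) -> PrimaryKeyEncoding {
    // No hint means dense: the backward-compatible default.
    hint.map(|h| h.primary_key_encoding).unwrap_or_default()
}

fn main() {
    assert_eq!(infer(None), PrimaryKeyEncoding::Dense);
    let hint = WriteHint { primary_key_encoding: PrimaryKeyEncoding::Sparse };
    assert_eq!(infer(Some(&hint)), PrimaryKeyEncoding::Sparse);
}
```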
diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs
index 470f6f44bb..b97e55aae6 100644
--- a/src/store-api/src/mito_engine_options.rs
+++ b/src/store-api/src/mito_engine_options.rs
@@ -43,6 +43,20 @@ pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
 pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction";
 /// Option key for twcs fallback to local.
 pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";
+/// Option key for memtable type.
+pub const MEMTABLE_TYPE: &str = "memtable.type";
+/// Option key for memtable partition tree primary key encoding.
+pub const MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING: &str =
+    "memtable.partition_tree.primary_key_encoding";
+/// Option key for memtable partition tree index max keys per shard.
+pub const MEMTABLE_PARTITION_TREE_INDEX_MAX_KEYS_PER_SHARD: &str =
+    "memtable.partition_tree.index_max_keys_per_shard";
+/// Option key for memtable partition tree data freeze threshold.
+pub const MEMTABLE_PARTITION_TREE_DATA_FREEZE_THRESHOLD: &str =
+    "memtable.partition_tree.data_freeze_threshold";
+/// Option key for memtable partition tree fork dictionary bytes.
+pub const MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES: &str =
+    "memtable.partition_tree.fork_dictionary_bytes";
 
 /// Returns true if the `key` is a valid option key for the mito engine.
 pub fn is_mito_engine_option_key(key: &str) -> bool {
@@ -61,10 +75,11 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
         "index.inverted_index.ignore_column_ids",
         "index.inverted_index.segment_row_count",
         WAL_OPTIONS_KEY,
-        "memtable.type",
-        "memtable.partition_tree.index_max_keys_per_shard",
-        "memtable.partition_tree.data_freeze_threshold",
-        "memtable.partition_tree.fork_dictionary_bytes",
+        MEMTABLE_TYPE,
+        MEMTABLE_PARTITION_TREE_INDEX_MAX_KEYS_PER_SHARD,
+        MEMTABLE_PARTITION_TREE_DATA_FREEZE_THRESHOLD,
+        MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES,
+        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
         APPEND_MODE_KEY,
         MERGE_MODE_KEY,
     ]
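Promoting the option keys from string literals to constants gives option validation and option lookup a single spelling to share. A tiny check in the same spirit as `is_mito_engine_option_key`, with a reduced key list:

```rust
const MEMTABLE_TYPE: &str = "memtable.type";
const MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING: &str =
    "memtable.partition_tree.primary_key_encoding";

fn is_known_option(key: &str) -> bool {
    // The real function checks a much longer whitelist.
    [MEMTABLE_TYPE, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING].contains(&key)
}

fn main() {
    assert!(is_known_option("memtable.partition_tree.primary_key_encoding"));
    assert!(!is_known_option("memtable.partition_tree.unknown"));
}
```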
diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result
index ff32ee185b..2985d2e9c0 100644
--- a/tests/cases/standalone/common/insert/logical_metric_table.result
+++ b/tests/cases/standalone/common/insert/logical_metric_table.result
@@ -77,3 +77,107 @@ DROP TABLE phy;
 
 Affected Rows: 0
 
+CREATE TABLE phy (
+    ts timestamp time index,
+    val double
+) engine = metric with (
+    "physical_metric_table" = "",
+    "memtable.type" = "partition_tree",
+    "memtable.partition_tree.primary_key_encoding" = "sparse"
+);
+
+Affected Rows: 0
+
+CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
+
+Affected Rows: 0
+
+INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
+
+Affected Rows: 2
+
+SELECT * from t1;
+
++-------+-------------------------+-----+
+| host  | ts                      | val |
++-------+-------------------------+-----+
+| host2 | 1970-01-01T00:00:00.001 | 1.0 |
+| host1 | 1970-01-01T00:00:00     | 0.0 |
++-------+-------------------------+-----+
+
+CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
+
+Affected Rows: 0
+
+SELECT * from t2;
+
+++
+++
+
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
+
+Affected Rows: 2
+
+SELECT * from t2;
+
++------+-------------------------+-----+
+| job  | ts                      | val |
++------+-------------------------+-----+
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
+| job1 | 1970-01-01T00:00:00     | 0.0 |
++------+-------------------------+-----+
+
+ADMIN flush_table("phy");
+
+Error: 1004(InvalidArguments), Failed to build admin function args: unsupported function arg "phy"
+
+-- SQLNESS ARG restart=true
+INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1);
+
+Affected Rows: 2
+
+SELECT * from t1;
+
++-------+-------------------------+-----+
+| host  | ts                      | val |
++-------+-------------------------+-----+
+| host2 | 1970-01-01T00:00:00.001 | 1.0 |
+| host1 | 1970-01-01T00:00:00     | 0.0 |
++-------+-------------------------+-----+
+
+SELECT * from t2;
+
++------+-------------------------+-----+
+| job  | ts                      | val |
++------+-------------------------+-----+
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
+| job3 | 1970-01-01T00:00:00     | 0.0 |
+| job4 | 1970-01-01T00:00:00.001 | 1.0 |
+| job1 | 1970-01-01T00:00:00     | 0.0 |
++------+-------------------------+-----+
+
+DROP TABLE t1;
+
+Affected Rows: 0
+
+DROP TABLE t2;
+
+Affected Rows: 0
+
+DESC TABLE phy;
+
++------------+----------------------+-----+------+---------+---------------+
+| Column     | Type                 | Key | Null | Default | Semantic Type |
++------------+----------------------+-----+------+---------+---------------+
+| ts         | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
+| val        | Float64              |     | YES  |         | FIELD         |
+| __table_id | UInt32               | PRI | NO   |         | TAG           |
+| __tsid     | UInt64               | PRI | NO   |         | TAG           |
+| host       | String               | PRI | YES  |         | TAG           |
+| job        | String               | PRI | YES  |         | TAG           |
++------------+----------------------+-----+------+---------+---------------+
+
+DROP TABLE phy;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql
index 2157d68707..7a3bd00935 100644
--- a/tests/cases/standalone/common/insert/logical_metric_table.sql
+++ b/tests/cases/standalone/common/insert/logical_metric_table.sql
@@ -23,3 +23,43 @@ DESC TABLE phy;
 SELECT ts, val, __tsid, host, job FROM phy;
 
 DROP TABLE phy;
+
+CREATE TABLE phy (
+    ts timestamp time index,
+    val double
+) engine = metric with (
+    "physical_metric_table" = "",
+    "memtable.type" = "partition_tree",
+    "memtable.partition_tree.primary_key_encoding" = "sparse"
+);
+
+CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
+
+INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
+
+SELECT * from t1;
+
+CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
+
+SELECT * from t2;
+
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
+
+SELECT * from t2;
+
+ADMIN flush_table("phy");
+
+-- SQLNESS ARG restart=true
+INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1);
+
+SELECT * from t1;
+
+SELECT * from t2;
+
+DROP TABLE t1;
+
+DROP TABLE t2;
+
+DESC TABLE phy;
+
+DROP TABLE phy;