mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
feat(metric-engine): introduce index options from metric engine (#5374)
* feat(metric-engine): introduce index options from metric engine * chore: fmt toml * test: add sqlness test * fix: ignore internal columns * chore: remove unused dep * chore: update sqlness result * chore: ignore metric engine internal columns * chore: refine code styling * test: update sqlness test * refactor: refactor `create_table_constraints` * test: show index * chore: apply suggestions from CR * fix: set inverted index explicitly * chore: apply suggestions from CR
This commit is contained in:
@@ -188,7 +188,7 @@ mod tests {
|
||||
case_sensitive: false,
|
||||
})
|
||||
.unwrap();
|
||||
schema.with_inverted_index(true);
|
||||
schema.set_inverted_index(true);
|
||||
let options = options_from_column_schema(&schema).unwrap();
|
||||
assert_eq!(
|
||||
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
|
||||
|
||||
@@ -158,7 +158,11 @@ impl ColumnSchema {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_inverted_index(&mut self, value: bool) {
|
||||
/// Set the inverted index for the column.
|
||||
/// Similar to [with_inverted_index] but don't take the ownership.
|
||||
///
|
||||
/// [with_inverted_index]: Self::with_inverted_index
|
||||
pub fn set_inverted_index(&mut self, value: bool) {
|
||||
match value {
|
||||
true => {
|
||||
self.metadata
|
||||
@@ -170,6 +174,15 @@ impl ColumnSchema {
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the inverted index for the column.
|
||||
/// Similar to [set_inverted_index] but take the ownership and return a owned value.
|
||||
///
|
||||
/// [set_inverted_index]: Self::set_inverted_index
|
||||
pub fn with_inverted_index(mut self, value: bool) -> Self {
|
||||
self.set_inverted_index(value);
|
||||
self
|
||||
}
|
||||
|
||||
// Put a placeholder to invalidate schemas.all(!has_inverted_index_key).
|
||||
pub fn insert_inverted_index_placeholder(&mut self) {
|
||||
self.metadata
|
||||
|
||||
@@ -16,6 +16,7 @@ use api::v1::SemanticType;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_telemetry::{debug, info, warn};
|
||||
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
|
||||
use mito2::engine::MitoEngine;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
@@ -26,9 +27,10 @@ use store_api::region_request::{
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
use store_api::storage::{ConcreteDataType, RegionId};
|
||||
|
||||
use crate::engine::IndexOptions;
|
||||
use crate::error::{
|
||||
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
|
||||
MitoWriteOperationSnafu, Result,
|
||||
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
|
||||
};
|
||||
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
|
||||
use crate::utils;
|
||||
@@ -64,13 +66,16 @@ impl DataRegion {
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
columns: &mut [ColumnMetadata],
|
||||
index_options: IndexOptions,
|
||||
) -> Result<()> {
|
||||
let region_id = utils::to_data_region_id(region_id);
|
||||
|
||||
let mut retries = 0;
|
||||
// submit alter request
|
||||
while retries < MAX_RETRIES {
|
||||
let request = self.assemble_alter_request(region_id, columns).await?;
|
||||
let request = self
|
||||
.assemble_alter_request(region_id, columns, index_options)
|
||||
.await?;
|
||||
|
||||
let _timer = MITO_DDL_DURATION.start_timer();
|
||||
|
||||
@@ -97,6 +102,7 @@ impl DataRegion {
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
columns: &mut [ColumnMetadata],
|
||||
index_options: IndexOptions,
|
||||
) -> Result<RegionRequest> {
|
||||
// retrieve underlying version
|
||||
let region_metadata = self
|
||||
@@ -142,6 +148,19 @@ impl DataRegion {
|
||||
|
||||
c.column_id = new_column_id_start + delta as u32;
|
||||
c.column_schema.set_nullable();
|
||||
match index_options {
|
||||
IndexOptions::Inverted => {
|
||||
c.column_schema.set_inverted_index(true);
|
||||
}
|
||||
IndexOptions::Skipping { granularity } => {
|
||||
c.column_schema
|
||||
.set_skipping_options(&SkippingIndexOptions {
|
||||
granularity,
|
||||
index_type: SkippingIndexType::BloomFilter,
|
||||
})
|
||||
.context(SetSkippingIndexOptionSnafu)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AddColumn {
|
||||
column_metadata: c.clone(),
|
||||
@@ -256,7 +275,11 @@ mod test {
|
||||
},
|
||||
];
|
||||
env.data_region()
|
||||
.add_columns(env.default_physical_region_id(), &mut new_columns)
|
||||
.add_columns(
|
||||
env.default_physical_region_id(),
|
||||
&mut new_columns,
|
||||
IndexOptions::Inverted,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -295,7 +318,11 @@ mod test {
|
||||
}];
|
||||
let result = env
|
||||
.data_region()
|
||||
.add_columns(env.default_physical_region_id(), &mut new_columns)
|
||||
.add_columns(
|
||||
env.default_physical_region_id(),
|
||||
&mut new_columns,
|
||||
IndexOptions::Inverted,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ use async_trait::async_trait;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use mito2::engine::MitoEngine;
|
||||
pub(crate) use options::IndexOptions;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||
|
||||
@@ -22,7 +22,9 @@ use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::{LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu};
|
||||
use crate::error::{
|
||||
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
|
||||
};
|
||||
use crate::utils::{to_data_region_id, to_metadata_region_id};
|
||||
|
||||
impl MetricEngineInner {
|
||||
@@ -64,16 +66,27 @@ impl MetricEngineInner {
|
||||
logical_region_id: RegionId,
|
||||
request: RegionAlterRequest,
|
||||
) -> Result<RegionId> {
|
||||
let physical_region_id = {
|
||||
let (physical_region_id, index_options) = {
|
||||
let state = &self.state.read().unwrap();
|
||||
state
|
||||
let physical_region_id = state
|
||||
.get_physical_region_id(logical_region_id)
|
||||
.with_context(|| {
|
||||
error!("Trying to alter an nonexistent region {logical_region_id}");
|
||||
LogicalRegionNotFoundSnafu {
|
||||
region_id: logical_region_id,
|
||||
}
|
||||
})?;
|
||||
|
||||
let index_options = state
|
||||
.physical_region_states()
|
||||
.get(&physical_region_id)
|
||||
.with_context(|| PhysicalRegionNotFoundSnafu {
|
||||
region_id: physical_region_id,
|
||||
})?
|
||||
.options()
|
||||
.index;
|
||||
|
||||
(physical_region_id, index_options)
|
||||
};
|
||||
|
||||
// only handle adding column
|
||||
@@ -122,6 +135,7 @@ impl MetricEngineInner {
|
||||
data_region_id,
|
||||
logical_region_id,
|
||||
&mut columns_to_add,
|
||||
index_options,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -13,13 +13,15 @@
|
||||
// limitations under the License.
|
||||
|
||||
use common_telemetry::debug;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu};
|
||||
use crate::error::{
|
||||
MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
|
||||
};
|
||||
use crate::utils;
|
||||
|
||||
impl MetricEngineInner {
|
||||
@@ -34,6 +36,18 @@ impl MetricEngineInner {
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let data_region_id = utils::to_data_region_id(region_id);
|
||||
let physical_region_options = *self
|
||||
.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
.context(PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?
|
||||
.options();
|
||||
|
||||
let metadata_region_id = utils::to_metadata_region_id(region_id);
|
||||
// TODO(weny): improve the catchup, we can read the wal entries only once.
|
||||
debug!("Catchup metadata region {metadata_region_id}");
|
||||
@@ -49,7 +63,6 @@ impl MetricEngineInner {
|
||||
.await
|
||||
.context(MitoCatchupOperationSnafu)?;
|
||||
|
||||
let data_region_id = utils::to_data_region_id(region_id);
|
||||
debug!("Catchup data region {data_region_id}");
|
||||
self.mito
|
||||
.handle_request(
|
||||
@@ -64,7 +77,8 @@ impl MetricEngineInner {
|
||||
.context(MitoCatchupOperationSnafu)
|
||||
.map(|response| response.affected_rows)?;
|
||||
|
||||
self.recover_states(region_id).await?;
|
||||
self.recover_states(region_id, physical_region_options)
|
||||
.await?;
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,8 +36,7 @@ impl MetricEngineInner {
|
||||
.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_regions()
|
||||
.contains_key(&data_region_id)
|
||||
.exist_physical_region(data_region_id)
|
||||
{
|
||||
self.close_physical_region(data_region_id).await?;
|
||||
self.state
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -38,7 +38,7 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::engine::options::set_data_region_options;
|
||||
use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::{
|
||||
AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu,
|
||||
@@ -91,6 +91,7 @@ impl MetricEngineInner {
|
||||
region_id: RegionId,
|
||||
request: RegionCreateRequest,
|
||||
) -> Result<()> {
|
||||
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
|
||||
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
|
||||
|
||||
// create metadata region
|
||||
@@ -107,11 +108,11 @@ impl MetricEngineInner {
|
||||
|
||||
// create data region
|
||||
let create_data_region_request = self.create_request_for_data_region(&request);
|
||||
let physical_column_set = create_data_region_request
|
||||
let physical_columns = create_data_region_request
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|metadata| metadata.column_schema.name.clone())
|
||||
.collect::<HashSet<_>>();
|
||||
.map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
|
||||
.collect::<HashMap<_, _>>();
|
||||
self.mito
|
||||
.handle_request(
|
||||
data_region_id,
|
||||
@@ -126,10 +127,11 @@ impl MetricEngineInner {
|
||||
PHYSICAL_REGION_COUNT.inc();
|
||||
|
||||
// remember this table
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_physical_region(data_region_id, physical_column_set);
|
||||
self.state.write().unwrap().add_physical_region(
|
||||
data_region_id,
|
||||
physical_columns,
|
||||
physical_region_options,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -180,17 +182,18 @@ impl MetricEngineInner {
|
||||
// find new columns to add
|
||||
let mut new_columns = vec![];
|
||||
let mut existing_columns = vec![];
|
||||
{
|
||||
let index_option = {
|
||||
let state = &self.state.read().unwrap();
|
||||
let physical_columns =
|
||||
state
|
||||
.physical_columns()
|
||||
.get(&data_region_id)
|
||||
.with_context(|| PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?;
|
||||
let region_state = state
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
.with_context(|| PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?;
|
||||
let physical_columns = region_state.physical_columns();
|
||||
|
||||
for col in &request.column_metadatas {
|
||||
if !physical_columns.contains(&col.column_schema.name) {
|
||||
if !physical_columns.contains_key(&col.column_schema.name) {
|
||||
// Multi-field on physical table is explicit forbidden at present
|
||||
// TODO(ruihang): support multi-field on both logical and physical column
|
||||
ensure!(
|
||||
@@ -204,7 +207,9 @@ impl MetricEngineInner {
|
||||
existing_columns.push(col.column_schema.name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
region_state.options().index
|
||||
};
|
||||
|
||||
if !new_columns.is_empty() {
|
||||
info!("Found new columns {new_columns:?} to add to physical region {data_region_id}");
|
||||
@@ -213,6 +218,7 @@ impl MetricEngineInner {
|
||||
data_region_id,
|
||||
logical_region_id,
|
||||
&mut new_columns,
|
||||
index_option,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -277,21 +283,17 @@ impl MetricEngineInner {
|
||||
data_region_id: RegionId,
|
||||
logical_region_id: RegionId,
|
||||
new_columns: &mut [ColumnMetadata],
|
||||
index_options: IndexOptions,
|
||||
) -> Result<()> {
|
||||
// alter data region
|
||||
self.data_region
|
||||
.add_columns(data_region_id, new_columns)
|
||||
.add_columns(data_region_id, new_columns, index_options)
|
||||
.await?;
|
||||
|
||||
// safety: previous step has checked this
|
||||
self.state.write().unwrap().add_physical_columns(
|
||||
data_region_id,
|
||||
new_columns
|
||||
.iter()
|
||||
.map(|meta| meta.column_schema.name.clone()),
|
||||
);
|
||||
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
|
||||
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
|
||||
// Return early if no new columns are added.
|
||||
if new_columns.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// correct the column id
|
||||
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
|
||||
@@ -321,6 +323,16 @@ impl MetricEngineInner {
|
||||
}
|
||||
}
|
||||
|
||||
// safety: previous step has checked this
|
||||
self.state.write().unwrap().add_physical_columns(
|
||||
data_region_id,
|
||||
new_columns
|
||||
.iter()
|
||||
.map(|meta| (meta.column_schema.name.clone(), meta.column_id)),
|
||||
);
|
||||
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
|
||||
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -520,7 +532,8 @@ impl MetricEngineInner {
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
),
|
||||
)
|
||||
.with_inverted_index(true),
|
||||
};
|
||||
let tsid_col = ColumnMetadata {
|
||||
column_id: ReservedColumnId::tsid(),
|
||||
@@ -529,7 +542,8 @@ impl MetricEngineInner {
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
)
|
||||
.with_inverted_index(false),
|
||||
};
|
||||
[metric_name_col, tsid_col]
|
||||
}
|
||||
@@ -539,6 +553,7 @@ impl MetricEngineInner {
|
||||
pub(crate) fn region_options_for_metadata_region(
|
||||
mut original: HashMap<String, String>,
|
||||
) -> HashMap<String, String> {
|
||||
// TODO(ruihang, weny): add whitelist for metric engine options.
|
||||
original.remove(APPEND_MODE_KEY);
|
||||
original.insert(TTL_KEY.to_string(), FOREVER.to_string());
|
||||
original
|
||||
|
||||
@@ -36,14 +36,14 @@ impl MetricEngineInner {
|
||||
|
||||
// enclose the guard in a block to prevent the guard from polluting the async context
|
||||
let (is_physical_region, is_physical_region_busy) = {
|
||||
if let Some(logical_regions) = self
|
||||
if let Some(state) = self
|
||||
.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_regions()
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
{
|
||||
(true, !logical_regions.is_empty())
|
||||
(true, !state.logical_regions().is_empty())
|
||||
} else {
|
||||
// the second argument is not used, just pass in a dummy value
|
||||
(false, true)
|
||||
|
||||
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
|
||||
|
||||
use super::MetricEngineInner;
|
||||
use crate::engine::create::region_options_for_metadata_region;
|
||||
use crate::engine::options::set_data_region_options;
|
||||
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
|
||||
use crate::error::{OpenMitoRegionSnafu, Result};
|
||||
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
|
||||
use crate::utils;
|
||||
@@ -47,8 +47,10 @@ impl MetricEngineInner {
|
||||
) -> Result<AffectedRows> {
|
||||
if request.is_physical_table() {
|
||||
// open physical region and recover states
|
||||
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
|
||||
self.open_physical_region(region_id, request).await?;
|
||||
self.recover_states(region_id).await?;
|
||||
self.recover_states(region_id, physical_region_options)
|
||||
.await?;
|
||||
|
||||
Ok(0)
|
||||
} else {
|
||||
@@ -120,7 +122,11 @@ impl MetricEngineInner {
|
||||
/// Includes:
|
||||
/// - Record physical region's column names
|
||||
/// - Record the mapping between logical region id and physical region id
|
||||
pub(crate) async fn recover_states(&self, physical_region_id: RegionId) -> Result<()> {
|
||||
pub(crate) async fn recover_states(
|
||||
&self,
|
||||
physical_region_id: RegionId,
|
||||
physical_region_options: PhysicalRegionOptions,
|
||||
) -> Result<()> {
|
||||
// load logical regions and physical column names
|
||||
let logical_regions = self
|
||||
.metadata_region
|
||||
@@ -135,11 +141,15 @@ impl MetricEngineInner {
|
||||
{
|
||||
let mut state = self.state.write().unwrap();
|
||||
// recover physical column names
|
||||
let physical_column_names = physical_columns
|
||||
let physical_columns = physical_columns
|
||||
.into_iter()
|
||||
.map(|col| col.column_schema.name)
|
||||
.map(|col| (col.column_schema.name, col.column_id))
|
||||
.collect();
|
||||
state.add_physical_region(physical_region_id, physical_column_names);
|
||||
state.add_physical_region(
|
||||
physical_region_id,
|
||||
physical_columns,
|
||||
physical_region_options,
|
||||
);
|
||||
// recover logical regions
|
||||
for logical_region_id in &logical_regions {
|
||||
state.add_logical_region(physical_region_id, *logical_region_id);
|
||||
|
||||
@@ -16,13 +16,12 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use itertools::Itertools as _;
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::metric_engine_consts::{
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT, METRIC_ENGINE_INDEX_TYPE_OPTION,
|
||||
};
|
||||
|
||||
/// Ignore building index on the column `tsid` which is unfriendly to the inverted index and
|
||||
/// will occupy excessive space if indexed.
|
||||
const IGNORE_COLUMN_IDS_FOR_DATA_REGION: [ColumnId; 1] = [ReservedColumnId::tsid()];
|
||||
use crate::error::{Error, ParseRegionOptionsSnafu, Result};
|
||||
|
||||
/// The empirical value for the seg row count of the metric data region.
|
||||
/// Compared to the mito engine, the pattern of the metric engine constructs smaller indices.
|
||||
@@ -30,13 +29,26 @@ const IGNORE_COLUMN_IDS_FOR_DATA_REGION: [ColumnId; 1] = [ReservedColumnId::tsid
|
||||
/// value and appropriately increasing the size of the index, it results in an improved indexing effect.
|
||||
const SEG_ROW_COUNT_FOR_DATA_REGION: u32 = 256;
|
||||
|
||||
/// Physical region options.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct PhysicalRegionOptions {
|
||||
pub index: IndexOptions,
|
||||
}
|
||||
|
||||
/// Index options for auto created columns
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub enum IndexOptions {
|
||||
#[default]
|
||||
Inverted,
|
||||
Skipping {
|
||||
granularity: u32,
|
||||
},
|
||||
}
|
||||
|
||||
/// Sets data region specific options.
|
||||
pub fn set_data_region_options(options: &mut HashMap<String, String>) {
|
||||
// Set the index options for the data region.
|
||||
options.insert(
|
||||
"index.inverted_index.ignore_column_ids".to_string(),
|
||||
IGNORE_COLUMN_IDS_FOR_DATA_REGION.iter().join(","),
|
||||
);
|
||||
options.remove(METRIC_ENGINE_INDEX_TYPE_OPTION);
|
||||
options.remove(METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION);
|
||||
options.insert(
|
||||
"index.inverted_index.segment_row_count".to_string(),
|
||||
SEG_ROW_COUNT_FOR_DATA_REGION.to_string(),
|
||||
@@ -44,3 +56,97 @@ pub fn set_data_region_options(options: &mut HashMap<String, String>) {
|
||||
// Set memtable options for the data region.
|
||||
options.insert("memtable.type".to_string(), "partition_tree".to_string());
|
||||
}
|
||||
|
||||
impl TryFrom<&HashMap<String, String>> for PhysicalRegionOptions {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &HashMap<String, String>) -> Result<Self> {
|
||||
let index = match value
|
||||
.get(METRIC_ENGINE_INDEX_TYPE_OPTION)
|
||||
.map(|s| s.to_lowercase())
|
||||
{
|
||||
Some(ref index_type) if index_type == "inverted" => Ok(IndexOptions::Inverted),
|
||||
Some(ref index_type) if index_type == "skipping" => {
|
||||
let granularity = value
|
||||
.get(METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION)
|
||||
.map_or(
|
||||
Ok(METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT),
|
||||
|g| {
|
||||
g.parse().map_err(|_| {
|
||||
ParseRegionOptionsSnafu {
|
||||
reason: format!("Invalid granularity: {}", g),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
},
|
||||
)?;
|
||||
Ok(IndexOptions::Skipping { granularity })
|
||||
}
|
||||
Some(index_type) => ParseRegionOptionsSnafu {
|
||||
reason: format!("Invalid index type: {}", index_type),
|
||||
}
|
||||
.fail(),
|
||||
None => Ok(IndexOptions::default()),
|
||||
}?;
|
||||
|
||||
Ok(PhysicalRegionOptions { index })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_set_data_region_options_should_remove_metric_engine_options() {
|
||||
let mut options = HashMap::new();
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_TYPE_OPTION.to_string(),
|
||||
"inverted".to_string(),
|
||||
);
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION.to_string(),
|
||||
"102400".to_string(),
|
||||
);
|
||||
set_data_region_options(&mut options);
|
||||
|
||||
for key in [
|
||||
METRIC_ENGINE_INDEX_TYPE_OPTION,
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
|
||||
] {
|
||||
assert_eq!(options.get(key), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_physical_region_options_from_hashmap() {
|
||||
let mut options = HashMap::new();
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_TYPE_OPTION.to_string(),
|
||||
"inverted".to_string(),
|
||||
);
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION.to_string(),
|
||||
"102400".to_string(),
|
||||
);
|
||||
let physical_region_options = PhysicalRegionOptions::try_from(&options).unwrap();
|
||||
assert_eq!(physical_region_options.index, IndexOptions::Inverted);
|
||||
|
||||
let mut options = HashMap::new();
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_TYPE_OPTION.to_string(),
|
||||
"skipping".to_string(),
|
||||
);
|
||||
options.insert(
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION.to_string(),
|
||||
"102400".to_string(),
|
||||
);
|
||||
let physical_region_options = PhysicalRegionOptions::try_from(&options).unwrap();
|
||||
assert_eq!(
|
||||
physical_region_options.index,
|
||||
IndexOptions::Skipping {
|
||||
granularity: 102400
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,12 +42,8 @@ impl MetricEngineInner {
|
||||
region_id: RegionId,
|
||||
request: RegionPutRequest,
|
||||
) -> Result<AffectedRows> {
|
||||
let is_putting_physical_region = self
|
||||
.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_regions()
|
||||
.contains_key(®ion_id);
|
||||
let is_putting_physical_region =
|
||||
self.state.read().unwrap().exist_physical_region(region_id);
|
||||
|
||||
if is_putting_physical_region {
|
||||
info!(
|
||||
@@ -114,16 +110,16 @@ impl MetricEngineInner {
|
||||
}
|
||||
|
||||
// Check if a physical column exists
|
||||
let physical_columns =
|
||||
state
|
||||
.physical_columns()
|
||||
.get(&data_region_id)
|
||||
.context(PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?;
|
||||
let physical_columns = state
|
||||
.physical_region_states()
|
||||
.get(&data_region_id)
|
||||
.context(PhysicalRegionNotFoundSnafu {
|
||||
region_id: data_region_id,
|
||||
})?
|
||||
.physical_columns();
|
||||
for col in &request.rows.schema {
|
||||
ensure!(
|
||||
physical_columns.contains(&col.column_name),
|
||||
physical_columns.contains_key(&col.column_name),
|
||||
ColumnNotFoundSnafu {
|
||||
name: col.column_name.clone(),
|
||||
region_id: logical_region_id,
|
||||
|
||||
@@ -86,12 +86,8 @@ impl MetricEngineInner {
|
||||
}
|
||||
|
||||
pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
|
||||
let is_reading_physical_region = self
|
||||
.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_regions()
|
||||
.contains_key(®ion_id);
|
||||
let is_reading_physical_region =
|
||||
self.state.read().unwrap().exist_physical_region(region_id);
|
||||
|
||||
if is_reading_physical_region {
|
||||
self.mito
|
||||
@@ -107,11 +103,7 @@ impl MetricEngineInner {
|
||||
|
||||
/// Returns true if it's a physical region.
|
||||
pub fn is_physical_region(&self, region_id: RegionId) -> bool {
|
||||
self.state
|
||||
.read()
|
||||
.unwrap()
|
||||
.physical_regions()
|
||||
.contains_key(®ion_id)
|
||||
self.state.read().unwrap().exist_physical_region(region_id)
|
||||
}
|
||||
|
||||
async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
|
||||
|
||||
@@ -18,24 +18,60 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use snafu::OptionExt;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::engine::options::PhysicalRegionOptions;
|
||||
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
|
||||
use crate::metrics::LOGICAL_REGION_COUNT;
|
||||
use crate::utils::to_data_region_id;
|
||||
|
||||
pub struct PhysicalRegionState {
|
||||
logical_regions: HashSet<RegionId>,
|
||||
physical_columns: HashMap<String, ColumnId>,
|
||||
options: PhysicalRegionOptions,
|
||||
}
|
||||
|
||||
impl PhysicalRegionState {
|
||||
pub fn new(
|
||||
physical_columns: HashMap<String, ColumnId>,
|
||||
options: PhysicalRegionOptions,
|
||||
) -> Self {
|
||||
Self {
|
||||
logical_regions: HashSet::new(),
|
||||
physical_columns,
|
||||
options,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the logical region ids.
|
||||
pub fn logical_regions(&self) -> &HashSet<RegionId> {
|
||||
&self.logical_regions
|
||||
}
|
||||
|
||||
/// Returns a reference to the physical columns.
|
||||
pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
|
||||
&self.physical_columns
|
||||
}
|
||||
|
||||
/// Returns a reference to the physical region options.
|
||||
pub fn options(&self) -> &PhysicalRegionOptions {
|
||||
&self.options
|
||||
}
|
||||
|
||||
/// Removes a logical region id from the physical region state.
|
||||
/// Returns true if the logical region id was present.
|
||||
pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> bool {
|
||||
self.logical_regions.remove(&logical_region_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal states of metric engine
|
||||
#[derive(Default)]
|
||||
pub(crate) struct MetricEngineState {
|
||||
/// Mapping from physical region id to its logical region ids
|
||||
/// `logical_regions` records a reverse mapping from logical region id to
|
||||
/// physical region id
|
||||
physical_regions: HashMap<RegionId, HashSet<RegionId>>,
|
||||
/// Physical regions states.
|
||||
physical_regions: HashMap<RegionId, PhysicalRegionState>,
|
||||
/// Mapping from logical region id to physical region id.
|
||||
logical_regions: HashMap<RegionId, RegionId>,
|
||||
/// Cache for the columns of physical regions.
|
||||
/// The region id in key is the data region id.
|
||||
physical_columns: HashMap<RegionId, HashSet<String>>,
|
||||
/// Cache for the column metadata of logical regions.
|
||||
/// The column order is the same with the order in the metadata, which is
|
||||
/// alphabetically ordered on column name.
|
||||
@@ -46,13 +82,14 @@ impl MetricEngineState {
|
||||
pub fn add_physical_region(
|
||||
&mut self,
|
||||
physical_region_id: RegionId,
|
||||
physical_columns: HashSet<String>,
|
||||
physical_columns: HashMap<String, ColumnId>,
|
||||
options: PhysicalRegionOptions,
|
||||
) {
|
||||
let physical_region_id = to_data_region_id(physical_region_id);
|
||||
self.physical_regions
|
||||
.insert(physical_region_id, HashSet::new());
|
||||
self.physical_columns
|
||||
.insert(physical_region_id, physical_columns);
|
||||
self.physical_regions.insert(
|
||||
physical_region_id,
|
||||
PhysicalRegionState::new(physical_columns, options),
|
||||
);
|
||||
}
|
||||
|
||||
/// # Panic
|
||||
@@ -60,12 +97,12 @@ impl MetricEngineState {
|
||||
pub fn add_physical_columns(
|
||||
&mut self,
|
||||
physical_region_id: RegionId,
|
||||
physical_columns: impl IntoIterator<Item = String>,
|
||||
physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
|
||||
) {
|
||||
let physical_region_id = to_data_region_id(physical_region_id);
|
||||
let columns = self.physical_columns.get_mut(&physical_region_id).unwrap();
|
||||
for col in physical_columns {
|
||||
columns.insert(col);
|
||||
let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
|
||||
for (col, id) in physical_columns {
|
||||
state.physical_columns.insert(col, id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,6 +117,7 @@ impl MetricEngineState {
|
||||
self.physical_regions
|
||||
.get_mut(&physical_region_id)
|
||||
.unwrap()
|
||||
.logical_regions
|
||||
.insert(logical_region_id);
|
||||
self.logical_regions
|
||||
.insert(logical_region_id, physical_region_id);
|
||||
@@ -98,18 +136,18 @@ impl MetricEngineState {
|
||||
self.logical_regions.get(&logical_region_id).copied()
|
||||
}
|
||||
|
||||
pub fn physical_columns(&self) -> &HashMap<RegionId, HashSet<String>> {
|
||||
&self.physical_columns
|
||||
}
|
||||
|
||||
pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
|
||||
&self.logical_columns
|
||||
}
|
||||
|
||||
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
|
||||
pub fn physical_region_states(&self) -> &HashMap<RegionId, PhysicalRegionState> {
|
||||
&self.physical_regions
|
||||
}
|
||||
|
||||
pub fn exist_physical_region(&self, physical_region_id: RegionId) -> bool {
|
||||
self.physical_regions.contains_key(&physical_region_id)
|
||||
}
|
||||
|
||||
pub fn logical_regions(&self) -> &HashMap<RegionId, RegionId> {
|
||||
&self.logical_regions
|
||||
}
|
||||
@@ -118,11 +156,13 @@ impl MetricEngineState {
|
||||
pub fn remove_physical_region(&mut self, physical_region_id: RegionId) -> Result<()> {
|
||||
let physical_region_id = to_data_region_id(physical_region_id);
|
||||
|
||||
let logical_regions = self.physical_regions.get(&physical_region_id).context(
|
||||
PhysicalRegionNotFoundSnafu {
|
||||
let logical_regions = &self
|
||||
.physical_regions
|
||||
.get(&physical_region_id)
|
||||
.context(PhysicalRegionNotFoundSnafu {
|
||||
region_id: physical_region_id,
|
||||
},
|
||||
)?;
|
||||
})?
|
||||
.logical_regions;
|
||||
|
||||
LOGICAL_REGION_COUNT.sub(logical_regions.len() as i64);
|
||||
|
||||
@@ -130,7 +170,6 @@ impl MetricEngineState {
|
||||
self.logical_regions.remove(logical_region);
|
||||
}
|
||||
self.physical_regions.remove(&physical_region_id);
|
||||
self.physical_columns.remove(&physical_region_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -145,7 +184,7 @@ impl MetricEngineState {
|
||||
self.physical_regions
|
||||
.get_mut(&physical_region_id)
|
||||
.unwrap() // Safety: physical_region_id is got from physical_regions
|
||||
.remove(&logical_region_id);
|
||||
.remove_logical_region(logical_region_id);
|
||||
|
||||
self.logical_columns.remove(&logical_region_id);
|
||||
|
||||
|
||||
@@ -91,6 +91,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse region options: {}", reason))]
|
||||
ParseRegionOptions {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Mito read operation fails"))]
|
||||
MitoReadOperation {
|
||||
source: BoxedError,
|
||||
@@ -225,6 +232,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to set SKIPPING index option"))]
|
||||
SetSkippingIndexOption {
|
||||
source: datatypes::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -241,7 +255,8 @@ impl ErrorExt for Error {
|
||||
| PhysicalRegionBusy { .. }
|
||||
| MultipleFieldColumn { .. }
|
||||
| NoFieldColumn { .. }
|
||||
| AddingFieldColumn { .. } => StatusCode::InvalidArguments,
|
||||
| AddingFieldColumn { .. }
|
||||
| ParseRegionOptions { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
|
||||
StatusCode::Unsupported
|
||||
@@ -251,7 +266,8 @@ impl ErrorExt for Error {
|
||||
| SerializeColumnMetadata { .. }
|
||||
| DecodeColumnValue { .. }
|
||||
| ParseRegionId { .. }
|
||||
| InvalidMetadata { .. } => StatusCode::Unexpected,
|
||||
| InvalidMetadata { .. }
|
||||
| SetSkippingIndexOption { .. } => StatusCode::Unexpected,
|
||||
|
||||
PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
|
||||
StatusCode::RegionNotFound
|
||||
|
||||
@@ -148,6 +148,43 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Colu
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the column schemas for `SHOW CREATE TABLE` statement.
|
||||
///
|
||||
/// For metric engine, it will only return the column schemas that are not internal columns.
|
||||
fn column_schemas_for_show_create<'a>(
|
||||
schema: &'a SchemaRef,
|
||||
engine: &str,
|
||||
) -> Vec<&'a ColumnSchema> {
|
||||
let is_metric_engine = is_metric_engine(engine);
|
||||
if is_metric_engine {
|
||||
schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.filter(|c| !is_metric_engine_internal_column(&c.name))
|
||||
.collect()
|
||||
} else {
|
||||
schema.column_schemas().iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the primary key columns for `SHOW CREATE TABLE` statement.
|
||||
///
|
||||
/// For metric engine, it will only return the primary key columns that are not internal columns.
|
||||
fn primary_key_columns_for_show_create<'a>(
|
||||
table_meta: &'a TableMeta,
|
||||
engine: &str,
|
||||
) -> Vec<&'a String> {
|
||||
let is_metric_engine = is_metric_engine(engine);
|
||||
if is_metric_engine {
|
||||
table_meta
|
||||
.row_key_column_names()
|
||||
.filter(|name| !is_metric_engine_internal_column(name))
|
||||
.collect()
|
||||
} else {
|
||||
table_meta.row_key_column_names().collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn create_table_constraints(
|
||||
engine: &str,
|
||||
schema: &SchemaRef,
|
||||
@@ -162,31 +199,22 @@ fn create_table_constraints(
|
||||
});
|
||||
}
|
||||
if !table_meta.primary_key_indices.is_empty() {
|
||||
let is_metric_engine = is_metric_engine(engine);
|
||||
let columns = table_meta
|
||||
.row_key_column_names()
|
||||
.flat_map(|name| {
|
||||
if is_metric_engine && is_metric_engine_internal_column(name) {
|
||||
None
|
||||
} else {
|
||||
Some(Ident::with_quote(quote_style, name))
|
||||
}
|
||||
})
|
||||
let columns = primary_key_columns_for_show_create(table_meta, engine)
|
||||
.into_iter()
|
||||
.map(|name| Ident::with_quote(quote_style, name))
|
||||
.collect();
|
||||
constraints.push(TableConstraint::PrimaryKey { columns });
|
||||
}
|
||||
|
||||
let inverted_index_set = schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.any(|c| c.has_inverted_index_key());
|
||||
let column_schemas = column_schemas_for_show_create(schema, engine);
|
||||
let inverted_index_set = column_schemas.iter().any(|c| c.has_inverted_index_key());
|
||||
if inverted_index_set {
|
||||
let inverted_index_cols = schema
|
||||
.column_schemas()
|
||||
let inverted_index_cols = column_schemas
|
||||
.iter()
|
||||
.filter(|c| c.is_inverted_indexed())
|
||||
.map(|c| Ident::with_quote(quote_style, &c.name))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
constraints.push(TableConstraint::InvertedIndex {
|
||||
columns: inverted_index_cols,
|
||||
});
|
||||
@@ -248,8 +276,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_show_create_table_sql() {
|
||||
let mut host_schema = ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);
|
||||
host_schema.with_inverted_index(true);
|
||||
let schema = vec![
|
||||
ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), true)
|
||||
.with_skipping_options(SkippingIndexOptions {
|
||||
@@ -257,7 +283,8 @@ mod tests {
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap(),
|
||||
host_schema,
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true)
|
||||
.with_inverted_index(true),
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
|
||||
ColumnSchema::new("disk", ConcreteDataType::float32_datatype(), true),
|
||||
ColumnSchema::new("msg", ConcreteDataType::string_datatype(), true)
|
||||
|
||||
@@ -497,7 +497,7 @@ pub fn column_to_schema(
|
||||
column_schema.insert_inverted_index_placeholder();
|
||||
}
|
||||
} else if inverted_index_cols.contains(&column.name().value) {
|
||||
column_schema.with_inverted_index(true);
|
||||
column_schema.set_inverted_index(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -628,12 +628,36 @@ impl RegionMetadataBuilder {
|
||||
.map(|col| col.column_schema.name.clone())
|
||||
.collect();
|
||||
|
||||
let pk_as_inverted_index = !self
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.any(|c| c.column_schema.has_inverted_index_key());
|
||||
let mut set_inverted_index_for_primary_keys = false;
|
||||
|
||||
for add_column in columns {
|
||||
if names.contains(&add_column.column_metadata.column_schema.name) {
|
||||
// Column already exists.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handles using primary key as inverted index.
|
||||
let has_inverted_index_key = add_column
|
||||
.column_metadata
|
||||
.column_schema
|
||||
.has_inverted_index_key();
|
||||
|
||||
if pk_as_inverted_index
|
||||
&& has_inverted_index_key
|
||||
&& !set_inverted_index_for_primary_keys
|
||||
{
|
||||
self.column_metadatas.iter_mut().for_each(|col| {
|
||||
if col.semantic_type == SemanticType::Tag {
|
||||
col.column_schema.set_inverted_index(true);
|
||||
}
|
||||
});
|
||||
set_inverted_index_for_primary_keys = true;
|
||||
}
|
||||
|
||||
let column_id = add_column.column_metadata.column_id;
|
||||
let semantic_type = add_column.column_metadata.semantic_type;
|
||||
let column_name = add_column.column_metadata.column_schema.name.clone();
|
||||
@@ -704,7 +728,7 @@ impl RegionMetadataBuilder {
|
||||
) -> Result<()> {
|
||||
for column_meta in self.column_metadatas.iter_mut() {
|
||||
if column_meta.column_schema.name == column_name {
|
||||
column_meta.column_schema.with_inverted_index(value)
|
||||
column_meta.column_schema.set_inverted_index(value)
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1495,6 +1519,43 @@ mod test {
|
||||
check_columns(&metadata, &["a", "b", "c", "d"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_column_with_inverted_index() {
|
||||
// a (tag), b (field), c (ts)
|
||||
let metadata = build_test_region_metadata();
|
||||
let mut builder = RegionMetadataBuilder::from_existing(metadata);
|
||||
// tag d, e
|
||||
let mut col = new_column_metadata("d", true, 4);
|
||||
col.column_schema.set_inverted_index(true);
|
||||
builder
|
||||
.alter(AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumn {
|
||||
column_metadata: col,
|
||||
location: None,
|
||||
},
|
||||
AddColumn {
|
||||
column_metadata: new_column_metadata("e", true, 5),
|
||||
location: None,
|
||||
},
|
||||
],
|
||||
})
|
||||
.unwrap();
|
||||
let metadata = builder.build().unwrap();
|
||||
check_columns(&metadata, &["a", "b", "c", "d", "e"]);
|
||||
assert_eq!([1, 4, 5], &metadata.primary_key[..]);
|
||||
let column_metadata = metadata.column_by_name("a").unwrap();
|
||||
assert!(column_metadata.column_schema.is_inverted_indexed());
|
||||
let column_metadata = metadata.column_by_name("b").unwrap();
|
||||
assert!(!column_metadata.column_schema.is_inverted_indexed());
|
||||
let column_metadata = metadata.column_by_name("c").unwrap();
|
||||
assert!(!column_metadata.column_schema.is_inverted_indexed());
|
||||
let column_metadata = metadata.column_by_name("d").unwrap();
|
||||
assert!(column_metadata.column_schema.is_inverted_indexed());
|
||||
let column_metadata = metadata.column_by_name("e").unwrap();
|
||||
assert!(!column_metadata.column_schema.is_inverted_indexed());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_drop_if_exists() {
|
||||
// a (tag), b (field), c (ts)
|
||||
|
||||
@@ -82,3 +82,47 @@ pub fn is_metric_engine_internal_column(name: &str) -> bool {
|
||||
pub fn is_metric_engine(name: &str) -> bool {
|
||||
name == METRIC_ENGINE_NAME
|
||||
}
|
||||
|
||||
/// Option key for metric engine index type.
|
||||
/// Used to identify the primary key index type of the metric engine.
|
||||
/// ```sql
|
||||
/// CREATE TABLE table_name (
|
||||
/// ...
|
||||
/// )
|
||||
/// ENGINE = metric
|
||||
/// WITH (
|
||||
/// physical_metric_table = "",
|
||||
/// index.type = "inverted",
|
||||
/// index.inverted_index.segment_row_count = "256",
|
||||
/// );
|
||||
/// ```
|
||||
pub const METRIC_ENGINE_INDEX_TYPE_OPTION: &str = "index.type";
|
||||
|
||||
/// Option key for the granularity of the skipping index in the metric engine.
|
||||
/// This key is used to specify the granularity of the primary key index (skipping index) in the metric engine.
|
||||
/// ```sql
|
||||
/// CREATE TABLE table_name (
|
||||
/// ...
|
||||
/// )
|
||||
/// ENGINE = metric
|
||||
/// WITH (
|
||||
/// physical_metric_table = "",
|
||||
/// index.type = "skipping",
|
||||
/// index.granularity = "102400",
|
||||
/// );
|
||||
/// ```
|
||||
pub const METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION: &str = "index.granularity";
|
||||
|
||||
/// Default granularity for the skipping index in the metric engine.
|
||||
pub const METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT: u32 = 102400;
|
||||
|
||||
/// Returns true if the `key` is a valid option key for the metric engine.
|
||||
pub fn is_metric_engine_option_key(key: &str) -> bool {
|
||||
[
|
||||
PHYSICAL_TABLE_METADATA_KEY,
|
||||
LOGICAL_TABLE_METADATA_KEY,
|
||||
METRIC_ENGINE_INDEX_TYPE_OPTION,
|
||||
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
|
||||
]
|
||||
.contains(&key)
|
||||
}
|
||||
|
||||
@@ -304,13 +304,13 @@ impl TableMeta {
|
||||
columns.push(new_column_schema);
|
||||
} else {
|
||||
let mut new_column_schema = column_schema.clone();
|
||||
new_column_schema.with_inverted_index(value);
|
||||
new_column_schema.set_inverted_index(value);
|
||||
columns.push(new_column_schema);
|
||||
}
|
||||
} else if pk_as_inverted_index && self.primary_key_indices.contains(&i) {
|
||||
// Need to set inverted_indexed=true for all other columns in primary key.
|
||||
let mut new_column_schema = column_schema.clone();
|
||||
new_column_schema.with_inverted_index(true);
|
||||
new_column_schema.set_inverted_index(true);
|
||||
columns.push(new_column_schema);
|
||||
} else {
|
||||
columns.push(column_schema.clone());
|
||||
|
||||
@@ -28,7 +28,9 @@ use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{ColumnSchema, FulltextOptions};
|
||||
use greptime_proto::v1::region::compact_request;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY};
|
||||
use store_api::metric_engine_consts::{
|
||||
is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY,
|
||||
};
|
||||
use store_api::mito_engine_options::is_mito_engine_option_key;
|
||||
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
|
||||
|
||||
@@ -51,6 +53,10 @@ pub fn validate_table_option(key: &str) -> bool {
|
||||
return true;
|
||||
}
|
||||
|
||||
if is_metric_engine_option_key(key) {
|
||||
return true;
|
||||
}
|
||||
|
||||
[
|
||||
// common keys:
|
||||
WRITE_BUFFER_SIZE_KEY,
|
||||
|
||||
@@ -71,6 +71,27 @@ DESC TABLE phy;
|
||||
| job | String | PRI | YES | | TAG |
|
||||
+------------+----------------------+-----+------+---------+---------------+
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "host" STRING NULL, |
|
||||
| | "job" STRING NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host", "job"), |
|
||||
| | INVERTED INDEX ("host", "job") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+------------------------------------+
|
||||
|
||||
DESC TABLE t1;
|
||||
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
@@ -219,3 +240,74 @@ DROP TABLE phy;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "index.type" = "skipping", "index.granularity" = "8192");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | 'index.granularity' = '8192', |
|
||||
| | 'index.type' = 'skipping', |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+------------------------------------+
|
||||
|
||||
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "host" STRING NULL SKIPPING INDEX WITH(granularity = '8192', type = 'BLOOM'), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | 'index.granularity' = '8192', |
|
||||
| | 'index.type' = 'skipping', |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
|
||||
SHOW INDEX FROM phy;
|
||||
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
| phy | 1 | PRIMARY, INVERTED INDEX | 3 | __table_id | A | | | | YES | greptime-primary-key-v1, greptime-inverted-index-v1 | | | YES | |
|
||||
| phy | 1 | PRIMARY | 4 | __tsid | A | | | | YES | greptime-primary-key-v1 | | | YES | |
|
||||
| phy | 1 | PRIMARY, SKIPPING INDEX | 5 | host | A | | | | YES | greptime-primary-key-v1, greptime-bloom-filter-v1 | | | YES | |
|
||||
| phy | 1 | TIME INDEX | 1 | ts | A | | | | NO | | | | YES | |
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
|
||||
DROP TABLE t1;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE phy;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "index.type" = "hihi", "index.granularity" = "8192");
|
||||
|
||||
Error: 1004(InvalidArguments), Failed to parse region options: Invalid index type: hihi
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ SELECT table_catalog, table_schema, table_name, table_type, engine FROM informat
|
||||
|
||||
DESC TABLE phy;
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
DESC TABLE t1;
|
||||
|
||||
DESC TABLE t2;
|
||||
@@ -86,3 +88,19 @@ DROP TABLE t1;
|
||||
DESC TABLE t1;
|
||||
|
||||
DROP TABLE phy;
|
||||
|
||||
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "index.type" = "skipping", "index.granularity" = "8192");
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy");
|
||||
|
||||
SHOW CREATE TABLE phy;
|
||||
|
||||
SHOW INDEX FROM phy;
|
||||
|
||||
DROP TABLE t1;
|
||||
|
||||
DROP TABLE phy;
|
||||
|
||||
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "index.type" = "hihi", "index.granularity" = "8192");
|
||||
|
||||
@@ -129,7 +129,8 @@ show create table phy;
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "host" STRING NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | PRIMARY KEY ("host"), |
|
||||
| | INVERTED INDEX ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
@@ -222,6 +223,73 @@ drop table phy;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "phy" (
|
||||
"ts" TIMESTAMP(3) NOT NULL,
|
||||
"val" DOUBLE NULL,
|
||||
"host" STRING NULL SKIPPING INDEX WITH(granularity = '8192', type = 'BLOOM'),
|
||||
TIME INDEX ("ts"),
|
||||
PRIMARY KEY ("host"),
|
||||
)
|
||||
ENGINE=metric
|
||||
WITH(
|
||||
'index.granularity' = '8192',
|
||||
'index.type' = 'skipping',
|
||||
physical_metric_table = ''
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
show create table phy;
|
||||
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" DOUBLE NULL, |
|
||||
| | "host" STRING NULL SKIPPING INDEX WITH(granularity = '8192', type = 'BLOOM'), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=metric |
|
||||
| | WITH( |
|
||||
| | 'index.granularity' = '8192', |
|
||||
| | 'index.type' = 'skipping', |
|
||||
| | physical_metric_table = '' |
|
||||
| | ) |
|
||||
+-------+---------------------------------------------------------------------------------+
|
||||
|
||||
CREATE TABLE t1 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
val DOUBLE,
|
||||
job STRING PRIMARY KEY
|
||||
) ENGINE=metric WITH (
|
||||
"on_physical_table" = "phy"
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
show index from phy;
|
||||
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
| phy | 1 | PRIMARY, INVERTED INDEX | 4 | __table_id | A | | | | YES | greptime-primary-key-v1, greptime-inverted-index-v1 | | | YES | |
|
||||
| phy | 1 | PRIMARY | 5 | __tsid | A | | | | YES | greptime-primary-key-v1 | | | YES | |
|
||||
| phy | 1 | PRIMARY, SKIPPING INDEX | 3 | host | A | | | | YES | greptime-primary-key-v1, greptime-bloom-filter-v1 | | | YES | |
|
||||
| phy | 1 | PRIMARY, SKIPPING INDEX | 6 | job | A | | | | YES | greptime-primary-key-v1, greptime-bloom-filter-v1 | | | YES | |
|
||||
| phy | 1 | TIME INDEX | 1 | ts | A | | | | NO | | | | YES | |
|
||||
+-------+------------+-------------------------+--------------+-------------+-----------+-------------+----------+--------+------+-----------------------------------------------------+---------+---------------+---------+------------+
|
||||
|
||||
drop table t1;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
drop table phy;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
show create table numbers;
|
||||
|
||||
Error: 1001(Unsupported), Show create table only for base table. greptime.public.numbers is TEMPORARY
|
||||
|
||||
@@ -84,6 +84,36 @@ show create table phy;
|
||||
|
||||
drop table phy;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "phy" (
|
||||
"ts" TIMESTAMP(3) NOT NULL,
|
||||
"val" DOUBLE NULL,
|
||||
"host" STRING NULL SKIPPING INDEX WITH(granularity = '8192', type = 'BLOOM'),
|
||||
TIME INDEX ("ts"),
|
||||
PRIMARY KEY ("host"),
|
||||
)
|
||||
ENGINE=metric
|
||||
WITH(
|
||||
'index.granularity' = '8192',
|
||||
'index.type' = 'skipping',
|
||||
physical_metric_table = ''
|
||||
);
|
||||
|
||||
show create table phy;
|
||||
|
||||
CREATE TABLE t1 (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
val DOUBLE,
|
||||
job STRING PRIMARY KEY
|
||||
) ENGINE=metric WITH (
|
||||
"on_physical_table" = "phy"
|
||||
);
|
||||
|
||||
show index from phy;
|
||||
|
||||
drop table t1;
|
||||
|
||||
drop table phy;
|
||||
|
||||
show create table numbers;
|
||||
|
||||
show create table information_schema.columns;
|
||||
|
||||
Reference in New Issue
Block a user