From 09b3c7029b7d7ac8abd1cebb613c9a00ea7c66bb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 11 Jan 2024 17:38:43 +0800 Subject: [PATCH] feat: handle drop request for metric table (#3136) * handle drop request Signed-off-by: Ruihang Xia * adjust procedure manager Signed-off-by: Ruihang Xia * add create table sqlness test Signed-off-by: Ruihang Xia * insert/query metric table Signed-off-by: Ruihang Xia * address CR comments Signed-off-by: Ruihang Xia * Update src/common/meta/src/kv_backend.rs Co-authored-by: JeremyHi * fix clippy Signed-off-by: Ruihang Xia * reuse region option for metadata region Signed-off-by: Ruihang Xia * tweak variable name Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: JeremyHi --- src/common/meta/src/ddl_manager.rs | 12 +- src/common/meta/src/key.rs | 1 + src/common/meta/src/kv_backend.rs | 1 + src/metric-engine/src/engine.rs | 7 +- src/metric-engine/src/engine/create.rs | 12 +- src/metric-engine/src/engine/drop.rs | 138 ++++++++++++++++++ src/metric-engine/src/engine/state.rs | 16 ++ src/metric-engine/src/error.rs | 12 +- src/metric-engine/src/metadata_region.rs | 77 +++++++++- .../common/create/create_metric_table.result | 68 +++++++++ .../common/create/create_metric_table.sql | 24 +++ .../common/insert/logical_metric_table.result | 62 ++++++++ .../common/insert/logical_metric_table.sql | 22 +++ 13 files changed, 440 insertions(+), 12 deletions(-) create mode 100644 src/metric-engine/src/engine/drop.rs create mode 100644 tests/cases/standalone/common/create/create_metric_table.result create mode 100644 tests/cases/standalone/common/create/create_metric_table.sql create mode 100644 tests/cases/standalone/common/insert/logical_metric_table.result create mode 100644 tests/cases/standalone/common/insert/logical_metric_table.sql diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d8b19cebdf..f06f3583c2 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -389,15 +389,21 @@ async fn handle_drop_table_task( let table_metadata_manager = &ddl_manager.table_metadata_manager(); let table_ref = drop_table_task.table_ref(); - let (table_info_value, table_route_value) = - table_metadata_manager.get_full_table_info(table_id).await?; + let table_info_value = table_metadata_manager + .table_info_manager() + .get(table_id) + .await?; + let (_, table_route_value) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await?; let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { table_name: table_ref.to_string(), })?; let table_route_value = - table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?; + DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value)); let id = ddl_manager .submit_drop_table_task( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 0c62d89e4d..2e9a47a1e4 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -354,6 +354,7 @@ impl TableMetadataManager { &self.kv_backend } + // TODO(ruihang): deprecate this pub async fn get_full_table_info( &self, table_id: TableId, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 5c31ff2529..e16dc2c23a 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -114,6 +114,7 @@ where Ok(!resp.kvs.is_empty()) } + /// Returns previous key-value pair if `prev_kv` is `true`. async fn delete(&self, key: &[u8], prev_kv: bool) -> Result, Self::Error> { let mut req = DeleteRangeRequest::new().with_key(key.to_vec()); if prev_kv { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 40040dc49a..bd81f17d66 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -15,6 +15,7 @@ mod alter; mod close; mod create; +mod drop; mod open; mod put; mod read; @@ -83,13 +84,15 @@ use crate::utils; /// | Operations | Logical Region | Physical Region | /// | ---------- | -------------- | --------------- | /// | Create | ✅ | ✅ | -/// | Drop | ✅ | ❌ | +/// | Drop | ✅ | ❓* | /// | Write | ✅ | ❌ | /// | Read | ✅ | ✅ | /// | Close | ✅ | ✅ | /// | Open | ✅ | ✅ | /// | Alter | ✅ | ❌ | /// +/// *: Physical region can be dropped only when all related logical regions are dropped. +/// /// ## Internal Columns /// /// The physical data region contains two internal columns. Should @@ -123,7 +126,7 @@ impl RegionEngine for MetricEngine { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), RegionRequest::Create(create) => self.inner.create_region(region_id, create).await, - RegionRequest::Drop(_) => todo!(), + RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await, RegionRequest::Open(open) => self.inner.open_region(region_id, open).await, RegionRequest::Close(close) => self.inner.close_region(region_id, close).await, RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await, diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index bfd87b92e7..bc04eeb82a 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -73,8 +73,7 @@ impl MetricEngineInner { let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id); // create metadata region - let create_metadata_region_request = - self.create_request_for_metadata_region(&request.region_dir); + let create_metadata_region_request = self.create_request_for_metadata_region(&request); self.mito .handle_request( metadata_region_id, @@ -287,7 +286,10 @@ impl MetricEngineInner { /// Build [RegionCreateRequest] for metadata region /// /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`. - pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest { + pub fn create_request_for_metadata_region( + &self, + request: &RegionCreateRequest, + ) -> RegionCreateRequest { // ts TIME INDEX DEFAULT 0 let timestamp_column_metadata = ColumnMetadata { column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _, @@ -324,7 +326,7 @@ impl MetricEngineInner { }; // concat region dir - let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR); + let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR); RegionCreateRequest { engine: MITO_ENGINE_NAME.to_string(), @@ -334,7 +336,7 @@ impl MetricEngineInner { value_column_metadata, ], primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _], - options: HashMap::new(), + options: request.options.clone(), region_dir: metadata_region_dir, } } diff --git a/src/metric-engine/src/engine/drop.rs b/src/metric-engine/src/engine/drop.rs new file mode 100644 index 0000000000..9399844720 --- /dev/null +++ b/src/metric-engine/src/engine/drop.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Drop a metric region + +use mito2::engine::MITO_ENGINE_NAME; +use object_store::util::join_dir; +use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, +}; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + AffectedRows, RegionDropRequest, RegionOpenRequest, RegionRequest, +}; +use store_api::storage::RegionId; + +use super::MetricEngineInner; +use crate::error::{ + CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, + PhysicalRegionBusySnafu, PhysicalRegionNotFoundSnafu, Result, +}; +use crate::metrics::PHYSICAL_REGION_COUNT; +use crate::{metadata_region, utils}; + +impl MetricEngineInner { + pub async fn drop_region( + &self, + region_id: RegionId, + _req: RegionDropRequest, + ) -> Result { + let data_region_id = utils::to_data_region_id(region_id); + + // enclose the guard in a block to prevent the guard from polluting the async context + let (is_physical_region, is_physical_region_busy) = { + if let Some(logical_regions) = self + .state + .read() + .unwrap() + .physical_regions() + .get(&data_region_id) + { + (true, !logical_regions.is_empty()) + } else { + // the second argument is not used, just pass in a dummy value + (false, true) + } + }; + + if is_physical_region { + // check if there is no logical region relates to this physical region + if is_physical_region_busy { + // reject if there is any present logical region + return Err(PhysicalRegionBusySnafu { + region_id: data_region_id, + } + .build()); + } + + self.drop_physical_region(data_region_id).await + } else { + // cannot merge these two `if` otherwise the stupid type checker will complain + let metadata_region_id = self + .state + .read() + .unwrap() + .logical_regions() + .get(®ion_id) + .copied(); + if let Some(metadata_region_id) = metadata_region_id { + self.drop_logical_region(region_id, metadata_region_id) + .await + } else { + Err(LogicalRegionNotFoundSnafu { region_id }.build()) + } + } + } + + async fn drop_physical_region(&self, region_id: RegionId) -> Result { + let data_region_id = utils::to_data_region_id(region_id); + let metadata_region_id = utils::to_metadata_region_id(region_id); + + // Drop mito regions. + // Since the physical regions are going to be dropped, we don't need to + // update the contents in metadata region. + self.mito + .handle_request(data_region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .with_context(|_| CloseMitoRegionSnafu { region_id })?; + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Drop(RegionDropRequest {}), + ) + .await + .with_context(|_| CloseMitoRegionSnafu { region_id })?; + + PHYSICAL_REGION_COUNT.dec(); + + // Update engine state + self.state + .write() + .unwrap() + .remove_physical_region(data_region_id)?; + + Ok(0) + } + + async fn drop_logical_region( + &self, + logical_region_id: RegionId, + physical_region_id: RegionId, + ) -> Result { + // Update metadata + self.metadata_region + .remove_logical_region(physical_region_id, logical_region_id) + .await?; + + // Update engine state + self.state + .write() + .unwrap() + .remove_logical_region(logical_region_id)?; + + Ok(0) + } +} diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 87023bcfa5..154115138d 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -115,4 +115,20 @@ impl MetricEngineState { self.physical_columns.remove(&physical_region_id); Ok(()) } + + /// Remove all data that are related to the logical region id. + pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> { + let physical_region_id = self.logical_regions.remove(&logical_region_id).context( + PhysicalRegionNotFoundSnafu { + region_id: logical_region_id, + }, + )?; + + self.physical_regions + .get_mut(&physical_region_id) + .unwrap() // Safety: physical_region_id is got from physical_regions + .remove(&logical_region_id); + + Ok(()) + } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 2a79c1d90d..aa4f35472c 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -146,6 +146,15 @@ pub enum Error { source: store_api::metadata::MetadataError, location: Location, }, + + #[snafu(display( + "Physical region {} is busy, there are still some logical regions using it", + region_id + ))] + PhysicalRegionBusy { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -158,7 +167,8 @@ impl ErrorExt for Error { InternalColumnOccupied { .. } | MissingRegionOption { .. } | ConflictRegionOption { .. } - | ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, + | ColumnTypeMismatch { .. } + | PhysicalRegionBusy { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported, diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 0621b9a3fd..97b73d7a9b 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -29,7 +29,7 @@ use store_api::metric_engine_consts::{ METADATA_SCHEMA_VALUE_COLUMN_NAME, }; use store_api::region_engine::RegionEngine; -use store_api::region_request::RegionPutRequest; +use store_api::region_request::{RegionDeleteRequest, RegionPutRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::error::{ @@ -111,6 +111,34 @@ impl MetadataRegion { .await } + /// Remove a registered logical region from metadata. + /// + /// This method doesn't check if the previous key exists. + pub async fn remove_logical_region( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result<()> { + // concat region key + let region_id = utils::to_metadata_region_id(physical_region_id); + let region_key = Self::concat_region_key(logical_region_id); + + // concat column keys + let logical_columns = self + .logical_columns(physical_region_id, logical_region_id) + .await?; + let mut column_keys = logical_columns + .into_iter() + .map(|(col, _)| Self::concat_column_key(logical_region_id, &col)) + .collect::>(); + + // remove region key and column keys + column_keys.push(region_key); + self.delete(region_id, &column_keys).await?; + + Ok(()) + } + /// Check if the given logical region exists. pub async fn is_logical_region_exists( &self, @@ -354,6 +382,20 @@ impl MetadataRegion { Ok(result) } + /// Delete the given keys. For performance consideration, this method + /// doesn't check if those keys exist or not. + async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> { + let delete_request = Self::build_delete_request(keys); + self.mito + .handle_request( + region_id, + store_api::region_request::RegionRequest::Delete(delete_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + Ok(()) + } + /// Builds a [ScanRequest] to read metadata for a given key. /// The request will contains a EQ filter on the key column. /// @@ -409,6 +451,39 @@ impl MetadataRegion { RegionPutRequest { rows } } + + fn build_delete_request(keys: &[String]) -> RegionDeleteRequest { + let cols = vec![ + ColumnSchema { + column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Tag as _, + ..Default::default() + }, + ]; + let rows = keys + .iter() + .map(|key| Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + ], + }) + .collect(); + let rows = Rows { schema: cols, rows }; + + RegionDeleteRequest { rows } + } } #[cfg(test)] diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result new file mode 100644 index 0000000000..31a095c95a --- /dev/null +++ b/tests/cases/standalone/common/create/create_metric_table.result @@ -0,0 +1,68 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +SHOW TABLES; + ++---------+ +| Tables | ++---------+ +| numbers | +| phy | ++---------+ + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name; + +Error: 3000(PlanQuery), Failed to plan SQL: No field named metric. Valid fields are information_schema.tables.table_catalog, information_schema.tables.table_schema, information_schema.tables.table_name, information_schema.tables.table_type, information_schema.tables.table_id, information_schema.tables.engine. + +-- We currently don't maintains physical table's schema. +DESC TABLE phy; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE t1; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | +| host | String | PRI | YES | | TAG | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE t2; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| job | String | PRI | YES | | TAG | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +-- TODO(ruihang): add a case that drops phy before t1 +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/create/create_metric_table.sql b/tests/cases/standalone/common/create/create_metric_table.sql new file mode 100644 index 0000000000..3caf427d5a --- /dev/null +++ b/tests/cases/standalone/common/create/create_metric_table.sql @@ -0,0 +1,24 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +SHOW TABLES; + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name; + +-- We currently don't maintains physical table's schema. +DESC TABLE phy; + +DESC TABLE t1; + +DESC TABLE t2; + +-- TODO(ruihang): add a case that drops phy before t1 + +DROP TABLE t1; + +DROP TABLE t2; + +DROP TABLE phy; diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result new file mode 100644 index 0000000000..d3f5978683 --- /dev/null +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -0,0 +1,62 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2'); + +Affected Rows: 2 + +SELECT * from t1; + ++-------------------------+-----+-------+ +| ts | val | host | ++-------------------------+-----+-------+ +| 1970-01-01T00:00:00.001 | 1.0 | host2 | +| 1970-01-01T00:00:00 | 0.0 | host1 | ++-------------------------+-----+-------+ + +-- TODO(ruihang): fix this. t2 should not contains data from t1 +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +SELECT * from t2; + ++-------------------------+-----+-----+ +| ts | job | val | ++-------------------------+-----+-----+ +| 1970-01-01T00:00:00.001 | | 1.0 | +| 1970-01-01T00:00:00 | | 0.0 | ++-------------------------+-----+-----+ + +INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1); + +Affected Rows: 2 + +SELECT * from t2; + ++-------------------------+------+-----+ +| ts | job | val | ++-------------------------+------+-----+ +| 1970-01-01T00:00:00.001 | | 1.0 | +| 1970-01-01T00:00:00 | | 0.0 | +| 1970-01-01T00:00:00.001 | job2 | 1.0 | +| 1970-01-01T00:00:00 | job1 | 0.0 | ++-------------------------+------+-----+ + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql new file mode 100644 index 0000000000..6583833de5 --- /dev/null +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -0,0 +1,22 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2'); + +SELECT * from t1; + +-- TODO(ruihang): fix this. t2 should not contains data from t1 +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +SELECT * from t2; + +INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1); + +SELECT * from t2; + +DROP TABLE t1; + +DROP TABLE t2; + +DROP TABLE phy;