feat: handle drop request for metric table (#3136)

* handle drop request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust procedure manager

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add create table sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* insert/query metric table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* address CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/common/meta/src/kv_backend.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reuse region option for metadata region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak variable name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2024-01-11 17:38:43 +08:00
committed by GitHub
parent f5798e2833
commit 09b3c7029b
13 changed files with 440 additions and 12 deletions

View File

@@ -389,15 +389,21 @@ async fn handle_drop_table_task(
let table_metadata_manager = &ddl_manager.table_metadata_manager();
let table_ref = drop_table_task.table_ref();
let (table_info_value, table_route_value) =
table_metadata_manager.get_full_table_info(table_id).await?;
let table_info_value = table_metadata_manager
.table_info_manager()
.get(table_id)
.await?;
let (_, table_route_value) = table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value));
let id = ddl_manager
.submit_drop_table_task(

View File

@@ -354,6 +354,7 @@ impl TableMetadataManager {
&self.kv_backend
}
// TODO(ruihang): deprecate this
pub async fn get_full_table_info(
&self,
table_id: TableId,

View File

@@ -114,6 +114,7 @@ where
Ok(!resp.kvs.is_empty())
}
/// Returns previous key-value pair if `prev_kv` is `true`.
async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
if prev_kv {

View File

@@ -15,6 +15,7 @@
mod alter;
mod close;
mod create;
mod drop;
mod open;
mod put;
mod read;
@@ -83,13 +84,15 @@ use crate::utils;
/// | Operations | Logical Region | Physical Region |
/// | ---------- | -------------- | --------------- |
/// | Create | ✅ | ✅ |
/// | Drop | ✅ | |
/// | Drop | ✅ | ❓* |
/// | Write | ✅ | ❌ |
/// | Read | ✅ | ✅ |
/// | Close | ✅ | ✅ |
/// | Open | ✅ | ✅ |
/// | Alter | ✅ | ❌ |
///
/// *: Physical region can be dropped only when all related logical regions are dropped.
///
/// ## Internal Columns
///
/// The physical data region contains two internal columns. Should
@@ -123,7 +126,7 @@ impl RegionEngine for MetricEngine {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
RegionRequest::Drop(_) => todo!(),
RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,

View File

@@ -73,8 +73,7 @@ impl MetricEngineInner {
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
// create metadata region
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
let create_metadata_region_request = self.create_request_for_metadata_region(&request);
self.mito
.handle_request(
metadata_region_id,
@@ -287,7 +286,10 @@ impl MetricEngineInner {
/// Build [RegionCreateRequest] for metadata region
///
/// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest {
pub fn create_request_for_metadata_region(
&self,
request: &RegionCreateRequest,
) -> RegionCreateRequest {
// ts TIME INDEX DEFAULT 0
let timestamp_column_metadata = ColumnMetadata {
column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
@@ -324,7 +326,7 @@ impl MetricEngineInner {
};
// concat region dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
RegionCreateRequest {
engine: MITO_ENGINE_NAME.to_string(),
@@ -334,7 +336,7 @@ impl MetricEngineInner {
value_column_metadata,
],
primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
options: HashMap::new(),
options: request.options.clone(),
region_dir: metadata_region_dir,
}
}

View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Drop a metric region
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AffectedRows, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::error::{
CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu,
PhysicalRegionBusySnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::{metadata_region, utils};
impl MetricEngineInner {
    /// Drops the region identified by `region_id`.
    ///
    /// The id is first converted to its data region id and looked up among the
    /// physical regions tracked in the engine state:
    ///
    /// - tracked and still referenced by at least one logical region: the
    ///   request is rejected with a `PhysicalRegionBusy` error;
    /// - tracked and no longer referenced: the physical region is dropped;
    /// - not tracked: `region_id` is looked up among the logical regions and
    ///   dropped as a logical region, or a `LogicalRegionNotFound` error is
    ///   returned when it is unknown there as well.
    ///
    /// The request payload (`_req`) is not inspected. On success the returned
    /// affected-row count is always `0`.
    pub async fn drop_region(
        &self,
        region_id: RegionId,
        _req: RegionDropRequest,
    ) -> Result<AffectedRows> {
        let data_region_id = utils::to_data_region_id(region_id);

        // `Some(busy)` when `data_region_id` is a tracked physical region, `None` otherwise.
        // The read guard is a temporary of this statement, so it is released before
        // any `.await` below.
        let physical_region_busy = self
            .state
            .read()
            .unwrap()
            .physical_regions()
            .get(&data_region_id)
            .map(|logical_regions| !logical_regions.is_empty());

        match physical_region_busy {
            // Some logical regions are still present on this physical region.
            Some(true) => Err(PhysicalRegionBusySnafu {
                region_id: data_region_id,
            }
            .build()),
            Some(false) => self.drop_physical_region(data_region_id).await,
            None => {
                // Resolve the lookup in its own statement so that the read guard
                // is gone before awaiting.
                let owning_physical_region_id = self
                    .state
                    .read()
                    .unwrap()
                    .logical_regions()
                    .get(&region_id)
                    .copied();

                match owning_physical_region_id {
                    Some(physical_region_id) => {
                        self.drop_logical_region(region_id, physical_region_id)
                            .await
                    }
                    None => Err(LogicalRegionNotFoundSnafu { region_id }.build()),
                }
            }
        }
    }

    /// Drops both mito regions (data first, then metadata) that back the given
    /// physical region, decrements the physical region gauge and finally
    /// removes the region from the engine state.
    ///
    /// The contents of the metadata region are not updated beforehand since
    /// the whole region is dropped anyway.
    async fn drop_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
        let data_region_id = utils::to_data_region_id(region_id);
        let metadata_region_id = utils::to_metadata_region_id(region_id);

        // NOTE(review): failures of these drop requests are reported through
        // `CloseMitoRegionSnafu`; confirm whether a dedicated drop error is wanted.
        for mito_region_id in [data_region_id, metadata_region_id] {
            self.mito
                .handle_request(mito_region_id, RegionRequest::Drop(RegionDropRequest {}))
                .await
                .with_context(|_| CloseMitoRegionSnafu { region_id })?;
        }

        PHYSICAL_REGION_COUNT.dec();

        // Forget the region in the in-memory engine state.
        let mut state = self.state.write().unwrap();
        state.remove_physical_region(data_region_id)?;

        Ok(0)
    }

    /// Drops a logical region: its entries are removed from the metadata
    /// region of `physical_region_id` first, then the logical region is
    /// removed from the in-memory engine state.
    async fn drop_logical_region(
        &self,
        logical_region_id: RegionId,
        physical_region_id: RegionId,
    ) -> Result<AffectedRows> {
        // Persisted metadata goes first.
        self.metadata_region
            .remove_logical_region(physical_region_id, logical_region_id)
            .await?;

        // Then the in-memory state. The write guard lives only inside this
        // block and no `.await` follows it.
        {
            let mut state = self.state.write().unwrap();
            state.remove_logical_region(logical_region_id)?;
        }

        Ok(0)
    }
}

View File

@@ -115,4 +115,20 @@ impl MetricEngineState {
self.physical_columns.remove(&physical_region_id);
Ok(())
}
/// Remove all data that are related to the logical region id.
///
/// Both lookups are performed before anything is mutated, so the state is
/// left untouched when an error is returned.
///
/// # Errors
///
/// Returns a `PhysicalRegionNotFound` error when `logical_region_id` is not
/// registered, or when the physical region it maps to has no entry in
/// `physical_regions`. The latter used to be an `unwrap()`; a panic there
/// would happen while callers hold the state write lock and poison it.
pub fn remove_logical_region(&mut self, logical_region_id: RegionId) -> Result<()> {
    // NOTE(review): the id that is missing here is a logical one; the error
    // variant is kept unchanged for compatibility with existing callers.
    let physical_region_id = self
        .logical_regions
        .get(&logical_region_id)
        .copied()
        .context(PhysicalRegionNotFoundSnafu {
            region_id: logical_region_id,
        })?;

    // The physical region id comes from `logical_regions`; report a broken
    // mapping as an error instead of panicking.
    self.physical_regions
        .get_mut(&physical_region_id)
        .context(PhysicalRegionNotFoundSnafu {
            region_id: physical_region_id,
        })?
        .remove(&logical_region_id);

    self.logical_regions.remove(&logical_region_id);

    Ok(())
}
}

View File

@@ -146,6 +146,15 @@ pub enum Error {
source: store_api::metadata::MetadataError,
location: Location,
},
#[snafu(display(
"Physical region {} is busy, there are still some logical regions using it",
region_id
))]
PhysicalRegionBusy {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -158,7 +167,8 @@ impl ErrorExt for Error {
InternalColumnOccupied { .. }
| MissingRegionOption { .. }
| ConflictRegionOption { .. }
| ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
| ColumnTypeMismatch { .. }
| PhysicalRegionBusy { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported,

View File

@@ -29,7 +29,7 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionPutRequest;
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::error::{
@@ -111,6 +111,34 @@ impl MetadataRegion {
.await
}
/// Remove a registered logical region from metadata.
///
/// Deletes the region key of `logical_region_id` together with one column
/// key for every column currently returned by `logical_columns` for that
/// region. All keys are deleted with a single request against the metadata
/// region derived from `physical_region_id`.
///
/// This method doesn't check if the previous key exists.
pub async fn remove_logical_region(
    &self,
    physical_region_id: RegionId,
    logical_region_id: RegionId,
) -> Result<()> {
    let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
    let region_key = Self::concat_region_key(logical_region_id);

    // One key per registered column, followed by the region key itself.
    let keys_to_delete: Vec<_> = self
        .logical_columns(physical_region_id, logical_region_id)
        .await?
        .into_iter()
        .map(|(column_name, _)| Self::concat_column_key(logical_region_id, &column_name))
        .chain(std::iter::once(region_key))
        .collect();

    self.delete(metadata_region_id, &keys_to_delete).await
}
/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
@@ -354,6 +382,20 @@ impl MetadataRegion {
Ok(result)
}
/// Delete the given keys from the region `region_id`.
///
/// A single delete request covering all `keys` is sent to mito. For
/// performance consideration, this method doesn't check if those keys
/// exist or not.
async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
    let request =
        store_api::region_request::RegionRequest::Delete(Self::build_delete_request(keys));

    // The response of the request is not needed, only whether it succeeded.
    self.mito
        .handle_request(region_id, request)
        .await
        .context(MitoWriteOperationSnafu)
        .map(|_| ())
}
/// Builds a [ScanRequest] to read metadata for a given key.
/// The request will contains a EQ filter on the key column.
///
@@ -409,6 +451,39 @@ impl MetadataRegion {
RegionPutRequest { rows }
}
/// Builds a [RegionDeleteRequest] that contains one row per key.
///
/// The request schema has two columns, the timestamp column followed by the
/// key column. Every row carries the timestamp `0` and the key itself.
fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
    let timestamp_column = ColumnSchema {
        column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
        datatype: ColumnDataType::TimestampMillisecond as _,
        semantic_type: SemanticType::Timestamp as _,
        ..Default::default()
    };
    let key_column = ColumnSchema {
        column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
        datatype: ColumnDataType::String as _,
        semantic_type: SemanticType::Tag as _,
        ..Default::default()
    };

    // Values are listed in the same order as the schema: timestamp, then key.
    let mut rows = Vec::with_capacity(keys.len());
    for key in keys {
        let timestamp = Value {
            value_data: Some(ValueData::TimestampMillisecondValue(0)),
        };
        let key_value = Value {
            value_data: Some(ValueData::StringValue(key.clone())),
        };
        rows.push(Row {
            values: vec![timestamp, key_value],
        });
    }

    RegionDeleteRequest {
        rows: Rows {
            schema: vec![timestamp_column, key_column],
            rows,
        },
    }
}
}
#[cfg(test)]

View File

@@ -0,0 +1,68 @@
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
Affected Rows: 0
SHOW TABLES;
+---------+
| Tables |
+---------+
| numbers |
| phy |
+---------+
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
Error: 3000(PlanQuery), Failed to plan SQL: No field named metric. Valid fields are information_schema.tables.table_catalog, information_schema.tables.table_schema, information_schema.tables.table_name, information_schema.tables.table_type, information_schema.tables.table_id, information_schema.tables.engine.
-- We currently don't maintain physical table's schema.
DESC TABLE phy;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
DESC TABLE t1;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
| host | String | PRI | YES | | TAG |
+--------+----------------------+-----+------+---------+---------------+
DESC TABLE t2;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| job | String | PRI | YES | | TAG |
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
-- TODO(ruihang): add a case that drops phy before t1
DROP TABLE t1;
Affected Rows: 0
DROP TABLE t2;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,24 @@
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
SHOW TABLES;
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name;
-- We currently don't maintain physical table's schema.
DESC TABLE phy;
DESC TABLE t1;
DESC TABLE t2;
-- TODO(ruihang): add a case that drops phy before t1
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE phy;

View File

@@ -0,0 +1,62 @@
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
Affected Rows: 2
SELECT * from t1;
+-------------------------+-----+-------+
| ts | val | host |
+-------------------------+-----+-------+
| 1970-01-01T00:00:00.001 | 1.0 | host2 |
| 1970-01-01T00:00:00 | 0.0 | host1 |
+-------------------------+-----+-------+
-- TODO(ruihang): fix this. t2 should not contain data from t1
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
SELECT * from t2;
+-------------------------+-----+-----+
| ts | job | val |
+-------------------------+-----+-----+
| 1970-01-01T00:00:00.001 | | 1.0 |
| 1970-01-01T00:00:00 | | 0.0 |
+-------------------------+-----+-----+
INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
Affected Rows: 2
SELECT * from t2;
+-------------------------+------+-----+
| ts | job | val |
+-------------------------+------+-----+
| 1970-01-01T00:00:00.001 | | 1.0 |
| 1970-01-01T00:00:00 | | 0.0 |
| 1970-01-01T00:00:00.001 | job2 | 1.0 |
| 1970-01-01T00:00:00 | job1 | 0.0 |
+-------------------------+------+-----+
DROP TABLE t1;
Affected Rows: 0
DROP TABLE t2;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,22 @@
CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
SELECT * from t1;
-- TODO(ruihang): fix this. t2 should not contain data from t1
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT * from t2;
INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
SELECT * from t2;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE phy;