From 86adac1532506aa538458b3031d5f54a2d8d721e Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 1 Jun 2023 17:05:17 +0800 Subject: [PATCH] chore: reject table creation when partitions exceeds peer number (#1654) * chore: table creation is rejected, when partition_num exceeds peer_num * chore: modify no_active_datanode error msg * fix: ut * fix sqlness test and add limit for select peer in region_failover * upgrade greptime-proto * self cr * fix: cargo sqlness * chore: add table info in select ctx for failover * fix sqlness --- Cargo.lock | 2 +- src/api/Cargo.toml | 2 +- src/catalog/src/remote.rs | 7 + src/catalog/src/remote/client.rs | 21 ++- src/catalog/tests/mock.rs | 4 + src/common/meta/src/key.rs | 35 ++++ src/common/meta/src/key/table_route.rs | 97 +++++++++++ src/common/meta/src/lib.rs | 1 + src/frontend/src/table.rs | 75 ++++++-- src/meta-srv/src/handler/node_stat.rs | 2 +- src/meta-srv/src/keys.rs | 49 +----- src/meta-srv/src/lease.rs | 23 ++- src/meta-srv/src/metasrv.rs | 1 + src/meta-srv/src/metasrv/builder.rs | 1 + src/meta-srv/src/procedure/region_failover.rs | 5 +- .../region_failover/failover_start.rs | 14 +- .../region_failover/update_metadata.rs | 3 +- src/meta-srv/src/selector/lease_based.rs | 12 +- src/meta-srv/src/selector/load_based.rs | 160 +++++++++++++++--- src/meta-srv/src/service/router.rs | 24 ++- src/meta-srv/src/table_routes.rs | 2 +- src/meta-srv/src/test_util.rs | 1 + tests-integration/src/cluster.rs | 2 +- tests-integration/tests/region_failover.rs | 23 ++- .../distributed/alter/rename_table.result | 2 +- .../common/alter/alter_table.result | 14 +- .../standalone/common/alter/alter_table.sql | 14 +- tests/runner/src/env.rs | 125 +++++++++----- 28 files changed, 546 insertions(+), 175 deletions(-) create mode 100644 src/common/meta/src/key.rs create mode 100644 src/common/meta/src/key/table_route.rs diff --git a/Cargo.lock b/Cargo.lock index 212fc2d6c4..67bc70a25d 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -4050,7 +4050,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=44c5adf34938d0650c18a14db2a374bdee471ae7#44c5adf34938d0650c18a14db2a374bdee471ae7" dependencies = [ "prost", "serde", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 18503b2701..fb9f7a08ab 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "44c5adf34938d0650c18a14db2a374bdee471ae7" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index 42b52cec71..a4ebebf4fb 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -72,6 +72,9 @@ pub trait KvBackend: Send + Sync { return Ok(None); } + /// MoveValue atomically renames the key to the given updated key. 
+ async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error>; + fn as_any(&self) -> &dyn Any; } @@ -125,6 +128,10 @@ mod tests { unimplemented!() } + async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index 981d7c209a..6df2fcd261 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use std::time::Duration; use async_stream::stream; -use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest}; +use common_meta::rpc::store::{ + CompareAndPutRequest, DeleteRangeRequest, MoveValueRequest, PutRequest, RangeRequest, +}; use common_telemetry::{info, timer}; use meta_client::client::MetaClient; use moka::future::{Cache, CacheBuilder}; @@ -101,6 +103,17 @@ impl KvBackend for CachedMetaKvBackend { ret } + async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> { + let ret = self.kv_backend.move_value(from_key, to_key).await; + + if ret.is_ok() { + self.invalidate_key(from_key).await; + self.invalidate_key(to_key).await; + } + + ret + } + fn as_any(&self) -> &dyn Any { self } @@ -225,6 +238,12 @@ impl KvBackend for MetaKvBackend { } } + async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> { + let req = MoveValueRequest::new(from_key, to_key); + self.client.move_value(req).await.context(MetaSrvSnafu)?; + Ok(()) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs index af8f997934..10a924e3dd 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/tests/mock.rs @@ -153,6 +153,10 @@ impl KvBackend for MockKvBackend { Ok(()) } + async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { self } diff --git 
a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs new file mode 100644 index 0000000000..86ec38cc12 --- /dev/null +++ b/src/common/meta/src/key.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod table_route; + +pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; + +pub const REMOVED_PREFIX: &str = "__removed"; + +pub fn to_removed_key(key: &str) -> String { + format!("{REMOVED_PREFIX}-{key}") +} + +#[cfg(test)] +mod tests { + use crate::key::to_removed_key; + + #[test] + fn test_to_removed_key() { + let key = "test_key"; + let removed = "__removed-test_key"; + assert_eq!(removed, to_removed_key(key)); + } +} diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs new file mode 100644 index 0000000000..ca29162951 --- /dev/null +++ b/src/common/meta/src/key/table_route.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::TableName; + +use crate::key::to_removed_key; + +pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; + +pub struct TableRouteKey<'a> { + pub table_id: u64, + pub catalog_name: &'a str, + pub schema_name: &'a str, + pub table_name: &'a str, +} + +impl<'a> TableRouteKey<'a> { + pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self { + Self { + table_id, + catalog_name: &t.catalog_name, + schema_name: &t.schema_name, + table_name: &t.table_name, + } + } + + pub fn prefix(&self) -> String { + format!( + "{}-{}-{}-{}", + TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name + ) + } + + pub fn key(&self) -> String { + format!("{}-{}", self.prefix(), self.table_id) + } + + pub fn removed_key(&self) -> String { + to_removed_key(&self.key()) + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::TableName; + + use super::TableRouteKey; + + #[test] + fn test_table_route_key() { + let key = TableRouteKey { + table_id: 123, + catalog_name: "greptime", + schema_name: "public", + table_name: "demo", + }; + + let prefix = key.prefix(); + assert_eq!("__meta_table_route-greptime-public-demo", prefix); + + let key_string = key.key(); + assert_eq!("__meta_table_route-greptime-public-demo-123", key_string); + + let removed = key.removed_key(); + assert_eq!( + "__removed-__meta_table_route-greptime-public-demo-123", + removed + ); + } + + #[test] + fn test_with_table_name() { + let table_name = TableName { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "demo".to_string(), + }; + + let key = TableRouteKey::with_table_name(123, &table_name); + + assert_eq!(123, key.table_id); + assert_eq!("greptime", key.catalog_name); + assert_eq!("public", key.schema_name); + assert_eq!("demo", key.table_name); + } +} diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 
979ceeb0ef..b49a7c4620 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -15,6 +15,7 @@ pub mod error; pub mod heartbeat; pub mod instruction; +pub mod key; pub mod peer; pub mod rpc; pub mod table_name; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 2ae98a911e..394cb2c7c6 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -23,6 +23,7 @@ use catalog::helper::{TableGlobalKey, TableGlobalValue}; use catalog::remote::KvBackendRef; use client::Database; use common_error::prelude::BoxedError; +use common_meta::key::TableRouteKey; use common_meta::table_name::TableName; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; @@ -339,7 +340,44 @@ impl DistTable { .context(error::CatalogSnafu) } + async fn move_table_route_value( + &self, + catalog_name: &str, + schema_name: &str, + table_id: u32, + old_table_name: &str, + new_table_name: &str, + ) -> Result<()> { + let old_key = TableRouteKey { + table_id: table_id.into(), + catalog_name, + schema_name, + table_name: old_table_name, + } + .key(); + + let new_key = TableRouteKey { + table_id: table_id.into(), + catalog_name, + schema_name, + table_name: new_table_name, + } + .key(); + + self.backend + .move_value(old_key.as_bytes(), new_key.as_bytes()) + .await + .context(error::CatalogSnafu) + } + async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { + let AlterTableRequest { + catalog_name, + schema_name, + table_name, + alter_kind, + } = request; + let alter_expr = context .get::() .context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?; @@ -347,7 +385,6 @@ impl DistTable { self.alter_by_expr(alter_expr).await?; let table_info = self.table_info(); - let table_name = &table_info.name; let new_meta = table_info .meta .builder_with_alter_kind(table_name, &request.alter_kind) @@ -362,27 +399,33 @@ impl DistTable { new_info.meta = new_meta; let key = TableGlobalKey { - 
catalog_name: alter_expr.catalog_name.clone(), - schema_name: alter_expr.schema_name.clone(), - table_name: alter_expr.table_name.clone(), + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), }; - let mut value = - self.table_global_value(&key) - .await? - .context(error::TableNotFoundSnafu { - table_name: alter_expr.table_name.clone(), - })?; + let mut value = self + .table_global_value(&key) + .await? + .context(error::TableNotFoundSnafu { table_name })?; value.table_info = new_info.into(); - if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + if let AlterKind::RenameTable { new_table_name } = alter_kind { let new_key = TableGlobalKey { - catalog_name: alter_expr.catalog_name.clone(), - schema_name: alter_expr.schema_name.clone(), + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), table_name: new_table_name.clone(), }; self.set_table_global_value(new_key, value).await?; - self.delete_table_global_value(key).await + self.delete_table_global_value(key).await?; + self.move_table_route_value( + catalog_name, + schema_name, + table_info.ident.table_id, + table_name, + new_table_name, + ) + .await } else { self.set_table_global_value(key, value).await } @@ -616,6 +659,10 @@ mod test { unimplemented!() } + async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<()> { + unimplemented!() + } + async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<()> { unimplemented!() } diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 1cdb49c1c8..ef9289470a 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -44,7 +44,7 @@ pub struct Stat { pub region_stats: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct RegionStat { pub id: u64, pub catalog: String, diff --git a/src/meta-srv/src/keys.rs 
b/src/meta-srv/src/keys.rs index ee4bf80cd8..7a8f73d65a 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -14,8 +14,7 @@ use std::str::FromStr; -use api::v1::meta::TableName; -use catalog::helper::TableGlobalKey; +use common_meta::key::TABLE_ROUTE_PREFIX; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -28,7 +27,6 @@ use crate::handler::node_stat::Stat; pub(crate) const REMOVED_PREFIX: &str = "__removed"; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; -pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; pub const DN_STAT_PREFIX: &str = "__meta_dnstat"; @@ -129,51 +127,6 @@ impl TryFrom for Vec { } } -pub struct TableRouteKey<'a> { - pub table_id: u64, - pub catalog_name: &'a str, - pub schema_name: &'a str, - pub table_name: &'a str, -} - -impl<'a> TableRouteKey<'a> { - pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self { - Self { - table_id, - catalog_name: &t.catalog_name, - schema_name: &t.schema_name, - table_name: &t.table_name, - } - } - - pub fn with_table_global_key(table_id: u64, t: &'a TableGlobalKey) -> Self { - Self { - table_id, - catalog_name: &t.catalog_name, - schema_name: &t.schema_name, - table_name: &t.table_name, - } - } - - #[inline] - pub fn prefix(&self) -> String { - format!( - "{}-{}-{}-{}", - TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name - ) - } - - #[inline] - pub fn key(&self) -> String { - format!("{}-{}", self.prefix(), self.table_id) - } - - #[inline] - pub fn removed_key(&self) -> String { - to_removed_key(&self.key()) - } -} - pub(crate) fn to_removed_key(key: &str) -> String { format!("{REMOVED_PREFIX}-{key}") } diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 45e974c7f5..b951fd9f87 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -12,18 +12,33 @@ // See the License for the specific language governing 
permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::meta::RangeRequest; +use common_time::util as time_util; use crate::error::Result; use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; use crate::service::store::kv::KvStoreRef; use crate::util; -pub async fn alive_datanodes<P>( +pub async fn alive_datanodes( + cluster_id: u64, + kv_store: &KvStoreRef, + lease_secs: i64, +) -> Result<HashMap<LeaseKey, LeaseValue>> { + let lease_filter = |_: &LeaseKey, v: &LeaseValue| { + time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000 + }; + + filter_datanodes(cluster_id, kv_store, lease_filter).await +} + +pub async fn filter_datanodes<P>( cluster_id: u64, kv_store: &KvStoreRef, predicate: P, -) -> Result<Vec<(LeaseKey, LeaseValue)>> +) -> Result<HashMap<LeaseKey, LeaseValue>> where P: Fn(&LeaseKey, &LeaseValue) -> bool, { @@ -38,14 +53,14 @@ where let res = kv_store.range(req).await?; let kvs = res.kvs; - let mut lease_kvs = vec![]; + let mut lease_kvs = HashMap::new(); for kv in kvs { let lease_key: LeaseKey = kv.key.try_into()?; let lease_value: LeaseValue = kv.value.try_into()?; if !predicate(&lease_key, &lease_value) { continue; } - lease_kvs.push((lease_key, lease_value)); + lease_kvs.insert(lease_key, lease_value); } Ok(lease_kvs) diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 64e4117aa6..8d664253ab 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -102,6 +102,7 @@ pub struct SelectorContext { pub kv_store: KvStoreRef, pub catalog: Option<String>, pub schema: Option<String>, + pub table: Option<String>, } pub type SelectorRef = Arc>>; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 5b89dca3e3..c1eafbc3d2 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -156,6 +156,7 @@ impl MetaSrvBuilder { kv_store: kv_store.clone(), catalog: None, schema: None, + table: None, }, lock.clone(), )); diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 07de7e7c95..35861dbe22 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -462,8 +462,9 @@ mod tests { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), kv_store, - catalog: None, - schema: None, + catalog: Some(DEFAULT_CATALOG_NAME.to_string()), + schema: Some(DEFAULT_SCHEMA_NAME.to_string()), + table: Some(table.to_string()), }; TestingEnv { diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index ca2c00d05c..2956fd026e 100644 --- 
a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -14,6 +14,7 @@ use async_trait::async_trait; use common_error::prelude::{ErrorExt, StatusCode}; +use common_meta::instruction::TableIdent; use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::info; @@ -45,10 +46,21 @@ impl RegionFailoverStart { return Ok(candidate); } + let mut selector_ctx = ctx.selector_ctx.clone(); + let TableIdent { + catalog, + schema, + table, + .. + } = &failed_region.table_ident; + selector_ctx.catalog = Some(catalog.to_string()); + selector_ctx.schema = Some(schema.to_string()); + selector_ctx.table = Some(table.to_string()); + let cluster_id = failed_region.cluster_id; let candidates = ctx .selector - .select(cluster_id, &ctx.selector_ctx) + .select(cluster_id, &selector_ctx) .await? .iter() .filter_map(|p| { diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index e92f72a168..6c28de3efa 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -15,6 +15,7 @@ use api::v1::meta::{TableName, TableRouteValue}; use async_trait::async_trait; use catalog::helper::TableGlobalKey; +use common_meta::key::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::TableRoute; use common_meta::RegionIdent; @@ -28,7 +29,6 @@ use crate::error::{ CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableNotFoundSnafu, TableRouteConversionSnafu, }; -use crate::keys::TableRouteKey; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; use crate::table_routes; @@ -221,6 +221,7 @@ impl State for UpdateRegionMetadata { mod tests { use api::v1::meta::TableRouteValue; use catalog::helper::TableGlobalValue; + use common_meta::key::TableRouteKey; use super::super::tests::{TestingEnv, 
TestingEnvBuilder}; use super::{State, *}; diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 13c1c036b8..74c0116521 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -13,10 +13,8 @@ // limitations under the License. use api::v1::meta::Peer; -use common_time::util as time_util; use crate::error::Result; -use crate::keys::{LeaseKey, LeaseValue}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::{Namespace, Selector}; @@ -30,10 +28,12 @@ impl Selector for LeaseBasedSelector { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { // filter out the nodes out lease - let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000 - }; - let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?; + let mut lease_kvs: Vec<_> = + lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs) + .await? + .into_iter() + .collect(); + // TODO(jiachun): At the moment we are just pushing the latest to the forefront, // and it is better to use load-based strategies in the future. lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis)); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 6a12f4b1a1..6417a4590a 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -12,15 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; - use api::v1::meta::Peer; use common_telemetry::warn; -use common_time::util as time_util; use crate::cluster::MetaPeerClient; use crate::error::Result; -use crate::keys::{LeaseKey, LeaseValue, StatKey}; +use crate::handler::node_stat::RegionStat; +use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::{Namespace, Selector}; @@ -38,31 +36,27 @@ impl Selector for LoadBasedSelector { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { // get alive datanodes - let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000 - }; - let lease_kvs: HashMap = - lease::alive_datanodes(ns, &ctx.kv_store, lease_filter) - .await? - .into_iter() - .collect(); - + let lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs).await?; if lease_kvs.is_empty() { return Ok(vec![]); } - // get stats of alive datanodes - let stat_keys: Vec = lease_kvs - .keys() - .map(|k| StatKey { - cluster_id: k.cluster_id, - node_id: k.node_id, - }) - .collect(); + let stat_keys: Vec = lease_kvs.keys().map(|k| k.into()).collect(); let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs .into_iter() + // The regions of a table need to be distributed on different datanode. + .filter(|(lease_k, _)| { + if let Some(stat_val) = stat_kvs.get(&lease_k.into()) { + if let (Some(catalog), Some(schema), Some(table)) = + (&ctx.catalog, &ctx.schema, &ctx.table) + { + return contains_table(stat_val, catalog, schema, table) != Some(true); + } + } + true + }) .map(|(lease_k, lease_v)| { let stat_key: StatKey = (&lease_k).into(); @@ -93,3 +87,127 @@ impl Selector for LoadBasedSelector { .collect()) } } + +// Determine whether there is the table in datanode according to the heartbeats. 
+// +// Result: +// None indicates no heartbeats in stat_val; +// Some(true) indicates table exists in the datanode; +// Some(false) indicates that table not exists in datanode. +fn contains_table( + stat_val: &StatValue, + catalog_name: &str, + schema_name: &str, + table_name: &str, +) -> Option { + let may_latest = stat_val.stats.last(); + + if let Some(latest) = may_latest { + for RegionStat { + catalog, + schema, + table, + .. + } in latest.region_stats.iter() + { + if catalog_name == catalog && schema_name == schema && table_name == table { + return Some(true); + } + } + } else { + return None; + } + + Some(false) +} + +#[cfg(test)] +mod tests { + use crate::handler::node_stat::{RegionStat, Stat}; + use crate::keys::StatValue; + use crate::selector::load_based::contains_table; + + #[test] + fn test_contains_table_from_stat_val() { + let empty = StatValue { stats: vec![] }; + assert!(contains_table(&empty, "greptime_4", "public_4", "demo_5").is_none()); + + let stat_val = StatValue { + stats: vec![ + Stat { + region_stats: vec![ + RegionStat { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_3".to_string(), + schema: "public_3".to_string(), + table: "demo_3".to_string(), + ..Default::default() + }, + ], + ..Default::default() + }, + Stat { + region_stats: vec![ + RegionStat { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_3".to_string(), + schema: "public_3".to_string(), + table: "demo_3".to_string(), + ..Default::default() + }, + ], + 
..Default::default() + }, + Stat { + region_stats: vec![ + RegionStat { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, + RegionStat { + catalog: "greptime_4".to_string(), + schema: "public_4".to_string(), + table: "demo_4".to_string(), + ..Default::default() + }, + ], + ..Default::default() + }, + ], + }; + assert!(contains_table(&stat_val, "greptime_1", "public_1", "demo_1").unwrap()); + assert!(contains_table(&stat_val, "greptime_2", "public_2", "demo_2").unwrap()); + assert!(!contains_table(&stat_val, "greptime_3", "public_3", "demo_3").unwrap()); + assert!(contains_table(&stat_val, "greptime_4", "public_4", "demo_4").unwrap()); + } +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 6ce8294345..d6f4bb2f63 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -20,6 +20,7 @@ use api::v1::meta::{ RouteResponse, Table, TableRoute, TableRouteValue, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; +use common_meta::key::TableRouteKey; use common_telemetry::{timer, warn}; use snafu::{OptionExt, ResultExt}; use table::metadata::RawTableInfo; @@ -27,7 +28,6 @@ use tonic::{Request, Response}; use crate::error; use crate::error::Result; -use crate::keys::TableRouteKey; use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef}; use crate::metrics::METRIC_META_ROUTE_REQUEST; use crate::sequence::SequenceRef; @@ -60,6 +60,7 @@ impl router_server::Router for MetaSrv { kv_store: self.kv_store(), catalog: Some(table_name.catalog_name), schema: Some(table_name.schema_name), + table: Some(table_name.table_name), }; let selector = self.selector(); @@ -131,16 +132,18 @@ async fn handle_create( let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); let 
mut peers = selector.select(cluster_id, &ctx).await?; - if peers.is_empty() { - warn!("Create table failed due to no active datanodes, table: {table_name:?}"); + + if peers.len() < partitions.len() { + warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len()); return Ok(RouteResponse { header: Some(ResponseHeader::failed( cluster_id, - Error::not_enough_active_datanodes(0), + Error::not_enough_available_datanodes(partitions.len(), peers.len()), )), ..Default::default() }); } + // We don't need to keep all peers, just truncate it to the number of partitions. // If the peers are not enough, some peers will be used for multiple partitions. peers.truncate(partitions.len()); @@ -304,7 +307,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result TableRouteKey<'_> { + TableRouteKey { + table_id, + catalog_name: &t.catalog_name, + schema_name: &t.schema_name, + table_name: &t.table_name, + } +} + async fn remove_table_route_value( kv_store: &KvStoreRef, key: &TableRouteKey<'_>, diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 12fe8fa7b5..39b5ed28a3 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -14,12 +14,12 @@ use api::v1::meta::{PutRequest, TableRouteValue}; use catalog::helper::{TableGlobalKey, TableGlobalValue}; +use common_meta::key::TableRouteKey; use snafu::{OptionExt, ResultExt}; use crate::error::{ DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableRouteNotFoundSnafu, }; -use crate::keys::TableRouteKey; use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::KvStoreRef; diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 06737e3528..67b7900b3c 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -42,6 +42,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { kv_store, 
catalog: None, schema: None, + table: None, }; Arc::new(RegionFailoverManager::new( diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index a84e4c083e..751beb299f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -159,7 +159,7 @@ impl GreptimeDbClusterBuilder { async fn wait_datanodes_alive(&self, expected_datanodes: u32) { let kv_store = self.kv_store(); for _ in 0..10 { - let alive_datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true) + let alive_datanodes = meta_srv::lease::filter_datanodes(1000, &kv_store, |_, _| true) .await .unwrap() .len() as u32; diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index e40157a4dc..43180db2fb 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -84,7 +84,7 @@ pub async fn test_region_failover(store_type: StorageType) { let (store_config, _guard) = get_test_store_config(&store_type, cluster_name); let cluster = GreptimeDbClusterBuilder::new(cluster_name) - .with_datanodes(2) + .with_datanodes(4) .with_store_config(store_config) .build() .await; @@ -125,7 +125,7 @@ pub async fn test_region_failover(store_type: StorageType) { run_region_failover_procedure(&cluster, failed_region.clone()).await; - let mut distribution = find_region_distribution(&cluster).await; + let distribution = find_region_distribution(&cluster).await; info!("Find region distribution again: {distribution:?}"); // Waits for invalidating table cache @@ -145,16 +145,14 @@ pub async fn test_region_failover(store_type: StorageType) { assert_writes(&frontend).await; - assert!(!distribution - .remove(&failed_region.datanode_id) - .unwrap() - .contains(&failed_region.region_number)); - // Since there are only two datanodes, the other datanode is the candidate. 
- assert!(distribution - .values() - .next() - .unwrap() - .contains(&failed_region.region_number)); + assert!(!distribution.contains_key(&failed_region.datanode_id)); + + let mut success = false; + let values = distribution.values(); + for val in values { + success = success || val.contains(&failed_region.region_number); + } + assert!(success) } fn get_table_cache(instance: &Arc, key: &str) -> Option> { @@ -303,6 +301,7 @@ async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_regio kv_store: meta_srv.kv_store(), catalog: None, schema: None, + table: None, }, dist_lock: meta_srv.lock().clone(), }, diff --git a/tests/cases/distributed/alter/rename_table.result b/tests/cases/distributed/alter/rename_table.result index 6ca723e316..dbc77584e6 100644 --- a/tests/cases/distributed/alter/rename_table.result +++ b/tests/cases/distributed/alter/rename_table.result @@ -36,5 +36,5 @@ Error: 4001(TableNotFound), Table not found: greptime.public.t -- SQLNESS REPLACE details.* DROP TABLE new_table; -Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025", +Affected Rows: 1 diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index d213ab8a56..0603c7485c 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -1,8 +1,8 @@ -CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX); +CREATE TABLE test_alt_table(i INTEGER, j BIGINT TIME INDEX); Affected Rows: 0 -DESC TABLE t; +DESC TABLE test_alt_table; +-------+-------+------+---------+---------------+ | Field | Type | Null | Default | Semantic Type | @@ -11,11 +11,11 @@ DESC TABLE t; | j | Int64 | NO | | TIME INDEX | +-------+-------+------+---------+---------------+ -ALTER TABLE t ADD COLUMN k INTEGER; +ALTER TABLE test_alt_table ADD COLUMN k INTEGER; Affected Rows: 0 -DESC TABLE t; +DESC TABLE 
test_alt_table; +-------+-------+------+---------+---------------+ | Field | Type | Null | Default | Semantic Type | @@ -26,11 +26,11 @@ DESC TABLE t; +-------+-------+------+---------+---------------+ -- SQLNESS ARG restart=true -ALTER TABLE t ADD COLUMN m INTEGER; +ALTER TABLE test_alt_table ADD COLUMN m INTEGER; Affected Rows: 0 -DESC TABLE t; +DESC TABLE test_alt_table; +-------+-------+------+---------+---------------+ | Field | Type | Null | Default | Semantic Type | @@ -41,7 +41,7 @@ DESC TABLE t; | m | Int32 | YES | | FIELD | +-------+-------+------+---------+---------------+ -DROP TABLE t; +DROP TABLE test_alt_table; Affected Rows: 1 diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index 46165625dc..586e1b1b3b 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -1,14 +1,14 @@ -CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX); +CREATE TABLE test_alt_table(i INTEGER, j BIGINT TIME INDEX); -DESC TABLE t; +DESC TABLE test_alt_table; -ALTER TABLE t ADD COLUMN k INTEGER; +ALTER TABLE test_alt_table ADD COLUMN k INTEGER; -DESC TABLE t; +DESC TABLE test_alt_table; -- SQLNESS ARG restart=true -ALTER TABLE t ADD COLUMN m INTEGER; +ALTER TABLE test_alt_table ADD COLUMN m INTEGER; -DESC TABLE t; +DESC TABLE test_alt_table; -DROP TABLE t; +DROP TABLE test_alt_table; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index bc79c2ac19..89e9e33f9b 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -18,6 +18,7 @@ use std::path::{Path, PathBuf}; use std::process::Stdio; // use tokio::process::{Child, Command}; use std::process::{Child, Command}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -35,7 +36,6 @@ use tokio::sync::Mutex as TokioMutex; use crate::util; -const DATANODE_ADDR: &str = "127.0.0.1:4100"; const METASRV_ADDR: &str = 
"127.0.0.1:3002"; const SERVER_ADDR: &str = "127.0.0.1:4001"; const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log"; @@ -64,6 +64,8 @@ impl EnvController for Env { } } +static DATANODE_ID: AtomicU32 = AtomicU32::new(1); + #[allow(clippy::print_stdout)] impl Env { pub async fn start_standalone() -> GreptimeDB { @@ -79,7 +81,7 @@ impl Env { let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_process: Arc::new(Mutex::new(server_process)), + server_processes: Arc::new(Mutex::new(vec![server_process])), metasrv_process: None, frontend_process: None, client: TokioMutex::new(db), @@ -95,14 +97,18 @@ impl Env { // start a distributed GreptimeDB let meta_server = Env::start_server("metasrv", &db_ctx, true).await; - let datanode = Env::start_server("datanode", &db_ctx, true).await; + + let datanode_1 = Env::start_server("datanode", &db_ctx, true).await; + let datanode_2 = Env::start_server("datanode", &db_ctx, true).await; + let datanode_3 = Env::start_server("datanode", &db_ctx, true).await; + let frontend = Env::start_server("frontend", &db_ctx, true).await; let client = Client::with_urls(vec![SERVER_ADDR]); let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_process: Arc::new(Mutex::new(datanode)), + server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])), metasrv_process: Some(meta_server), frontend_process: Some(frontend), client: TokioMutex::new(db), @@ -135,27 +141,41 @@ impl Env { .open(log_file_name) .unwrap_or_else(|_| panic!("Cannot open log file at {log_file_name}")); - let mut args = vec![ - "--log-level=debug".to_string(), - subcommand.to_string(), - "start".to_string(), - ]; - match subcommand { - "datanode" | "standalone" => { - args.push("-c".to_string()); - args.push(Self::generate_config_file(subcommand, db_ctx)); - args.push("--http-addr=0.0.0.0:5001".to_string()); + let (args, check_ip_addr) = match subcommand { + "datanode" => 
Self::datanode_start_args(db_ctx), + "standalone" => { + let args = vec![ + "--log-level=debug".to_string(), + subcommand.to_string(), + "start".to_string(), + "-c".to_string(), + Self::generate_config_file(subcommand, db_ctx), + "--http-addr=0.0.0.0:5001".to_string(), + ]; + (args, SERVER_ADDR.to_string()) } "frontend" => { - args.push("--metasrv-addr=0.0.0.0:3002".to_string()); - args.push("--http-addr=0.0.0.0:5003".to_string()); + let args = vec![ + "--log-level=debug".to_string(), + subcommand.to_string(), + "start".to_string(), + "--metasrv-addr=0.0.0.0:3002".to_string(), + "--http-addr=0.0.0.0:5003".to_string(), + ]; + (args, SERVER_ADDR.to_string()) } "metasrv" => { - args.push("--use-memory-store".to_string()); - args.push("--http-addr=0.0.0.0:5002".to_string()); + let args = vec![ + "--log-level=debug".to_string(), + subcommand.to_string(), + "start".to_string(), + "--use-memory-store".to_string(), + "--http-addr=0.0.0.0:5001".to_string(), + ]; + (args, METASRV_ADDR.to_string()) } _ => panic!("Unexpected subcommand: {subcommand}"), - } + }; let mut process = Command::new("./greptime") .current_dir(util::get_binary_dir("debug")) @@ -164,15 +184,7 @@ impl Env { .spawn() .unwrap_or_else(|_| panic!("Failed to start the DB with subcommand {subcommand}")); - // check connection - let ip_addr = match subcommand { - "datanode" => DATANODE_ADDR, - "frontend" => SERVER_ADDR, - "metasrv" => METASRV_ADDR, - "standalone" => SERVER_ADDR, - _ => panic!("Unexpected subcommand: {subcommand}"), - }; - if !util::check_port(ip_addr.parse().unwrap(), Duration::from_secs(10)).await { + if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await { Env::stop_server(&mut process); panic!("{subcommand} doesn't up in 10 seconds, quit.") } @@ -180,23 +192,56 @@ impl Env { process } + fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec, String) { + let id = DATANODE_ID.fetch_add(1, Ordering::Relaxed); + + let subcommand = "datanode"; + let mut args = 
vec![ + "--log-level=debug".to_string(), + subcommand.to_string(), + "start".to_string(), + ]; + args.push(format!("--rpc-addr=0.0.0.0:410{id}")); + args.push(format!("--mysql-addr=0.0.0.0:420{id}")); + args.push(format!("--http-addr=0.0.0.0:430{id}")); + args.push(format!( + "--data-home=/tmp/greptimedb_datanode_{}", + db_ctx.time + )); + args.push(format!( + "--wal-dir=/tmp/greptimedb_datanode_{}_{id}/wal", + db_ctx.time + )); + args.push(format!("--node-id={id}")); + args.push("--metasrv-addr=0.0.0.0:3002".to_string()); + (args, format!("0.0.0.0:410{id}")) + } + /// stop and restart the server process async fn restart_server(db: &GreptimeDB) { { - let mut server_process = db.server_process.lock().unwrap(); - Env::stop_server(&mut server_process); + let mut server_processes = db.server_processes.lock().unwrap(); + for server_process in server_processes.iter_mut() { + Env::stop_server(server_process); + } } // check if the server is distributed or standalone - let subcommand = if db.is_standalone { - "standalone" + let new_server_processes = if db.is_standalone { + let new_server_process = Env::start_server("standalone", &db.ctx, false).await; + vec![new_server_process] } else { - "datanode" + let mut processes = vec![]; + DATANODE_ID.store(1, Ordering::Relaxed); + for _ in 0..3 { + let new_server_process = Env::start_server("datanode", &db.ctx, false).await; + processes.push(new_server_process); + } + processes }; - let new_server_process = Env::start_server(subcommand, &db.ctx, false).await; - let mut server_process = db.server_process.lock().unwrap(); - *server_process = new_server_process; + let mut server_processes = db.server_processes.lock().unwrap(); + *server_processes = new_server_processes; } /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` @@ -248,7 +293,7 @@ impl Env { } pub struct GreptimeDB { - server_process: Arc>, + server_processes: Arc>>, metasrv_process: Option, frontend_process: Option, client: TokioMutex, @@ -281,8 +326,10 
@@ impl Database for GreptimeDB { impl GreptimeDB { #![allow(clippy::print_stdout)] fn stop(&mut self) { - let mut server = self.server_process.lock().unwrap(); - Env::stop_server(&mut server); + let mut servers = self.server_processes.lock().unwrap(); + for server in servers.iter_mut() { + Env::stop_server(server); + } if let Some(mut metasrv) = self.metasrv_process.take() { Env::stop_server(&mut metasrv); }