chore: reject table creation when partitions exceeds peer number (#1654)

* chore: table creation is rejected, when partition_num exceeds peer_num

* chore: modify no_active_datanode error msg

* fix: ut

* fix sqlness test and add limit for select peer in region_failover

* upgrade greptime-proto

* self cr

* fix: cargo sqlness

* chore: add table info in select ctx for failover

* fix sqlness
This commit is contained in:
fys
2023-06-01 17:05:17 +08:00
committed by GitHub
parent e7a410573b
commit 86adac1532
28 changed files with 546 additions and 175 deletions

2
Cargo.lock generated
View File

@@ -4050,7 +4050,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=44c5adf34938d0650c18a14db2a374bdee471ae7#44c5adf34938d0650c18a14db2a374bdee471ae7"
dependencies = [
"prost",
"serde",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "44c5adf34938d0650c18a14db2a374bdee471ae7" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -72,6 +72,9 @@ pub trait KvBackend: Send + Sync {
return Ok(None);
}
/// MoveValue atomically renames the key to the given updated key.
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error>;
fn as_any(&self) -> &dyn Any;
}
@@ -125,6 +128,10 @@ mod tests {
unimplemented!()
}
async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -18,7 +18,9 @@ use std::sync::Arc;
use std::time::Duration;
use async_stream::stream;
use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use common_meta::rpc::store::{
CompareAndPutRequest, DeleteRangeRequest, MoveValueRequest, PutRequest, RangeRequest,
};
use common_telemetry::{info, timer};
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
@@ -101,6 +103,17 @@ impl KvBackend for CachedMetaKvBackend {
ret
}
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> {
let ret = self.kv_backend.move_value(from_key, to_key).await;
if ret.is_ok() {
self.invalidate_key(from_key).await;
self.invalidate_key(to_key).await;
}
ret
}
fn as_any(&self) -> &dyn Any {
self
}
@@ -225,6 +238,12 @@ impl KvBackend for MetaKvBackend {
}
}
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> {
let req = MoveValueRequest::new(from_key, to_key);
self.client.move_value(req).await.context(MetaSrvSnafu)?;
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -153,6 +153,10 @@ impl KvBackend for MockKvBackend {
Ok(())
}
async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<(), Error> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod table_route;
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
pub const REMOVED_PREFIX: &str = "__removed";
pub fn to_removed_key(key: &str) -> String {
format!("{REMOVED_PREFIX}-{key}")
}
#[cfg(test)]
mod tests {
use crate::key::to_removed_key;
#[test]
fn test_to_removed_key() {
let key = "test_key";
let removed = "__removed-test_key";
assert_eq!(removed, to_removed_key(key));
}
}

View File

@@ -0,0 +1,97 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::TableName;
use crate::key::to_removed_key;
pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
pub struct TableRouteKey<'a> {
pub table_id: u64,
pub catalog_name: &'a str,
pub schema_name: &'a str,
pub table_name: &'a str,
}
impl<'a> TableRouteKey<'a> {
pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self {
Self {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
pub fn prefix(&self) -> String {
format!(
"{}-{}-{}-{}",
TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name
)
}
pub fn key(&self) -> String {
format!("{}-{}", self.prefix(), self.table_id)
}
pub fn removed_key(&self) -> String {
to_removed_key(&self.key())
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::TableName;
use super::TableRouteKey;
#[test]
fn test_table_route_key() {
let key = TableRouteKey {
table_id: 123,
catalog_name: "greptime",
schema_name: "public",
table_name: "demo",
};
let prefix = key.prefix();
assert_eq!("__meta_table_route-greptime-public-demo", prefix);
let key_string = key.key();
assert_eq!("__meta_table_route-greptime-public-demo-123", key_string);
let removed = key.removed_key();
assert_eq!(
"__removed-__meta_table_route-greptime-public-demo-123",
removed
);
}
#[test]
fn test_with_table_name() {
let table_name = TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
};
let key = TableRouteKey::with_table_name(123, &table_name);
assert_eq!(123, key.table_id);
assert_eq!("greptime", key.catalog_name);
assert_eq!("public", key.schema_name);
assert_eq!("demo", key.table_name);
}
}

View File

@@ -15,6 +15,7 @@
pub mod error;
pub mod heartbeat;
pub mod instruction;
pub mod key;
pub mod peer;
pub mod rpc;
pub mod table_name;

View File

@@ -23,6 +23,7 @@ use catalog::helper::{TableGlobalKey, TableGlobalValue};
use catalog::remote::KvBackendRef;
use client::Database;
use common_error::prelude::BoxedError;
use common_meta::key::TableRouteKey;
use common_meta::table_name::TableName;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
@@ -339,7 +340,44 @@ impl DistTable {
.context(error::CatalogSnafu)
}
async fn move_table_route_value(
&self,
catalog_name: &str,
schema_name: &str,
table_id: u32,
old_table_name: &str,
new_table_name: &str,
) -> Result<()> {
let old_key = TableRouteKey {
table_id: table_id.into(),
catalog_name,
schema_name,
table_name: old_table_name,
}
.key();
let new_key = TableRouteKey {
table_id: table_id.into(),
catalog_name,
schema_name,
table_name: new_table_name,
}
.key();
self.backend
.move_value(old_key.as_bytes(), new_key.as_bytes())
.await
.context(error::CatalogSnafu)
}
async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> {
let AlterTableRequest {
catalog_name,
schema_name,
table_name,
alter_kind,
} = request;
let alter_expr = context
.get::<AlterExpr>()
.context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?;
@@ -347,7 +385,6 @@ impl DistTable {
self.alter_by_expr(alter_expr).await?;
let table_info = self.table_info();
let table_name = &table_info.name;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
@@ -362,27 +399,33 @@ impl DistTable {
new_info.meta = new_meta;
let key = TableGlobalKey {
catalog_name: alter_expr.catalog_name.clone(),
schema_name: alter_expr.schema_name.clone(),
table_name: alter_expr.table_name.clone(),
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
};
let mut value =
self.table_global_value(&key)
.await?
.context(error::TableNotFoundSnafu {
table_name: alter_expr.table_name.clone(),
})?;
let mut value = self
.table_global_value(&key)
.await?
.context(error::TableNotFoundSnafu { table_name })?;
value.table_info = new_info.into();
if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
if let AlterKind::RenameTable { new_table_name } = alter_kind {
let new_key = TableGlobalKey {
catalog_name: alter_expr.catalog_name.clone(),
schema_name: alter_expr.schema_name.clone(),
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: new_table_name.clone(),
};
self.set_table_global_value(new_key, value).await?;
self.delete_table_global_value(key).await
self.delete_table_global_value(key).await?;
self.move_table_route_value(
catalog_name,
schema_name,
table_info.ident.table_id,
table_name,
new_table_name,
)
.await
} else {
self.set_table_global_value(key, value).await
}
@@ -616,6 +659,10 @@ mod test {
unimplemented!()
}
async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<()> {
unimplemented!()
}
async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<()> {
unimplemented!()
}

View File

@@ -44,7 +44,7 @@ pub struct Stat {
pub region_stats: Vec<RegionStat>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RegionStat {
pub id: u64,
pub catalog: String,

View File

@@ -14,8 +14,7 @@
use std::str::FromStr;
use api::v1::meta::TableName;
use catalog::helper::TableGlobalKey;
use common_meta::key::TABLE_ROUTE_PREFIX;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -28,7 +27,6 @@ use crate::handler::node_stat::Stat;
pub(crate) const REMOVED_PREFIX: &str = "__removed";
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
pub const DN_STAT_PREFIX: &str = "__meta_dnstat";
@@ -129,51 +127,6 @@ impl TryFrom<LeaseValue> for Vec<u8> {
}
}
pub struct TableRouteKey<'a> {
pub table_id: u64,
pub catalog_name: &'a str,
pub schema_name: &'a str,
pub table_name: &'a str,
}
impl<'a> TableRouteKey<'a> {
pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self {
Self {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
pub fn with_table_global_key(table_id: u64, t: &'a TableGlobalKey) -> Self {
Self {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
#[inline]
pub fn prefix(&self) -> String {
format!(
"{}-{}-{}-{}",
TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name
)
}
#[inline]
pub fn key(&self) -> String {
format!("{}-{}", self.prefix(), self.table_id)
}
#[inline]
pub fn removed_key(&self) -> String {
to_removed_key(&self.key())
}
}
pub(crate) fn to_removed_key(key: &str) -> String {
format!("{REMOVED_PREFIX}-{key}")
}

View File

@@ -12,18 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::RangeRequest;
use common_time::util as time_util;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
use crate::service::store::kv::KvStoreRef;
use crate::util;
pub async fn alive_datanodes<P>(
pub async fn alive_datanodes(
cluster_id: u64,
kv_store: &KvStoreRef,
lease_secs: i64,
) -> Result<HashMap<LeaseKey, LeaseValue>> {
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000
};
filter_datanodes(cluster_id, kv_store, lease_filter).await
}
pub async fn filter_datanodes<P>(
cluster_id: u64,
kv_store: &KvStoreRef,
predicate: P,
) -> Result<Vec<(LeaseKey, LeaseValue)>>
) -> Result<HashMap<LeaseKey, LeaseValue>>
where
P: Fn(&LeaseKey, &LeaseValue) -> bool,
{
@@ -38,14 +53,14 @@ where
let res = kv_store.range(req).await?;
let kvs = res.kvs;
let mut lease_kvs = vec![];
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
if !predicate(&lease_key, &lease_value) {
continue;
}
lease_kvs.push((lease_key, lease_value));
lease_kvs.insert(lease_key, lease_value);
}
Ok(lease_kvs)

View File

@@ -102,6 +102,7 @@ pub struct SelectorContext {
pub kv_store: KvStoreRef,
pub catalog: Option<String>,
pub schema: Option<String>,
pub table: Option<String>,
}
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;

View File

@@ -156,6 +156,7 @@ impl MetaSrvBuilder {
kv_store: kv_store.clone(),
catalog: None,
schema: None,
table: None,
},
lock.clone(),
));

View File

@@ -462,8 +462,9 @@ mod tests {
datanode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_store,
catalog: None,
schema: None,
catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
table: Some(table.to_string()),
};
TestingEnv {

View File

@@ -14,6 +14,7 @@
use async_trait::async_trait;
use common_error::prelude::{ErrorExt, StatusCode};
use common_meta::instruction::TableIdent;
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::info;
@@ -45,10 +46,21 @@ impl RegionFailoverStart {
return Ok(candidate);
}
let mut selector_ctx = ctx.selector_ctx.clone();
let TableIdent {
catalog,
schema,
table,
..
} = &failed_region.table_ident;
selector_ctx.catalog = Some(catalog.to_string());
selector_ctx.schema = Some(schema.to_string());
selector_ctx.table = Some(table.to_string());
let cluster_id = failed_region.cluster_id;
let candidates = ctx
.selector
.select(cluster_id, &ctx.selector_ctx)
.select(cluster_id, &selector_ctx)
.await?
.iter()
.filter_map(|p| {

View File

@@ -15,6 +15,7 @@
use api::v1::meta::{TableName, TableRouteValue};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_meta::RegionIdent;
@@ -28,7 +29,6 @@ use crate::error::{
CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableNotFoundSnafu,
TableRouteConversionSnafu,
};
use crate::keys::TableRouteKey;
use crate::lock::keys::table_metadata_lock_key;
use crate::lock::Opts;
use crate::table_routes;
@@ -221,6 +221,7 @@ impl State for UpdateRegionMetadata {
mod tests {
use api::v1::meta::TableRouteValue;
use catalog::helper::TableGlobalValue;
use common_meta::key::TableRouteKey;
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::{State, *};

View File

@@ -13,10 +13,8 @@
// limitations under the License.
use api::v1::meta::Peer;
use common_time::util as time_util;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector};
@@ -30,10 +28,12 @@ impl Selector for LeaseBasedSelector {
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
// filter out the nodes out lease
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000
};
let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?;
let mut lease_kvs: Vec<_> =
lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs)
.await?
.into_iter()
.collect();
// TODO(jiachun): At the moment we are just pushing the latest to the forefront,
// and it is better to use load-based strategies in the future.
lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::Peer;
use common_telemetry::warn;
use common_time::util as time_util;
use crate::cluster::MetaPeerClient;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, StatKey};
use crate::handler::node_stat::RegionStat;
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector};
@@ -38,31 +36,27 @@ impl Selector for LoadBasedSelector {
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
// get alive datanodes
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000
};
let lease_kvs: HashMap<LeaseKey, LeaseValue> =
lease::alive_datanodes(ns, &ctx.kv_store, lease_filter)
.await?
.into_iter()
.collect();
let lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs).await?;
if lease_kvs.is_empty() {
return Ok(vec![]);
}
// get stats of alive datanodes
let stat_keys: Vec<StatKey> = lease_kvs
.keys()
.map(|k| StatKey {
cluster_id: k.cluster_id,
node_id: k.node_id,
})
.collect();
let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
.into_iter()
// The regions of a table need to be distributed on different datanode.
.filter(|(lease_k, _)| {
if let Some(stat_val) = stat_kvs.get(&lease_k.into()) {
if let (Some(catalog), Some(schema), Some(table)) =
(&ctx.catalog, &ctx.schema, &ctx.table)
{
return contains_table(stat_val, catalog, schema, table) != Some(true);
}
}
true
})
.map(|(lease_k, lease_v)| {
let stat_key: StatKey = (&lease_k).into();
@@ -93,3 +87,127 @@ impl Selector for LoadBasedSelector {
.collect())
}
}
// Determine whether there is the table in datanode according to the heartbeats.
//
// Result:
// None indicates no heartbeats in stat_val;
// Some(true) indicates table exists in the datanode;
// Some(false) indicates that table not exists in datanode.
fn contains_table(
stat_val: &StatValue,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Option<bool> {
let may_latest = stat_val.stats.last();
if let Some(latest) = may_latest {
for RegionStat {
catalog,
schema,
table,
..
} in latest.region_stats.iter()
{
if catalog_name == catalog && schema_name == schema && table_name == table {
return Some(true);
}
}
} else {
return None;
}
Some(false)
}
#[cfg(test)]
mod tests {
use crate::handler::node_stat::{RegionStat, Stat};
use crate::keys::StatValue;
use crate::selector::load_based::contains_table;
#[test]
fn test_contains_table_from_stat_val() {
let empty = StatValue { stats: vec![] };
assert!(contains_table(&empty, "greptime_4", "public_4", "demo_5").is_none());
let stat_val = StatValue {
stats: vec![
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
..Default::default()
},
],
..Default::default()
},
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_3".to_string(),
schema: "public_3".to_string(),
table: "demo_3".to_string(),
..Default::default()
},
],
..Default::default()
},
Stat {
region_stats: vec![
RegionStat {
catalog: "greptime_1".to_string(),
schema: "public_1".to_string(),
table: "demo_1".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_2".to_string(),
schema: "public_2".to_string(),
table: "demo_2".to_string(),
..Default::default()
},
RegionStat {
catalog: "greptime_4".to_string(),
schema: "public_4".to_string(),
table: "demo_4".to_string(),
..Default::default()
},
],
..Default::default()
},
],
};
assert!(contains_table(&stat_val, "greptime_1", "public_1", "demo_1").unwrap());
assert!(contains_table(&stat_val, "greptime_2", "public_2", "demo_2").unwrap());
assert!(!contains_table(&stat_val, "greptime_3", "public_3", "demo_3").unwrap());
assert!(contains_table(&stat_val, "greptime_4", "public_4", "demo_4").unwrap());
}
}

View File

@@ -20,6 +20,7 @@ use api::v1::meta::{
RouteResponse, Table, TableRoute, TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use common_telemetry::{timer, warn};
use snafu::{OptionExt, ResultExt};
use table::metadata::RawTableInfo;
@@ -27,7 +28,6 @@ use tonic::{Request, Response};
use crate::error;
use crate::error::Result;
use crate::keys::TableRouteKey;
use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::sequence::SequenceRef;
@@ -60,6 +60,7 @@ impl router_server::Router for MetaSrv {
kv_store: self.kv_store(),
catalog: Some(table_name.catalog_name),
schema: Some(table_name.schema_name),
table: Some(table_name.table_name),
};
let selector = self.selector();
@@ -131,16 +132,18 @@ async fn handle_create(
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let mut peers = selector.select(cluster_id, &ctx).await?;
if peers.is_empty() {
warn!("Create table failed due to no active datanodes, table: {table_name:?}");
if peers.len() < partitions.len() {
warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len());
return Ok(RouteResponse {
header: Some(ResponseHeader::failed(
cluster_id,
Error::not_enough_active_datanodes(0),
Error::not_enough_available_datanodes(partitions.len(), peers.len()),
)),
..Default::default()
});
}
// We don't need to keep all peers, just truncate it to the number of partitions.
// If the peers are not enough, some peers will be used for multiple partitions.
peers.truncate(partitions.len());
@@ -304,7 +307,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result<RouteResponse
let _ = remove_table_global_value(&ctx.kv_store, &tgk).await?;
let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk);
let trk = table_route_key(tgv.table_id() as u64, &tgk);
let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?;
let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?;
@@ -365,7 +368,7 @@ async fn fetch_tables(
}
let tgv = tgv.unwrap();
let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk);
let trk = table_route_key(tgv.table_id() as u64, &tgk);
let trv = get_table_route_value(kv_store, &trk).await?;
tables.push((tgv, trv));
@@ -374,6 +377,15 @@ async fn fetch_tables(
Ok(tables)
}
fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
TableRouteKey {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
async fn remove_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,

View File

@@ -14,12 +14,12 @@
use api::v1::meta::{PutRequest, TableRouteValue};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use snafu::{OptionExt, ResultExt};
use crate::error::{
DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableRouteNotFoundSnafu,
};
use crate::keys::TableRouteKey;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;

View File

@@ -42,6 +42,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
kv_store,
catalog: None,
schema: None,
table: None,
};
Arc::new(RegionFailoverManager::new(

View File

@@ -159,7 +159,7 @@ impl GreptimeDbClusterBuilder {
async fn wait_datanodes_alive(&self, expected_datanodes: u32) {
let kv_store = self.kv_store();
for _ in 0..10 {
let alive_datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true)
let alive_datanodes = meta_srv::lease::filter_datanodes(1000, &kv_store, |_, _| true)
.await
.unwrap()
.len() as u32;

View File

@@ -84,7 +84,7 @@ pub async fn test_region_failover(store_type: StorageType) {
let (store_config, _guard) = get_test_store_config(&store_type, cluster_name);
let cluster = GreptimeDbClusterBuilder::new(cluster_name)
.with_datanodes(2)
.with_datanodes(4)
.with_store_config(store_config)
.build()
.await;
@@ -125,7 +125,7 @@ pub async fn test_region_failover(store_type: StorageType) {
run_region_failover_procedure(&cluster, failed_region.clone()).await;
let mut distribution = find_region_distribution(&cluster).await;
let distribution = find_region_distribution(&cluster).await;
info!("Find region distribution again: {distribution:?}");
// Waits for invalidating table cache
@@ -145,16 +145,14 @@ pub async fn test_region_failover(store_type: StorageType) {
assert_writes(&frontend).await;
assert!(!distribution
.remove(&failed_region.datanode_id)
.unwrap()
.contains(&failed_region.region_number));
// Since there are only two datanodes, the other datanode is the candidate.
assert!(distribution
.values()
.next()
.unwrap()
.contains(&failed_region.region_number));
assert!(!distribution.contains_key(&failed_region.datanode_id));
let mut success = false;
let values = distribution.values();
for val in values {
success = success || val.contains(&failed_region.region_number);
}
assert!(success)
}
fn get_table_cache(instance: &Arc<Instance>, key: &str) -> Option<Option<Kv>> {
@@ -303,6 +301,7 @@ async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_regio
kv_store: meta_srv.kv_store(),
catalog: None,
schema: None,
table: None,
},
dist_lock: meta_srv.lock().clone(),
},

View File

@@ -36,5 +36,5 @@ Error: 4001(TableNotFound), Table not found: greptime.public.t
-- SQLNESS REPLACE details.*
DROP TABLE new_table;
Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025",
Affected Rows: 1

View File

@@ -1,8 +1,8 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
CREATE TABLE test_alt_table(i INTEGER, j BIGINT TIME INDEX);
Affected Rows: 0
DESC TABLE t;
DESC TABLE test_alt_table;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
@@ -11,11 +11,11 @@ DESC TABLE t;
| j | Int64 | NO | | TIME INDEX |
+-------+-------+------+---------+---------------+
ALTER TABLE t ADD COLUMN k INTEGER;
ALTER TABLE test_alt_table ADD COLUMN k INTEGER;
Affected Rows: 0
DESC TABLE t;
DESC TABLE test_alt_table;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
@@ -26,11 +26,11 @@ DESC TABLE t;
+-------+-------+------+---------+---------------+
-- SQLNESS ARG restart=true
ALTER TABLE t ADD COLUMN m INTEGER;
ALTER TABLE test_alt_table ADD COLUMN m INTEGER;
Affected Rows: 0
DESC TABLE t;
DESC TABLE test_alt_table;
+-------+-------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
@@ -41,7 +41,7 @@ DESC TABLE t;
| m | Int32 | YES | | FIELD |
+-------+-------+------+---------+---------------+
DROP TABLE t;
DROP TABLE test_alt_table;
Affected Rows: 1

View File

@@ -1,14 +1,14 @@
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
CREATE TABLE test_alt_table(i INTEGER, j BIGINT TIME INDEX);
DESC TABLE t;
DESC TABLE test_alt_table;
ALTER TABLE t ADD COLUMN k INTEGER;
ALTER TABLE test_alt_table ADD COLUMN k INTEGER;
DESC TABLE t;
DESC TABLE test_alt_table;
-- SQLNESS ARG restart=true
ALTER TABLE t ADD COLUMN m INTEGER;
ALTER TABLE test_alt_table ADD COLUMN m INTEGER;
DESC TABLE t;
DESC TABLE test_alt_table;
DROP TABLE t;
DROP TABLE test_alt_table;

View File

@@ -18,6 +18,7 @@ use std::path::{Path, PathBuf};
use std::process::Stdio;
// use tokio::process::{Child, Command};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -35,7 +36,6 @@ use tokio::sync::Mutex as TokioMutex;
use crate::util;
const DATANODE_ADDR: &str = "127.0.0.1:4100";
const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log";
@@ -64,6 +64,8 @@ impl EnvController for Env {
}
}
static DATANODE_ID: AtomicU32 = AtomicU32::new(1);
#[allow(clippy::print_stdout)]
impl Env {
pub async fn start_standalone() -> GreptimeDB {
@@ -79,7 +81,7 @@ impl Env {
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_process: Arc::new(Mutex::new(server_process)),
server_processes: Arc::new(Mutex::new(vec![server_process])),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
@@ -95,14 +97,18 @@ impl Env {
// start a distributed GreptimeDB
let meta_server = Env::start_server("metasrv", &db_ctx, true).await;
let datanode = Env::start_server("datanode", &db_ctx, true).await;
let datanode_1 = Env::start_server("datanode", &db_ctx, true).await;
let datanode_2 = Env::start_server("datanode", &db_ctx, true).await;
let datanode_3 = Env::start_server("datanode", &db_ctx, true).await;
let frontend = Env::start_server("frontend", &db_ctx, true).await;
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_process: Arc::new(Mutex::new(datanode)),
server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
@@ -135,27 +141,41 @@ impl Env {
.open(log_file_name)
.unwrap_or_else(|_| panic!("Cannot open log file at {log_file_name}"));
let mut args = vec![
"--log-level=debug".to_string(),
subcommand.to_string(),
"start".to_string(),
];
match subcommand {
"datanode" | "standalone" => {
args.push("-c".to_string());
args.push(Self::generate_config_file(subcommand, db_ctx));
args.push("--http-addr=0.0.0.0:5001".to_string());
let (args, check_ip_addr) = match subcommand {
"datanode" => Self::datanode_start_args(db_ctx),
"standalone" => {
let args = vec![
"--log-level=debug".to_string(),
subcommand.to_string(),
"start".to_string(),
"-c".to_string(),
Self::generate_config_file(subcommand, db_ctx),
"--http-addr=0.0.0.0:5001".to_string(),
];
(args, SERVER_ADDR.to_string())
}
"frontend" => {
args.push("--metasrv-addr=0.0.0.0:3002".to_string());
args.push("--http-addr=0.0.0.0:5003".to_string());
let args = vec![
"--log-level=debug".to_string(),
subcommand.to_string(),
"start".to_string(),
"--metasrv-addr=0.0.0.0:3002".to_string(),
"--http-addr=0.0.0.0:5003".to_string(),
];
(args, SERVER_ADDR.to_string())
}
"metasrv" => {
args.push("--use-memory-store".to_string());
args.push("--http-addr=0.0.0.0:5002".to_string());
let args = vec![
"--log-level=debug".to_string(),
subcommand.to_string(),
"start".to_string(),
"--use-memory-store".to_string(),
"--http-addr=0.0.0.0:5001".to_string(),
];
(args, METASRV_ADDR.to_string())
}
_ => panic!("Unexpected subcommand: {subcommand}"),
}
};
let mut process = Command::new("./greptime")
.current_dir(util::get_binary_dir("debug"))
@@ -164,15 +184,7 @@ impl Env {
.spawn()
.unwrap_or_else(|_| panic!("Failed to start the DB with subcommand {subcommand}"));
// check connection
let ip_addr = match subcommand {
"datanode" => DATANODE_ADDR,
"frontend" => SERVER_ADDR,
"metasrv" => METASRV_ADDR,
"standalone" => SERVER_ADDR,
_ => panic!("Unexpected subcommand: {subcommand}"),
};
if !util::check_port(ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut process);
panic!("{subcommand} doesn't up in 10 seconds, quit.")
}
@@ -180,23 +192,56 @@ impl Env {
process
}
fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
let id = DATANODE_ID.fetch_add(1, Ordering::Relaxed);
let subcommand = "datanode";
let mut args = vec![
"--log-level=debug".to_string(),
subcommand.to_string(),
"start".to_string(),
];
args.push(format!("--rpc-addr=0.0.0.0:410{id}"));
args.push(format!("--mysql-addr=0.0.0.0:420{id}"));
args.push(format!("--http-addr=0.0.0.0:430{id}"));
args.push(format!(
"--data-home=/tmp/greptimedb_datanode_{}",
db_ctx.time
));
args.push(format!(
"--wal-dir=/tmp/greptimedb_datanode_{}_{id}/wal",
db_ctx.time
));
args.push(format!("--node-id={id}"));
args.push("--metasrv-addr=0.0.0.0:3002".to_string());
(args, format!("0.0.0.0:410{id}"))
}
/// stop and restart the server process
async fn restart_server(db: &GreptimeDB) {
{
let mut server_process = db.server_process.lock().unwrap();
Env::stop_server(&mut server_process);
let mut server_processes = db.server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
}
}
// check if the server is distributed or standalone
let subcommand = if db.is_standalone {
"standalone"
let new_server_processes = if db.is_standalone {
let new_server_process = Env::start_server("standalone", &db.ctx, false).await;
vec![new_server_process]
} else {
"datanode"
let mut processes = vec![];
DATANODE_ID.store(1, Ordering::Relaxed);
for _ in 0..3 {
let new_server_process = Env::start_server("datanode", &db.ctx, false).await;
processes.push(new_server_process);
}
processes
};
let new_server_process = Env::start_server(subcommand, &db.ctx, false).await;
let mut server_process = db.server_process.lock().unwrap();
*server_process = new_server_process;
let mut server_processes = db.server_processes.lock().unwrap();
*server_processes = new_server_processes;
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
@@ -248,7 +293,7 @@ impl Env {
}
pub struct GreptimeDB {
server_process: Arc<Mutex<Child>>,
server_processes: Arc<Mutex<Vec<Child>>>,
metasrv_process: Option<Child>,
frontend_process: Option<Child>,
client: TokioMutex<DB>,
@@ -281,8 +326,10 @@ impl Database for GreptimeDB {
impl GreptimeDB {
#![allow(clippy::print_stdout)]
fn stop(&mut self) {
let mut server = self.server_process.lock().unwrap();
Env::stop_server(&mut server);
let mut servers = self.server_processes.lock().unwrap();
for server in servers.iter_mut() {
Env::stop_server(server);
}
if let Some(mut metasrv) = self.metasrv_process.take() {
Env::stop_server(&mut metasrv);
}