From d0b360763397e760b5e9e946422eba93b00eb7e2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 14 Aug 2023 12:19:44 +0800 Subject: [PATCH] feat: add table route manager and upgrade tool (#2145) * feat: add table route manager and upgrade tool * test: add table route manager tests * feat: add new TableRouteValue struct * chore: apply suggestions from CR * refactor: change HashMap to BTreeMap * feat: add version to TableRouteValue --- Cargo.lock | 1 + src/cmd/Cargo.toml | 1 + src/cmd/src/cli/upgrade.rs | 96 +++++++++++- src/common/meta/src/error.rs | 9 +- src/common/meta/src/key.rs | 8 +- src/common/meta/src/key/table_route.rs | 197 ++++++++++++++++++++++++- src/common/meta/src/rpc/router.rs | 26 ++-- src/common/meta/src/rpc/store.rs | 4 +- src/frontend/src/table.rs | 14 +- 9 files changed, 329 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15a87e7630..96bb529ff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1601,6 +1601,7 @@ dependencies = [ "metrics", "nu-ansi-term", "partition", + "prost", "query", "rand", "rexpect", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index c3a8f4de8b..a458e74949 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -42,6 +42,7 @@ meta-srv = { workspace = true } metrics.workspace = true nu-ansi-term = "0.46" partition = { workspace = true } +prost.workspace = true query = { workspace = true } rand.workspace = true rustyline = "10.1" diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 18c203328c..8e4d02fad8 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use async_trait::async_trait; use clap::Parser; +use client::api::v1::meta::TableRouteValue; use common_meta::error as MetaError; use common_meta::helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue}; use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue}; @@ -24,8 +25,10 @@ use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameValue}; use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue}; +use common_meta::key::table_route::{NextTableRouteKey, TableRouteValue as NextTableRouteValue}; use common_meta::key::TableMetaKey; use common_meta::range_stream::PaginationStream; +use common_meta::rpc::router::TableRoute; use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest}; use common_meta::rpc::KeyValue; use common_meta::util::get_prefix_end_key; @@ -34,6 +37,7 @@ use etcd_client::Client; use futures::TryStreamExt; use meta_srv::service::store::etcd::EtcdStore; use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef}; +use prost::Message; use snafu::ResultExt; use crate::cli::{Instance, Tool}; @@ -45,6 +49,15 @@ pub struct UpgradeCommand { etcd_addr: String, #[clap(long)] dryrun: bool, + + #[clap(long)] + skip_table_global_keys: bool, + #[clap(long)] + skip_catalog_keys: bool, + #[clap(long)] + skip_schema_keys: bool, + #[clap(long)] + skip_table_route_keys: bool, } impl UpgradeCommand { @@ -57,6 +70,10 @@ impl UpgradeCommand { let tool = MigrateTableMetadata { etcd_store: EtcdStore::with_etcd_client(client), dryrun: self.dryrun, + skip_catalog_keys: self.skip_catalog_keys, + skip_table_global_keys: self.skip_table_global_keys, + skip_schema_keys: self.skip_schema_keys, + skip_table_route_keys: self.skip_table_route_keys, }; Ok(Instance::Tool(Box::new(tool))) } @@ -65,15 +82,32 @@ impl UpgradeCommand { struct MigrateTableMetadata { etcd_store: KvStoreRef, dryrun: bool, + + skip_table_global_keys: bool, + + skip_catalog_keys: bool, + + skip_schema_keys: bool, + + skip_table_route_keys: bool, } #[async_trait] impl Tool for MigrateTableMetadata { // migrates database's metadata from 0.3 to 0.4. async fn do_work(&self) -> Result<()> { - self.migrate_table_global_values().await?; - self.migrate_catalog_keys().await?; - self.migrate_schema_keys().await?; + if !self.skip_table_global_keys { + self.migrate_table_global_values().await?; + } + if !self.skip_catalog_keys { + self.migrate_catalog_keys().await?; + } + if !self.skip_schema_keys { + self.migrate_schema_keys().await?; + } + if !self.skip_table_route_keys { + self.migrate_table_route_keys().await?; + } Ok(()) } } @@ -81,6 +115,62 @@ impl Tool for MigrateTableMetadata { const PAGE_SIZE: usize = 1000; impl MigrateTableMetadata { + async fn migrate_table_route_keys(&self) -> Result<()> { + let key = b"__meta_table_route".to_vec(); + let range_end = get_prefix_end_key(&key); + let mut keys = Vec::new(); + info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); + + let mut stream = PaginationStream::new( + KvBackendAdapter::wrap(self.etcd_store.clone()), + RangeRequest::new().with_range(key, range_end), + PAGE_SIZE, + Arc::new(|kv: KeyValue| { + let value = + TableRouteValue::decode(&kv.value[..]).context(MetaError::DecodeProtoSnafu)?; + Ok((kv.key, value)) + }), + ); + + while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? { + self.migrate_table_route_key(value).await?; + keys.push(key); + } + + info!("Total migrated TableRouteKeys: {}", keys.len()); + self.delete_migrated_keys(keys).await; + + Ok(()) + } + + async fn migrate_table_route_key(&self, value: TableRouteValue) -> Result<()> { + let table_route = TableRoute::try_from_raw( + &value.peers, + value.table_route.expect("expected table_route"), + ) + .unwrap(); + + let new_table_value = NextTableRouteValue::new(table_route.region_routes); + + let new_key = NextTableRouteKey::new(table_route.table.id as u32); + info!("Creating '{new_key}'"); + + if self.dryrun { + info!("Dryrun: do nothing"); + } else { + self.etcd_store + .put( + PutRequest::new() + .with_key(new_key.as_raw_key()) + .with_value(new_table_value.try_as_raw_value().unwrap()), + ) + .await + .unwrap(); + } + + Ok(()) + } + async fn migrate_schema_keys(&self) -> Result<()> { // The schema key prefix. let key = b"__s".to_vec(); diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 7492c0d6ef..ef892a24e1 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -24,6 +24,12 @@ use table::metadata::TableId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to decode protobuf, source: {}", source))] + DecodeProto { + location: Location, + source: prost::DecodeError, + }, + #[snafu(display("Failed to encode object into json, source: {}", source))] EncodeJson { location: Location, @@ -164,7 +170,8 @@ impl ErrorExt for Error { EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } - | ConvertRawKey { .. } => StatusCode::Unexpected, + | ConvertRawKey { .. } + | DecodeProto { .. } => StatusCode::Unexpected, MetaSrv { source, .. } => source.status_code(), diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c0ff12b263..e06122b62f 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -53,7 +53,9 @@ pub mod schema_name; pub mod table_info; pub mod table_name; pub mod table_region; -mod table_route; +// TODO(weny): removes it. +#[allow(unused)] +pub mod table_route; use std::sync::Arc; @@ -67,6 +69,7 @@ use table_region::{TableRegionKey, TableRegionManager, TableRegionValue}; use self::catalog_name::{CatalogManager, CatalogNameValue}; use self::schema_name::{SchemaManager, SchemaNameValue}; +use self::table_route::TableRouteValue; use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::KvBackendRef; @@ -212,7 +215,8 @@ impl_table_meta_value! { TableNameValue, TableInfoValue, TableRegionValue, - DatanodeTableValue + DatanodeTableValue, + TableRouteValue } #[cfg(test)] diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 6bd189d4c1..b4a06d0b2c 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -15,12 +15,161 @@ use std::fmt::Display; use api::v1::meta::TableName; +use serde::{Deserialize, Serialize}; +use snafu::ensure; use table::metadata::TableId; -use crate::key::to_removed_key; +use crate::error::{Result, UnexpectedSnafu}; +use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::router::{RegionRoute, Table, TableRoute}; +use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; +pub const NEXT_TABLE_ROUTE_PREFIX: &str = "__table_route"; + +// TODO(weny): Renames it to TableRouteKey. +pub struct NextTableRouteKey { + pub table_id: TableId, +} + +impl NextTableRouteKey { + pub fn new(table_id: TableId) -> Self { + Self { table_id } + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct TableRouteValue { + pub region_routes: Vec, + version: u64, +} + +impl TableRouteValue { + pub fn new(region_routes: Vec) -> Self { + Self { + region_routes, + version: 0, + } + } +} + +impl TableMetaKey for NextTableRouteKey { + fn as_raw_key(&self) -> Vec { + self.to_string().into_bytes() + } +} + +impl Display for NextTableRouteKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", NEXT_TABLE_ROUTE_PREFIX, self.table_id) + } +} + +pub struct TableRouteManager { + kv_backend: KvBackendRef, +} + +impl TableRouteManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + pub async fn get(&self, table_id: TableId) -> Result> { + let key = NextTableRouteKey::new(table_id); + self.kv_backend + .get(&key.as_raw_key()) + .await? + .map(|kv| TableRouteValue::try_from_raw_value(kv.value)) + .transpose() + } + + // Creates TableRoute key and value. If the key already exists, check whether the value is the same. + pub async fn create(&self, table_id: TableId, region_routes: Vec) -> Result<()> { + let key = NextTableRouteKey::new(table_id); + let val = TableRouteValue::new(region_routes); + let req = CompareAndPutRequest::new() + .with_key(key.as_raw_key()) + .with_value(val.try_as_raw_value()?); + + self.kv_backend.compare_and_put(req).await?.handle(|resp| { + if !resp.success { + let Some(cur) = resp + .prev_kv + .map(|kv|TableRouteValue::try_from_raw_value(kv.value)) + .transpose()? + else { + return UnexpectedSnafu { + err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, val: {val:?}"), + }.fail(); + }; + + ensure!( + cur==val, + UnexpectedSnafu { + err_msg: format!("current value '{cur:?}' already existed for key '{key}', {val:?} is not set"), + } + ); + } + Ok(()) + }) + } + + /// Compares and puts value of key. `expect` is the expected value, if backend's current value associated + /// with key is the same as `expect`, the value will be updated to `val`. + /// + /// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())` + /// - If associated value is not the same as `expect`, no value will be updated and an + /// `Ok(Err(Option))` will be returned. The `Option` indicates + /// the current associated value of key. + /// - If any error happens during operation, an `Err(Error)` will be returned. + pub async fn compare_and_put( + &self, + table_id: TableId, + expect: Option, + region_routes: Vec, + ) -> Result>> { + let key = NextTableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + + let (expect, version) = if let Some(x) = expect { + (x.try_as_raw_value()?, x.version + 1) + } else { + (vec![], 0) + }; + let value = TableRouteValue { + region_routes, + version, + }; + let raw_value = value.try_as_raw_value()?; + + let req = CompareAndPutRequest::new() + .with_key(raw_key) + .with_expect(expect) + .with_value(raw_value); + + self.kv_backend.compare_and_put(req).await?.handle(|resp| { + Ok(if resp.success { + Ok(()) + } else { + Err(resp + .prev_kv + .map(|x| TableRouteValue::try_from_raw_value(x.value)) + .transpose()?) + }) + }) + } + + pub async fn remove(&self, table_id: TableId) -> Result<()> { + let key = NextTableRouteKey::new(table_id).as_raw_key(); + let removed_key = to_removed_key(&String::from_utf8_lossy(&key)); + let req = MoveValueRequest::new(key, removed_key.as_bytes()); + self.kv_backend.move_value(req).await?; + Ok(()) + } +} + #[derive(Copy, Clone)] pub struct TableRouteKey<'a> { pub table_id: TableId, @@ -59,9 +208,51 @@ impl<'a> Display for TableRouteKey<'a> { #[cfg(test)] mod tests { - use api::v1::meta::TableName; + use std::sync::Arc; + + use api::v1::meta::TableName as PbTableName; use super::TableRouteKey; + use crate::key::table_route::{TableRouteManager, TableRouteValue}; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::rpc::router::{RegionRoute, Table, TableRoute}; + use crate::table_name::TableName; + + #[tokio::test] + async fn test_table_route_manager() { + let mgr = TableRouteManager::new(Arc::new(MemoryKvBackend::default())); + + let table_id = 1024u32; + let table = Table { + id: table_id as u64, + table_name: TableName::new("foo", "bar", "baz"), + table_schema: b"mock schema".to_vec(), + }; + let region_route = RegionRoute::default(); + let region_routes = vec![region_route]; + + mgr.create(table_id, region_routes.clone()).await.unwrap(); + + let got = mgr.get(1024).await.unwrap().unwrap(); + + assert_eq!(got.region_routes, region_routes); + + let empty = mgr.get(1023).await.unwrap(); + assert!(empty.is_none()); + + let expect = TableRouteValue::new(region_routes); + + let mut updated = expect.clone(); + updated.region_routes.push(RegionRoute::default()); + + mgr.compare_and_put(1024, Some(expect.clone()), updated.region_routes.clone()) + .await + .unwrap(); + + mgr.compare_and_put(1024, Some(expect.clone()), updated.region_routes) + .await + .unwrap(); + } #[test] fn test_table_route_key() { @@ -87,7 +278,7 @@ mod tests { #[test] fn test_with_table_name() { - let table_name = TableName { + let table_name = PbTableName { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), table_name: "demo".to_string(), diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index a4da3f3e14..ac5d815015 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use api::v1::meta::{ Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue, }; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; @@ -85,7 +85,6 @@ impl TableRoute { .iter() .map(|x| (x.region.id.region_number(), x.leader_peer.clone())) .collect::>(); - Self { table, region_routes, @@ -237,7 +236,7 @@ impl TryFrom for TableRoute { pub struct Table { pub id: u64, pub table_name: TableName, - #[serde(serialize_with = "as_utf8")] + #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")] pub table_schema: Vec, } @@ -281,7 +280,7 @@ pub struct Region { pub id: RegionId, pub name: String, pub partition: Option, - pub attrs: HashMap, + pub attrs: BTreeMap, } impl From for Region { @@ -290,7 +289,7 @@ impl From for Region { id: r.id.into(), name: r.name, partition: r.partition.map(Into::into), - attrs: r.attrs, + attrs: r.attrs.into_iter().collect::>(), } } } @@ -301,7 +300,7 @@ impl From for PbRegion { id: region.id.into(), name: region.name, partition: region.partition.map(Into::into), - attrs: region.attrs, + attrs: region.attrs.into_iter().collect::>(), } } } @@ -322,6 +321,15 @@ fn as_utf8(val: &[u8], serializer: S) -> std::result::Result(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + + Ok(s.into_bytes()) +} + fn as_utf8_vec( val: &[Vec], serializer: S, @@ -513,7 +521,7 @@ mod tests { id: 1.into(), name: "r1".to_string(), partition: None, - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(2, "a2")), follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")], @@ -523,7 +531,7 @@ mod tests { id: 2.into(), name: "r2".to_string(), partition: None, - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index e69a2f87bd..2426442e3f 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -612,9 +612,9 @@ impl TryFrom for CompareAndPutResponse { } impl CompareAndPutResponse { - pub fn handle(&self, f: F) -> std::result::Result + pub fn handle(self, f: F) -> std::result::Result where - F: FnOnce(&Self) -> std::result::Result, + F: FnOnce(Self) -> std::result::Result, { f(self) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 90e3efe618..52758ad64a 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -377,7 +377,7 @@ impl PartitionExec { #[cfg(test)] pub(crate) mod test { - use std::collections::HashMap; + use std::collections::BTreeMap; use std::sync::atomic::{AtomicU32, Ordering}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -445,7 +445,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(3, "")), follower_peers: vec![], @@ -462,7 +462,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(2, "")), follower_peers: vec![], @@ -479,7 +479,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], @@ -517,7 +517,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: None, follower_peers: vec![], @@ -537,7 +537,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: None, follower_peers: vec![], @@ -554,7 +554,7 @@ pub(crate) mod test { .try_into() .unwrap(), ), - attrs: HashMap::new(), + attrs: BTreeMap::new(), }, leader_peer: None, follower_peers: vec![],