feat: add table route manager and upgrade tool (#2145)

* feat: add table route manager and upgrade tool

* test: add table route manager tests

* feat: add new TableRouteValue struct

* chore: apply suggestions from CR

* refactor: change HashMap to BTreeMap

* feat: add version to TableRouteValue
This commit is contained in:
Weny Xu
2023-08-14 12:19:44 +08:00
committed by GitHub
parent 5b012a1f67
commit d0b3607633
9 changed files with 329 additions and 27 deletions

1
Cargo.lock generated
View File

@@ -1601,6 +1601,7 @@ dependencies = [
"metrics",
"nu-ansi-term",
"partition",
"prost",
"query",
"rand",
"rexpect",

View File

@@ -42,6 +42,7 @@ meta-srv = { workspace = true }
metrics.workspace = true
nu-ansi-term = "0.46"
partition = { workspace = true }
prost.workspace = true
query = { workspace = true }
rand.workspace = true
rustyline = "10.1"

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use clap::Parser;
use client::api::v1::meta::TableRouteValue;
use common_meta::error as MetaError;
use common_meta::helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
@@ -24,8 +25,10 @@ use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{NextTableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::TableMetaKey;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
use common_meta::rpc::KeyValue;
use common_meta::util::get_prefix_end_key;
@@ -34,6 +37,7 @@ use etcd_client::Client;
use futures::TryStreamExt;
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
use prost::Message;
use snafu::ResultExt;
use crate::cli::{Instance, Tool};
@@ -45,6 +49,15 @@ pub struct UpgradeCommand {
etcd_addr: String,
#[clap(long)]
dryrun: bool,
#[clap(long)]
skip_table_global_keys: bool,
#[clap(long)]
skip_catalog_keys: bool,
#[clap(long)]
skip_schema_keys: bool,
#[clap(long)]
skip_table_route_keys: bool,
}
impl UpgradeCommand {
@@ -57,6 +70,10 @@ impl UpgradeCommand {
let tool = MigrateTableMetadata {
etcd_store: EtcdStore::with_etcd_client(client),
dryrun: self.dryrun,
skip_catalog_keys: self.skip_catalog_keys,
skip_table_global_keys: self.skip_table_global_keys,
skip_schema_keys: self.skip_schema_keys,
skip_table_route_keys: self.skip_table_route_keys,
};
Ok(Instance::Tool(Box::new(tool)))
}
@@ -65,15 +82,32 @@ impl UpgradeCommand {
struct MigrateTableMetadata {
etcd_store: KvStoreRef,
dryrun: bool,
skip_table_global_keys: bool,
skip_catalog_keys: bool,
skip_schema_keys: bool,
skip_table_route_keys: bool,
}
#[async_trait]
impl Tool for MigrateTableMetadata {
// migrates database's metadata from 0.3 to 0.4.
async fn do_work(&self) -> Result<()> {
self.migrate_table_global_values().await?;
self.migrate_catalog_keys().await?;
self.migrate_schema_keys().await?;
if !self.skip_table_global_keys {
self.migrate_table_global_values().await?;
}
if !self.skip_catalog_keys {
self.migrate_catalog_keys().await?;
}
if !self.skip_schema_keys {
self.migrate_schema_keys().await?;
}
if !self.skip_table_route_keys {
self.migrate_table_route_keys().await?;
}
Ok(())
}
}
@@ -81,6 +115,62 @@ impl Tool for MigrateTableMetadata {
const PAGE_SIZE: usize = 1000;
impl MigrateTableMetadata {
async fn migrate_table_route_keys(&self) -> Result<()> {
let key = b"__meta_table_route".to_vec();
let range_end = get_prefix_end_key(&key);
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
let value =
TableRouteValue::decode(&kv.value[..]).context(MetaError::DecodeProtoSnafu)?;
Ok((kv.key, value))
}),
);
while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? {
self.migrate_table_route_key(value).await?;
keys.push(key);
}
info!("Total migrated TableRouteKeys: {}", keys.len());
self.delete_migrated_keys(keys).await;
Ok(())
}
async fn migrate_table_route_key(&self, value: TableRouteValue) -> Result<()> {
let table_route = TableRoute::try_from_raw(
&value.peers,
value.table_route.expect("expected table_route"),
)
.unwrap();
let new_table_value = NextTableRouteValue::new(table_route.region_routes);
let new_key = NextTableRouteKey::new(table_route.table.id as u32);
info!("Creating '{new_key}'");
if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_value(new_table_value.try_as_raw_value().unwrap()),
)
.await
.unwrap();
}
Ok(())
}
async fn migrate_schema_keys(&self) -> Result<()> {
// The schema key prefix.
let key = b"__s".to_vec();

View File

@@ -24,6 +24,12 @@ use table::metadata::TableId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to decode protobuf, source: {}", source))]
DecodeProto {
location: Location,
source: prost::DecodeError,
},
#[snafu(display("Failed to encode object into json, source: {}", source))]
EncodeJson {
location: Location,
@@ -164,7 +170,8 @@ impl ErrorExt for Error {
EncodeJson { .. }
| DecodeJson { .. }
| PayloadNotExist { .. }
| ConvertRawKey { .. } => StatusCode::Unexpected,
| ConvertRawKey { .. }
| DecodeProto { .. } => StatusCode::Unexpected,
MetaSrv { source, .. } => source.status_code(),

View File

@@ -53,7 +53,9 @@ pub mod schema_name;
pub mod table_info;
pub mod table_name;
pub mod table_region;
mod table_route;
// TODO(weny): removes it.
#[allow(unused)]
pub mod table_route;
use std::sync::Arc;
@@ -67,6 +69,7 @@ use table_region::{TableRegionKey, TableRegionManager, TableRegionValue};
use self::catalog_name::{CatalogManager, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameValue};
use self::table_route::TableRouteValue;
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::KvBackendRef;
@@ -212,7 +215,8 @@ impl_table_meta_value! {
TableNameValue,
TableInfoValue,
TableRegionValue,
DatanodeTableValue
DatanodeTableValue,
TableRouteValue
}
#[cfg(test)]

View File

@@ -15,12 +15,161 @@
use std::fmt::Display;
use api::v1::meta::TableName;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use table::metadata::TableId;
use crate::key::to_removed_key;
use crate::error::{Result, UnexpectedSnafu};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{RegionRoute, Table, TableRoute};
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
pub const NEXT_TABLE_ROUTE_PREFIX: &str = "__table_route";
// TODO(weny): Renames it to TableRouteKey.
pub struct NextTableRouteKey {
pub table_id: TableId,
}
impl NextTableRouteKey {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}
impl TableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: 0,
}
}
}
impl TableMetaKey for NextTableRouteKey {
fn as_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
}
impl Display for NextTableRouteKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", NEXT_TABLE_ROUTE_PREFIX, self.table_id)
}
}
pub struct TableRouteManager {
kv_backend: KvBackendRef,
}
impl TableRouteManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let key = NextTableRouteKey::new(table_id);
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|kv| TableRouteValue::try_from_raw_value(kv.value))
.transpose()
}
// Creates TableRoute key and value. If the key already exists, check whether the value is the same.
pub async fn create(&self, table_id: TableId, region_routes: Vec<RegionRoute>) -> Result<()> {
let key = NextTableRouteKey::new(table_id);
let val = TableRouteValue::new(region_routes);
let req = CompareAndPutRequest::new()
.with_key(key.as_raw_key())
.with_value(val.try_as_raw_value()?);
self.kv_backend.compare_and_put(req).await?.handle(|resp| {
if !resp.success {
let Some(cur) = resp
.prev_kv
.map(|kv|TableRouteValue::try_from_raw_value(kv.value))
.transpose()?
else {
return UnexpectedSnafu {
err_msg: format!("compare_and_put expect None but failed with current value None, key: {key}, val: {val:?}"),
}.fail();
};
ensure!(
cur==val,
UnexpectedSnafu {
err_msg: format!("current value '{cur:?}' already existed for key '{key}', {val:?} is not set"),
}
);
}
Ok(())
})
}
/// Compares and puts value of key. `expect` is the expected value, if backend's current value associated
/// with key is the same as `expect`, the value will be updated to `val`.
///
/// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())`
/// - If associated value is not the same as `expect`, no value will be updated and an
/// `Ok(Err(Option<TableRoute>))` will be returned. The `Option<TableRoute>` indicates
/// the current associated value of key.
/// - If any error happens during operation, an `Err(Error)` will be returned.
pub async fn compare_and_put(
&self,
table_id: TableId,
expect: Option<TableRouteValue>,
region_routes: Vec<RegionRoute>,
) -> Result<std::result::Result<(), Option<TableRouteValue>>> {
let key = NextTableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let (expect, version) = if let Some(x) = expect {
(x.try_as_raw_value()?, x.version + 1)
} else {
(vec![], 0)
};
let value = TableRouteValue {
region_routes,
version,
};
let raw_value = value.try_as_raw_value()?;
let req = CompareAndPutRequest::new()
.with_key(raw_key)
.with_expect(expect)
.with_value(raw_value);
self.kv_backend.compare_and_put(req).await?.handle(|resp| {
Ok(if resp.success {
Ok(())
} else {
Err(resp
.prev_kv
.map(|x| TableRouteValue::try_from_raw_value(x.value))
.transpose()?)
})
})
}
pub async fn remove(&self, table_id: TableId) -> Result<()> {
let key = NextTableRouteKey::new(table_id).as_raw_key();
let removed_key = to_removed_key(&String::from_utf8_lossy(&key));
let req = MoveValueRequest::new(key, removed_key.as_bytes());
self.kv_backend.move_value(req).await?;
Ok(())
}
}
#[derive(Copy, Clone)]
pub struct TableRouteKey<'a> {
pub table_id: TableId,
@@ -59,9 +208,51 @@ impl<'a> Display for TableRouteKey<'a> {
#[cfg(test)]
mod tests {
use api::v1::meta::TableName;
use std::sync::Arc;
use api::v1::meta::TableName as PbTableName;
use super::TableRouteKey;
use crate::key::table_route::{TableRouteManager, TableRouteValue};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::rpc::router::{RegionRoute, Table, TableRoute};
use crate::table_name::TableName;
#[tokio::test]
async fn test_table_route_manager() {
let mgr = TableRouteManager::new(Arc::new(MemoryKvBackend::default()));
let table_id = 1024u32;
let table = Table {
id: table_id as u64,
table_name: TableName::new("foo", "bar", "baz"),
table_schema: b"mock schema".to_vec(),
};
let region_route = RegionRoute::default();
let region_routes = vec![region_route];
mgr.create(table_id, region_routes.clone()).await.unwrap();
let got = mgr.get(1024).await.unwrap().unwrap();
assert_eq!(got.region_routes, region_routes);
let empty = mgr.get(1023).await.unwrap();
assert!(empty.is_none());
let expect = TableRouteValue::new(region_routes);
let mut updated = expect.clone();
updated.region_routes.push(RegionRoute::default());
mgr.compare_and_put(1024, Some(expect.clone()), updated.region_routes.clone())
.await
.unwrap();
mgr.compare_and_put(1024, Some(expect.clone()), updated.region_routes)
.await
.unwrap();
}
#[test]
fn test_table_route_key() {
@@ -87,7 +278,7 @@ mod tests {
#[test]
fn test_with_table_name() {
let table_name = TableName {
let table_name = PbTableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),

View File

@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue,
};
use serde::{Deserialize, Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
@@ -85,7 +85,6 @@ impl TableRoute {
.iter()
.map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
.collect::<HashMap<_, _>>();
Self {
table,
region_routes,
@@ -237,7 +236,7 @@ impl TryFrom<PbTableRouteValue> for TableRoute {
pub struct Table {
pub id: u64,
pub table_name: TableName,
#[serde(serialize_with = "as_utf8")]
#[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
pub table_schema: Vec<u8>,
}
@@ -281,7 +280,7 @@ pub struct Region {
pub id: RegionId,
pub name: String,
pub partition: Option<Partition>,
pub attrs: HashMap<String, String>,
pub attrs: BTreeMap<String, String>,
}
impl From<PbRegion> for Region {
@@ -290,7 +289,7 @@ impl From<PbRegion> for Region {
id: r.id.into(),
name: r.name,
partition: r.partition.map(Into::into),
attrs: r.attrs,
attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
}
}
}
@@ -301,7 +300,7 @@ impl From<Region> for PbRegion {
id: region.id.into(),
name: region.name,
partition: region.partition.map(Into::into),
attrs: region.attrs,
attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
}
}
}
@@ -322,6 +321,15 @@ fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::O
)
}
pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(s.into_bytes())
}
fn as_utf8_vec<S: Serializer>(
val: &[Vec<u8>],
serializer: S,
@@ -513,7 +521,7 @@ mod tests {
id: 1.into(),
name: "r1".to_string(),
partition: None,
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(2, "a2")),
follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
@@ -523,7 +531,7 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
partition: None,
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],

View File

@@ -612,9 +612,9 @@ impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
}
impl CompareAndPutResponse {
pub fn handle<R, E, F>(&self, f: F) -> std::result::Result<R, E>
pub fn handle<R, E, F>(self, f: F) -> std::result::Result<R, E>
where
F: FnOnce(&Self) -> std::result::Result<R, E>,
F: FnOnce(Self) -> std::result::Result<R, E>,
{
f(self)
}

View File

@@ -377,7 +377,7 @@ impl PartitionExec {
#[cfg(test)]
pub(crate) mod test {
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU32, Ordering};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -445,7 +445,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(3, "")),
follower_peers: vec![],
@@ -462,7 +462,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(2, "")),
follower_peers: vec![],
@@ -479,7 +479,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
@@ -517,7 +517,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
@@ -537,7 +537,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
@@ -554,7 +554,7 @@ pub(crate) mod test {
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],