From dd62f4c4074b2b54ad6f5ace6a4e833746506c3c Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 17 Jul 2023 15:06:32 +0800 Subject: [PATCH] feat: tool to migrate table metadata values (#1971) feat: tool to migrate table metadata values when upgrading to version 0.4 --- Cargo.lock | 2 + src/catalog/src/remote/manager.rs | 17 +++ src/cmd/Cargo.toml | 2 + src/cmd/src/cli.rs | 22 ++- src/cmd/src/cli/upgrade.rs | 165 ++++++++++++++++++++++ src/cmd/src/error.rs | 11 +- src/common/meta/src/key.rs | 27 +++- src/common/meta/src/key/datanode_table.rs | 6 +- src/common/meta/src/key/table_info.rs | 9 ++ src/common/meta/src/key/table_name.rs | 2 +- src/common/meta/src/key/table_region.rs | 9 ++ src/meta-srv/src/bootstrap.rs | 2 +- src/meta-srv/src/service/store/etcd.rs | 6 +- 13 files changed, 263 insertions(+), 17 deletions(-) create mode 100644 src/cmd/src/cli/upgrade.rs diff --git a/Cargo.lock b/Cargo.lock index c59bd9e789..60ae23a969 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1589,6 +1589,7 @@ name = "cmd" version = "0.3.2" dependencies = [ "anymap", + "async-trait", "build-data", "catalog", "clap 3.2.25", @@ -1603,6 +1604,7 @@ dependencies = [ "config", "datanode", "either", + "etcd-client", "frontend", "futures", "meta-client", diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index b7ff3f7d4e..399c1a0d96 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -458,6 +458,23 @@ async fn open_and_register_table( })?; info!("Successfully opened table, {table_ident}"); + if !memory_catalog_manager + .catalog_exist(&table_ident.catalog) + .await? + { + memory_catalog_manager.register_catalog_sync(table_ident.catalog.clone())?; + } + + if !memory_catalog_manager + .schema_exist(&table_ident.catalog, &table_ident.schema) + .await? + { + memory_catalog_manager.register_schema_sync(RegisterSchemaRequest { + catalog: table_ident.catalog.clone(), + schema: table_ident.schema.clone(), + })?; + } + let request = RegisterTableRequest { catalog: table_ident.catalog.clone(), schema: table_ident.schema.clone(), diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 94eed1841b..6a50259c7c 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,6 +16,7 @@ metrics-process = ["servers/metrics-process"] [dependencies] anymap = "1.0.0-beta.2" +async-trait.workspace = true catalog = { path = "../catalog" } clap = { version = "3.1", features = ["derive"] } client = { path = "../client" } @@ -30,6 +31,7 @@ common-telemetry = { path = "../common/telemetry", features = [ config = "0.13" datanode = { path = "../datanode" } either = "1.8" +etcd-client.workspace = true frontend = { path = "../frontend" } futures.workspace = true meta-client = { path = "../meta-client" } diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 778c2cb070..898288101a 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -15,21 +15,33 @@ mod cmd; mod helper; mod repl; +mod upgrade; +use async_trait::async_trait; use clap::Parser; use common_telemetry::logging::LoggingOptions; pub use repl::Repl; +use upgrade::UpgradeCommand; use crate::error::Result; use crate::options::{Options, TopLevelOptions}; -pub struct Instance { - repl: Repl, +#[async_trait] +pub trait Tool { + async fn do_work(&self) -> Result<()>; +} + +pub enum Instance { + Repl(Repl), + Tool(Box), } impl Instance { pub async fn start(&mut self) -> Result<()> { - self.repl.run().await + match self { + Instance::Repl(repl) => repl.run().await, + Instance::Tool(tool) => tool.do_work().await, + } } pub async fn stop(&self) -> Result<()> { @@ -63,12 +75,14 @@ impl Command { #[derive(Parser)] enum SubCommand { Attach(AttachCommand), + Upgrade(UpgradeCommand), } impl SubCommand { async fn build(self) -> Result { match self { SubCommand::Attach(cmd) => cmd.build().await, + SubCommand::Upgrade(cmd) => cmd.build().await, } } } @@ -86,7 +100,7 @@ pub(crate) struct AttachCommand { impl AttachCommand { async fn build(self) -> Result { let repl = Repl::try_new(&self).await?; - Ok(Instance { repl }) + Ok(Instance::Repl(repl)) } } diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs new file mode 100644 index 0000000000..edc26ee480 --- /dev/null +++ b/src/cmd/src/cli/upgrade.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use catalog::helper::TableGlobalValue; +use clap::Parser; +use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; +use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; +use common_meta::key::table_name::{TableNameKey, TableNameValue}; +use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue}; +use common_meta::key::TableMetaKey; +use common_meta::rpc::store::{BatchPutRequest, PutRequest, RangeRequest}; +use common_telemetry::info; +use etcd_client::Client; +use meta_srv::service::store::etcd::EtcdStore; +use meta_srv::service::store::kv::KvStoreRef; +use snafu::ResultExt; + +use crate::cli::{Instance, Tool}; +use crate::error::{ConnectEtcdSnafu, Result}; + +#[derive(Debug, Default, Parser)] +pub struct UpgradeCommand { + #[clap(long)] + etcd_addr: String, + #[clap(long)] + dryrun: bool, +} + +impl UpgradeCommand { + pub async fn build(&self) -> Result { + let client = Client::connect([&self.etcd_addr], None) + .await + .context(ConnectEtcdSnafu { + etcd_addr: &self.etcd_addr, + })?; + let tool = MigrateTableMetadata { + etcd_store: EtcdStore::with_etcd_client(client), + dryrun: self.dryrun, + }; + Ok(Instance::Tool(Box::new(tool))) + } +} + +struct MigrateTableMetadata { + etcd_store: KvStoreRef, + dryrun: bool, +} + +#[async_trait] +impl Tool for MigrateTableMetadata { + async fn do_work(&self) -> Result<()> { + let req = RangeRequest::new().with_prefix(b"__tg".to_vec()); + let resp = self.etcd_store.range(req).await.unwrap(); + for kv in resp.kvs { + let key = String::from_utf8_lossy(kv.key()); + let value = TableGlobalValue::from_bytes(kv.value()) + .unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}")); + + self.create_table_name_key(&value).await; + + self.create_datanode_table_keys(&value).await; + + self.split_table_global_value(&key, value).await; + } + Ok(()) + } +} + +impl MigrateTableMetadata { + async fn split_table_global_value(&self, key: &str, value: TableGlobalValue) { + let table_id = value.table_id(); + let region_distribution: RegionDistribution = value.regions_id_map.into_iter().collect(); + + let table_info_key = TableInfoKey::new(table_id); + let table_info_value = TableInfoValue::new(value.table_info); + + let table_region_key = TableRegionKey::new(table_id); + let table_region_value = TableRegionValue::new(region_distribution); + + info!("Splitting TableGlobalKey '{key}' into '{table_info_key}' and '{table_region_key}'"); + + if self.dryrun { + info!("Dryrun: do nothing"); + } else { + self.etcd_store + .batch_put( + BatchPutRequest::new() + .add_kv( + table_info_key.as_raw_key(), + table_info_value.try_as_raw_value().unwrap(), + ) + .add_kv( + table_region_key.as_raw_key(), + table_region_value.try_as_raw_value().unwrap(), + ), + ) + .await + .unwrap(); + } + } + + async fn create_table_name_key(&self, value: &TableGlobalValue) { + let table_info = &value.table_info; + let table_id = value.table_id(); + + let table_name_key = TableNameKey::new( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name, + ); + let table_name_value = TableNameValue::new(table_id); + + info!("Creating '{table_name_key}' => {table_id}"); + + if self.dryrun { + info!("Dryrun: do nothing"); + } else { + self.etcd_store + .put( + PutRequest::new() + .with_key(table_name_key.as_raw_key()) + .with_value(table_name_value.try_as_raw_value().unwrap()), + ) + .await + .unwrap(); + } + } + + async fn create_datanode_table_keys(&self, value: &TableGlobalValue) { + let table_id = value.table_id(); + let region_distribution: RegionDistribution = + value.regions_id_map.clone().into_iter().collect(); + + let datanode_table_kvs = region_distribution + .into_iter() + .map(|(datanode_id, regions)| { + let k = DatanodeTableKey::new(datanode_id, table_id); + info!("Creating DatanodeTableKey '{k}' => {regions:?}"); + (k, DatanodeTableValue::new(table_id, regions)) + }) + .collect::>(); + + if self.dryrun { + info!("Dryrun: do nothing"); + } else { + let mut req = BatchPutRequest::new(); + for (key, value) in datanode_table_kvs { + req = req.add_kv(key.as_raw_key(), value.try_as_raw_value().unwrap()); + } + self.etcd_store.batch_put(req).await.unwrap(); + } + } +} diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index d4386bc031..c291ef5d4c 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -154,6 +154,13 @@ pub enum Error { location: Location, source: catalog::error::Error, }, + + #[snafu(display("Failed to connect to Etcd at {etcd_addr}, source: {}", source))] + ConnectEtcd { + etcd_addr: String, + source: etcd_client::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -173,7 +180,9 @@ impl ErrorExt for Error { | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } | Error::InvalidReplCommand { .. } - | Error::IllegalAuthConfig { .. } => StatusCode::InvalidArguments, + | Error::IllegalAuthConfig { .. } + | Error::ConnectEtcd { .. } => StatusCode::InvalidArguments, + Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source, .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c691bd0ea5..99af558283 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -49,13 +49,13 @@ mod table_route; use std::sync::Arc; -use datanode_table::{DatanodeTableManager, DatanodeTableValue}; +use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; use snafu::ResultExt; -use table_info::{TableInfoManager, TableInfoValue}; -use table_name::{TableNameManager, TableNameValue}; -use table_region::{TableRegionManager, TableRegionValue}; +use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; +use table_name::{TableNameKey, TableNameManager, TableNameValue}; +use table_region::{TableRegionKey, TableRegionManager, TableRegionValue}; use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; @@ -126,6 +126,25 @@ impl TableMetadataManager { } } +macro_rules! impl_table_meta_key { + ( $($val_ty: ty), *) => { + $( + impl std::fmt::Display for $val_ty { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", String::from_utf8_lossy(&self.as_raw_key())) + } + } + )* + } +} + +impl_table_meta_key!( + TableNameKey<'_>, + TableInfoKey, + TableRegionKey, + DatanodeTableKey +); + macro_rules! impl_table_meta_value { ( $($val_ty: ty), *) => { $( diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 58cfb9de66..a1908dd5ef 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -27,13 +27,13 @@ use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest}; use crate::DatanodeId; -struct DatanodeTableKey { +pub struct DatanodeTableKey { datanode_id: DatanodeId, table_id: TableId, } impl DatanodeTableKey { - fn new(datanode_id: DatanodeId, table_id: TableId) -> Self { + pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self { Self { datanode_id, table_id, @@ -81,7 +81,7 @@ pub struct DatanodeTableValue { } impl DatanodeTableValue { - fn new(table_id: TableId, regions: Vec) -> Self { + pub fn new(table_id: TableId, regions: Vec) -> Self { Self { table_id, regions, diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index ba46ab8932..9510027356 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -46,6 +46,15 @@ pub struct TableInfoValue { version: u64, } +impl TableInfoValue { + pub fn new(table_info: RawTableInfo) -> Self { + Self { + table_info, + version: 0, + } + } +} + pub struct TableInfoManager { kv_backend: KvBackendRef, } diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 2c16db0b92..d45a88a6fd 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -123,7 +123,7 @@ pub struct TableNameValue { } impl TableNameValue { - fn new(table_id: TableId) -> Self { + pub fn new(table_id: TableId) -> Self { Self { table_id } } diff --git a/src/common/meta/src/key/table_region.rs b/src/common/meta/src/key/table_region.rs index 21beeb5817..a1e259ea94 100644 --- a/src/common/meta/src/key/table_region.rs +++ b/src/common/meta/src/key/table_region.rs @@ -52,6 +52,15 @@ pub struct TableRegionValue { version: u64, } +impl TableRegionValue { + pub fn new(region_distribution: RegionDistribution) -> Self { + Self { + region_distribution, + version: 0, + } + } +} + pub struct TableRegionManager { kv_backend: KvBackendRef, } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 1463cc1baa..6c5746df1b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -166,7 +166,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { .await .context(error::ConnectEtcdSnafu)?; ( - EtcdStore::with_etcd_client(etcd_client.clone())?, + EtcdStore::with_etcd_client(etcd_client.clone()), Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?), Some(EtcdLock::with_etcd_client(etcd_client)?), ) diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 1a6891da76..50c18fc9be 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -57,11 +57,11 @@ impl EtcdStore { .await .context(error::ConnectEtcdSnafu)?; - Self::with_etcd_client(client) + Ok(Self::with_etcd_client(client)) } - pub fn with_etcd_client(client: Client) -> Result { - Ok(Arc::new(Self { client })) + pub fn with_etcd_client(client: Client) -> KvStoreRef { + Arc::new(Self { client }) } async fn do_multi_txn(&self, txn_ops: Vec) -> Result> {