feat: tool to migrate table metadata values (#1971)

feat: tool to migrate table metadata values when upgrading to version 0.4
This commit is contained in:
LFC
2023-07-17 15:06:32 +08:00
committed by GitHub
parent 4fd37d9d4e
commit dd62f4c407
13 changed files with 263 additions and 17 deletions

2
Cargo.lock generated
View File

@@ -1589,6 +1589,7 @@ name = "cmd"
version = "0.3.2"
dependencies = [
"anymap",
"async-trait",
"build-data",
"catalog",
"clap 3.2.25",
@@ -1603,6 +1604,7 @@ dependencies = [
"config",
"datanode",
"either",
"etcd-client",
"frontend",
"futures",
"meta-client",

View File

@@ -458,6 +458,23 @@ async fn open_and_register_table(
})?;
info!("Successfully opened table, {table_ident}");
if !memory_catalog_manager
.catalog_exist(&table_ident.catalog)
.await?
{
memory_catalog_manager.register_catalog_sync(table_ident.catalog.clone())?;
}
if !memory_catalog_manager
.schema_exist(&table_ident.catalog, &table_ident.schema)
.await?
{
memory_catalog_manager.register_schema_sync(RegisterSchemaRequest {
catalog: table_ident.catalog.clone(),
schema: table_ident.schema.clone(),
})?;
}
let request = RegisterTableRequest {
catalog: table_ident.catalog.clone(),
schema: table_ident.schema.clone(),

View File

@@ -16,6 +16,7 @@ metrics-process = ["servers/metrics-process"]
[dependencies]
anymap = "1.0.0-beta.2"
async-trait.workspace = true
catalog = { path = "../catalog" }
clap = { version = "3.1", features = ["derive"] }
client = { path = "../client" }
@@ -30,6 +31,7 @@ common-telemetry = { path = "../common/telemetry", features = [
config = "0.13"
datanode = { path = "../datanode" }
either = "1.8"
etcd-client.workspace = true
frontend = { path = "../frontend" }
futures.workspace = true
meta-client = { path = "../meta-client" }

View File

@@ -15,21 +15,33 @@
mod cmd;
mod helper;
mod repl;
mod upgrade;
use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging::LoggingOptions;
pub use repl::Repl;
use upgrade::UpgradeCommand;
use crate::error::Result;
use crate::options::{Options, TopLevelOptions};
pub struct Instance {
repl: Repl,
/// A command-line task that is run to completion; `Instance::start` awaits
/// `do_work` when the instance is the `Tool` variant.
#[async_trait]
pub trait Tool {
/// Runs the tool's work, reporting failure through the crate's `Result`.
async fn do_work(&self) -> Result<()>;
}
/// What a CLI subcommand builds; `start` dispatches on the variant.
pub enum Instance {
/// A REPL session, started through `Repl::run`.
Repl(Repl),
/// A boxed [`Tool`], started through `Tool::do_work`.
Tool(Box<dyn Tool>),
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.repl.run().await
match self {
Instance::Repl(repl) => repl.run().await,
Instance::Tool(tool) => tool.do_work().await,
}
}
pub async fn stop(&self) -> Result<()> {
@@ -63,12 +75,14 @@ impl Command {
// Subcommands of the CLI; each variant wraps the parsed arguments of one subcommand.
// Plain `//` comments are used here because clap's derive turns `///` doc comments
// into user-visible help text.
#[derive(Parser)]
enum SubCommand {
// Builds an `Instance` through `AttachCommand::build`.
Attach(AttachCommand),
// Builds an `Instance` through `UpgradeCommand::build`.
Upgrade(UpgradeCommand),
}
impl SubCommand {
    /// Consumes the parsed subcommand and delegates to the wrapped command's own
    /// `build`, yielding the [`Instance`] to run.
    async fn build(self) -> Result<Instance> {
        match self {
            Self::Attach(attach) => attach.build().await,
            Self::Upgrade(upgrade) => upgrade.build().await,
        }
    }
}
@@ -86,7 +100,7 @@ pub(crate) struct AttachCommand {
impl AttachCommand {
async fn build(self) -> Result<Instance> {
let repl = Repl::try_new(&self).await?;
Ok(Instance { repl })
Ok(Instance::Repl(repl))
}
}

165
src/cmd/src/cli/upgrade.rs Normal file
View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use catalog::helper::TableGlobalValue;
use clap::Parser;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
use common_meta::key::TableMetaKey;
use common_meta::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
use common_telemetry::info;
use etcd_client::Client;
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::KvStoreRef;
use snafu::ResultExt;
use crate::cli::{Instance, Tool};
use crate::error::{ConnectEtcdSnafu, Result};
// Arguments of the `SubCommand::Upgrade` subcommand, which builds the table metadata
// migration tool.
// Plain `//` comments are used here because clap's derive turns `///` doc comments
// into user-visible help text.
#[derive(Debug, Default, Parser)]
pub struct UpgradeCommand {
// Etcd endpoint handed to `Client::connect` when the tool is built.
#[clap(long)]
etcd_addr: String,
// When set, the migration only logs what it would write and skips every etcd put.
#[clap(long)]
dryrun: bool,
}
impl UpgradeCommand {
pub async fn build(&self) -> Result<Instance> {
let client = Client::connect([&self.etcd_addr], None)
.await
.context(ConnectEtcdSnafu {
etcd_addr: &self.etcd_addr,
})?;
let tool = MigrateTableMetadata {
etcd_store: EtcdStore::with_etcd_client(client),
dryrun: self.dryrun,
};
Ok(Instance::Tool(Box::new(tool)))
}
}
/// Tool that reads every key-value pair under the `__tg` prefix and writes the
/// table name, datanode table, table info and table region keys derived from it.
struct MigrateTableMetadata {
/// Key-value store (built from an etcd client) that is scanned and written to.
etcd_store: KvStoreRef,
/// When `true`, the planned writes are only logged.
dryrun: bool,
}
#[async_trait]
impl Tool for MigrateTableMetadata {
    /// Scans all entries under the `__tg` prefix and, for each one, creates the table
    /// name key, the datanode table keys, and finally the table info / table region
    /// keys.
    ///
    /// # Panics
    /// Panics if the range request fails or if a stored value cannot be decoded as a
    /// `TableGlobalValue`.
    async fn do_work(&self) -> Result<()> {
        let scan = RangeRequest::new().with_prefix(b"__tg".to_vec());
        let legacy_entries = self.etcd_store.range(scan).await.unwrap().kvs;

        for kv in legacy_entries {
            // Keys are only used for logging, so a lossy conversion is enough.
            let key = String::from_utf8_lossy(kv.key());
            let value = match TableGlobalValue::from_bytes(kv.value()) {
                Ok(decoded) => decoded,
                Err(e) => panic!("table global value is corrupted: {e}, key: {key}"),
            };

            // The two borrowing steps run first; the last step consumes `value`.
            self.create_table_name_key(&value).await;
            self.create_datanode_table_keys(&value).await;
            self.split_table_global_value(&key, value).await;
        }

        Ok(())
    }
}
impl MigrateTableMetadata {
    /// Writes the `TableInfoKey` and `TableRegionKey` entries derived from one legacy
    /// table global value in a single batch put. `key` is the legacy key and is only
    /// used for logging.
    ///
    /// In dryrun mode nothing is written.
    ///
    /// # Panics
    /// Panics if a value cannot be turned into its raw form or the batch put fails.
    async fn split_table_global_value(&self, key: &str, value: TableGlobalValue) {
        let table_id = value.table_id();

        let distribution: RegionDistribution = value.regions_id_map.into_iter().collect();
        let table_info_key = TableInfoKey::new(table_id);
        let info_value = TableInfoValue::new(value.table_info);
        let table_region_key = TableRegionKey::new(table_id);
        let region_value = TableRegionValue::new(distribution);

        info!("Splitting TableGlobalKey '{key}' into '{table_info_key}' and '{table_region_key}'");

        if self.dryrun {
            info!("Dryrun: do nothing");
            return;
        }

        let request = BatchPutRequest::new();
        let request = request.add_kv(
            table_info_key.as_raw_key(),
            info_value.try_as_raw_value().unwrap(),
        );
        let request = request.add_kv(
            table_region_key.as_raw_key(),
            region_value.try_as_raw_value().unwrap(),
        );
        self.etcd_store.batch_put(request).await.unwrap();
    }

    /// Writes the `TableNameKey` (catalog, schema, table name) => table id entry for
    /// one legacy table global value.
    ///
    /// In dryrun mode nothing is written.
    ///
    /// # Panics
    /// Panics if the value cannot be turned into its raw form or the put fails.
    async fn create_table_name_key(&self, value: &TableGlobalValue) {
        let table_id = value.table_id();
        let info = &value.table_info;

        let table_name_key =
            TableNameKey::new(&info.catalog_name, &info.schema_name, &info.name);
        let name_value = TableNameValue::new(table_id);

        info!("Creating '{table_name_key}' => {table_id}");

        if self.dryrun {
            info!("Dryrun: do nothing");
            return;
        }

        let raw_key = table_name_key.as_raw_key();
        let raw_value = name_value.try_as_raw_value().unwrap();
        let request = PutRequest::new().with_key(raw_key).with_value(raw_value);
        self.etcd_store.put(request).await.unwrap();
    }

    /// Writes one `DatanodeTableKey` entry per datanode found in the region
    /// distribution of a legacy table global value, all in a single batch put.
    ///
    /// Every planned entry is logged first; in dryrun mode nothing is written.
    ///
    /// # Panics
    /// Panics if a value cannot be turned into its raw form or the batch put fails.
    async fn create_datanode_table_keys(&self, value: &TableGlobalValue) {
        let table_id = value.table_id();
        let distribution: RegionDistribution =
            value.regions_id_map.clone().into_iter().collect();

        let mut entries = Vec::new();
        for (datanode_id, regions) in distribution {
            let k = DatanodeTableKey::new(datanode_id, table_id);
            info!("Creating DatanodeTableKey '{k}' => {regions:?}");
            entries.push((k, DatanodeTableValue::new(table_id, regions)));
        }

        if self.dryrun {
            info!("Dryrun: do nothing");
            return;
        }

        let request = entries
            .into_iter()
            .fold(BatchPutRequest::new(), |batch, (entry_key, entry_value)| {
                batch.add_kv(
                    entry_key.as_raw_key(),
                    entry_value.try_as_raw_value().unwrap(),
                )
            });
        self.etcd_store.batch_put(request).await.unwrap();
    }
}

View File

@@ -154,6 +154,13 @@ pub enum Error {
location: Location,
source: catalog::error::Error,
},
#[snafu(display("Failed to connect to Etcd at {etcd_addr}, source: {}", source))]
ConnectEtcd {
etcd_addr: String,
source: etcd_client::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -173,7 +180,9 @@ impl ErrorExt for Error {
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
| Error::IllegalAuthConfig { .. }
| Error::ConnectEtcd { .. } => StatusCode::InvalidArguments,
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }

View File

@@ -49,13 +49,13 @@ mod table_route;
use std::sync::Arc;
use datanode_table::{DatanodeTableManager, DatanodeTableValue};
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
use snafu::ResultExt;
use table_info::{TableInfoManager, TableInfoValue};
use table_name::{TableNameManager, TableNameValue};
use table_region::{TableRegionManager, TableRegionValue};
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use table_region::{TableRegionKey, TableRegionManager, TableRegionValue};
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
@@ -126,6 +126,25 @@ impl TableMetadataManager {
}
}
/// Implements [`std::fmt::Display`] for every listed key type by rendering the bytes
/// returned from its `as_raw_key()` as lossy UTF-8.
macro_rules! impl_table_meta_key {
    ($($key_ty:ty),*) => {
        $(
            impl std::fmt::Display for $key_ty {
                fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                    let raw_key = self.as_raw_key();
                    f.write_str(&String::from_utf8_lossy(&raw_key))
                }
            }
        )*
    };
}
// Key types that get a `Display` impl rendering their raw key as lossy UTF-8.
impl_table_meta_key!(
TableNameKey<'_>,
TableInfoKey,
TableRegionKey,
DatanodeTableKey
);
macro_rules! impl_table_meta_value {
( $($val_ty: ty), *) => {
$(

View File

@@ -27,13 +27,13 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest};
use crate::DatanodeId;
struct DatanodeTableKey {
pub struct DatanodeTableKey {
datanode_id: DatanodeId,
table_id: TableId,
}
impl DatanodeTableKey {
fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
Self {
datanode_id,
table_id,
@@ -81,7 +81,7 @@ pub struct DatanodeTableValue {
}
impl DatanodeTableValue {
fn new(table_id: TableId, regions: Vec<RegionNumber>) -> Self {
pub fn new(table_id: TableId, regions: Vec<RegionNumber>) -> Self {
Self {
table_id,
regions,

View File

@@ -46,6 +46,15 @@ pub struct TableInfoValue {
version: u64,
}
impl TableInfoValue {
    /// Wraps `table_info` in a new value whose `version` starts at 0.
    pub fn new(table_info: RawTableInfo) -> Self {
        let version = 0;
        Self {
            version,
            table_info,
        }
    }
}
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}

View File

@@ -123,7 +123,7 @@ pub struct TableNameValue {
}
impl TableNameValue {
fn new(table_id: TableId) -> Self {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}

View File

@@ -52,6 +52,15 @@ pub struct TableRegionValue {
version: u64,
}
impl TableRegionValue {
    /// Wraps `region_distribution` in a new value whose `version` starts at 0.
    pub fn new(region_distribution: RegionDistribution) -> Self {
        let version = 0;
        Self {
            version,
            region_distribution,
        }
    }
}
pub struct TableRegionManager {
kv_backend: KvBackendRef,
}

View File

@@ -166,7 +166,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
.await
.context(error::ConnectEtcdSnafu)?;
(
EtcdStore::with_etcd_client(etcd_client.clone())?,
EtcdStore::with_etcd_client(etcd_client.clone()),
Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
Some(EtcdLock::with_etcd_client(etcd_client)?),
)

View File

@@ -57,11 +57,11 @@ impl EtcdStore {
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(client)
Ok(Self::with_etcd_client(client))
}
pub fn with_etcd_client(client: Client) -> Result<KvStoreRef> {
Ok(Arc::new(Self { client }))
pub fn with_etcd_client(client: Client) -> KvStoreRef {
Arc::new(Self { client })
}
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {