From 49cb4da6d277eff5ae7cf90b4eb15a82966e4180 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 17 Jun 2025 21:53:59 +0800 Subject: [PATCH] feat: introduce CLI tool for repairing logical table metadata (#6322) * feat: introduce logical table metadata repair cli tool Signed-off-by: WenyXu * chore: deps Signed-off-by: WenyXu * refactor: flatten doctor module structure Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 2 + src/cli/Cargo.toml | 2 + src/cli/src/error.rs | 50 ++- src/cli/src/metadata.rs | 9 +- src/cli/src/metadata/control.rs | 3 +- src/cli/src/metadata/control/get.rs | 2 +- src/cli/src/metadata/repair.rs | 369 ++++++++++++++++++ src/cli/src/metadata/repair/alter_table.rs | 85 ++++ src/cli/src/metadata/repair/create_table.rs | 89 +++++ src/cli/src/metadata/utils.rs | 178 +++++++++ src/common/meta/Cargo.toml | 2 +- .../meta/src/ddl/alter_logical_tables.rs | 1 + .../alter_logical_tables/region_request.rs | 47 +-- .../meta/src/ddl/create_logical_tables.rs | 1 + .../create_logical_tables/region_request.rs | 31 +- src/common/meta/src/ddl/create_table.rs | 7 +- .../meta/src/ddl/create_table_template.rs | 6 +- 17 files changed, 828 insertions(+), 56 deletions(-) create mode 100644 src/cli/src/metadata/repair.rs create mode 100644 src/cli/src/metadata/repair/alter_table.rs create mode 100644 src/cli/src/metadata/repair/create_table.rs create mode 100644 src/cli/src/metadata/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 5f703e2c94..76f85c88b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,6 +1950,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" name = "cli" version = "0.15.0" dependencies = [ + "async-stream", "async-trait", "auth", "base64 0.22.1", @@ -1982,6 +1983,7 @@ dependencies = [ "meta-srv", "nu-ansi-term", "object-store", + "operator", "query", "rand 0.9.0", "reqwest", diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index bf2c449c49..204817465c 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -16,6 +16,7 @@ mysql_kvbackend = ["common-meta/mysql_kvbackend", "meta-srv/mysql_kvbackend"] workspace = true [dependencies] +async-stream.workspace = true async-trait.workspace = true auth.workspace = true base64.workspace = true @@ -50,6 +51,7 @@ meta-client.workspace = true meta-srv.workspace = true nu-ansi-term = "0.46" object-store.workspace = true +operator.workspace = true query.workspace = true rand.workspace = true reqwest.workspace = true diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index 0f01973d0b..0ee6d55b30 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_meta::peer::Peer; use object_store::Error as ObjectStoreError; use snafu::{Location, Snafu}; @@ -73,6 +74,20 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to get table metadata"))] + TableMetadata { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Unexpected error: {}", msg))] + Unexpected { + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Missing config, msg: {}", msg))] MissingConfig { msg: String, @@ -267,6 +282,29 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to init backend"))] + InitBackend { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: ObjectStoreError, + }, + + #[snafu(display("Covert column schemas to defs failed"))] + CovertColumnSchemasToDefs { + #[snafu(implicit)] + location: Location, + source: operator::error::Error, + }, + + #[snafu(display("Failed to send request to datanode: {}", peer))] + SendRequestToDatanode { + peer: Peer, + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, } pub type Result = std::result::Result; @@ -274,9 +312,9 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { - source.status_code() - } + Error::InitMetadata { source, .. } + | Error::InitDdlManager { source, .. } + | Error::TableMetadata { source, .. } => source.status_code(), Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } @@ -290,6 +328,9 @@ impl ErrorExt for Error { | Error::InvalidArguments { .. } | Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments, + Error::CovertColumnSchemasToDefs { source, .. } => source.status_code(), + Error::SendRequestToDatanode { source, .. } => source.status_code(), + Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), Error::StartWalOptionsAllocator { source, .. } => source.status_code(), @@ -297,6 +338,7 @@ impl ErrorExt for Error { Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => { source.status_code() } + Error::Unexpected { .. } => StatusCode::Unexpected, Error::SerdeJson { .. } | Error::FileIo { .. } @@ -305,7 +347,7 @@ impl ErrorExt for Error { | Error::BuildClient { .. } => StatusCode::Unexpected, Error::Other { source, .. } => source.status_code(), - Error::OpenDal { .. } => StatusCode::Internal, + Error::OpenDal { .. } | Error::InitBackend { .. } => StatusCode::Internal, Error::S3ConfigNotSet { .. } | Error::OutputDirNotSet { .. } | Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments, diff --git a/src/cli/src/metadata.rs b/src/cli/src/metadata.rs index 6a6a5ae3f6..f8ec28414a 100644 --- a/src/cli/src/metadata.rs +++ b/src/cli/src/metadata.rs @@ -14,22 +14,28 @@ mod common; mod control; +mod repair; mod snapshot; +mod utils; use clap::Subcommand; use common_error::ext::BoxedError; use crate::metadata::control::ControlCommand; +use crate::metadata::repair::RepairLogicalTablesCommand; use crate::metadata::snapshot::SnapshotCommand; use crate::Tool; -/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots. +/// Command for managing metadata operations, +/// including saving and restoring metadata snapshots, +/// controlling metadata operations, and diagnosing and repairing metadata. #[derive(Subcommand)] pub enum MetadataCommand { #[clap(subcommand)] Snapshot(SnapshotCommand), #[clap(subcommand)] Control(ControlCommand), + RepairLogicalTables(RepairLogicalTablesCommand), } impl MetadataCommand { @@ -37,6 +43,7 @@ impl MetadataCommand { match self { MetadataCommand::Snapshot(cmd) => cmd.build().await, MetadataCommand::Control(cmd) => cmd.build().await, + MetadataCommand::RepairLogicalTables(cmd) => cmd.build().await, } } } diff --git a/src/cli/src/metadata/control.rs b/src/cli/src/metadata/control.rs index 573bffda2c..3d10093b04 100644 --- a/src/cli/src/metadata/control.rs +++ b/src/cli/src/metadata/control.rs @@ -21,10 +21,9 @@ use get::GetCommand; use crate::Tool; -/// Subcommand for metadata control. +/// Subcommand for metadata control, including getting metadata from metadata store #[derive(Subcommand)] pub enum ControlCommand { - /// Get the metadata from the metasrv. #[clap(subcommand)] Get(GetCommand), } diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs index 89b3bdc1c8..875512689d 100644 --- a/src/cli/src/metadata/control/get.rs +++ b/src/cli/src/metadata/control/get.rs @@ -33,7 +33,7 @@ use crate::metadata::common::StoreConfig; use crate::metadata::control::utils::{decode_key_value, json_fromatter}; use crate::Tool; -/// Subcommand for get command. +/// Getting metadata from metadata store. #[derive(Subcommand)] pub enum GetCommand { Key(GetKeyCommand), diff --git a/src/cli/src/metadata/repair.rs b/src/cli/src/metadata/repair.rs new file mode 100644 index 0000000000..8e853956a0 --- /dev/null +++ b/src/cli/src/metadata/repair.rs @@ -0,0 +1,369 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod alter_table; +mod create_table; + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use clap::Parser; +use client::api::v1::CreateTableExpr; +use client::client_manager::NodeClients; +use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_grpc::channel_manager::ChannelConfig; +use common_meta::error::Error as CommonMetaError; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use common_meta::node_manager::NodeManagerRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::{find_leaders, RegionRoute}; +use common_telemetry::{error, info, warn}; +use futures::TryStreamExt; +use snafu::{ensure, ResultExt}; +use store_api::storage::TableId; + +use crate::error::{ + InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu, +}; +use crate::metadata::common::StoreConfig; +use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator}; +use crate::Tool; + +/// Repair metadata of logical tables. +#[derive(Debug, Default, Parser)] +pub struct RepairLogicalTablesCommand { + /// The names of the tables to repair. + #[clap(long, value_delimiter = ',')] + table_names: Vec, + + /// The id of the table to repair. + #[clap(long, value_delimiter = ',')] + table_ids: Vec, + + /// The schema of the tables to repair. + #[clap(long, default_value = DEFAULT_SCHEMA_NAME)] + schema_name: String, + + /// The catalog of the tables to repair. + #[clap(long, default_value = DEFAULT_CATALOG_NAME)] + catalog_name: String, + + /// Whether to fail fast if any repair operation fails. + #[clap(long)] + fail_fast: bool, + + #[clap(flatten)] + store: StoreConfig, + + /// The timeout for the client to operate the datanode. + #[clap(long, default_value_t = 30)] + client_timeout_secs: u64, + + /// The timeout for the client to connect to the datanode. + #[clap(long, default_value_t = 3)] + client_connect_timeout_secs: u64, +} + +impl RepairLogicalTablesCommand { + fn validate(&self) -> Result<()> { + ensure!( + !self.table_names.is_empty() || !self.table_ids.is_empty(), + InvalidArgumentsSnafu { + msg: "You must specify --table-names or --table-ids.", + } + ); + Ok(()) + } +} + +impl RepairLogicalTablesCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + self.validate().map_err(BoxedError::new)?; + let kv_backend = self.store.build().await?; + let node_client_channel_config = ChannelConfig::new() + .timeout(Duration::from_secs(self.client_timeout_secs)) + .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs)); + let node_manager = Arc::new(NodeClients::new(node_client_channel_config)); + + Ok(Box::new(RepairTool { + table_names: self.table_names.clone(), + table_ids: self.table_ids.clone(), + schema_name: self.schema_name.clone(), + catalog_name: self.catalog_name.clone(), + fail_fast: self.fail_fast, + kv_backend, + node_manager, + })) + } +} + +struct RepairTool { + table_names: Vec, + table_ids: Vec, + schema_name: String, + catalog_name: String, + fail_fast: bool, + kv_backend: KvBackendRef, + node_manager: NodeManagerRef, +} + +#[async_trait] +impl Tool for RepairTool { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + self.repair_tables().await.map_err(BoxedError::new) + } +} + +impl RepairTool { + fn generate_iterator_input(&self) -> Result { + if !self.table_names.is_empty() { + let table_names = &self.table_names; + let catalog = &self.catalog_name; + let schema_name = &self.schema_name; + + let table_names = table_names + .iter() + .map(|table_name| { + ( + catalog.to_string(), + schema_name.to_string(), + table_name.to_string(), + ) + }) + .collect::>(); + return Ok(IteratorInput::new_table_names(table_names)); + } else if !self.table_ids.is_empty() { + return Ok(IteratorInput::new_table_ids(self.table_ids.clone())); + }; + + InvalidArgumentsSnafu { + msg: "You must specify --table-names or --table-id.", + } + .fail() + } + + async fn repair_tables(&self) -> Result<()> { + let input = self.generate_iterator_input()?; + let mut table_metadata_iterator = + Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream()); + let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone()); + + let mut skipped_table = 0; + let mut success_table = 0; + while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? { + let full_table_name = full_table_metadata.full_table_name(); + if !full_table_metadata.is_metric_engine() { + warn!( + "Skipping repair for non-metric engine table: {}", + full_table_name + ); + skipped_table += 1; + continue; + } + + if full_table_metadata.is_physical_table() { + warn!("Skipping repair for physical table: {}", full_table_name); + skipped_table += 1; + continue; + } + + let (physical_table_id, physical_table_route) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(full_table_metadata.table_id) + .await + .context(TableMetadataSnafu)?; + + if let Err(err) = self + .repair_table( + &full_table_metadata, + physical_table_id, + &physical_table_route.region_routes, + ) + .await + { + error!( + err; + "Failed to repair table: {}, skipped table: {}", + full_table_name, + skipped_table, + ); + + if self.fail_fast { + return Err(err); + } + } else { + success_table += 1; + } + } + + info!( + "Repair logical tables result: {} tables repaired, {} tables skipped", + success_table, skipped_table + ); + + Ok(()) + } + + async fn alter_table_on_datanodes( + &self, + full_table_metadata: &FullTableMetadata, + physical_region_routes: &[RegionRoute], + ) -> Result> { + let logical_table_id = full_table_metadata.table_id; + let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns( + &full_table_metadata.table_info, + )?; + let node_manager = self.node_manager.clone(); + + let mut failed_peers = Vec::new(); + info!( + "Sending alter table requests to all datanodes for table: {}, number of regions:{}.", + full_table_metadata.full_table_name(), + physical_region_routes.len() + ); + let leaders = find_leaders(physical_region_routes); + for peer in &leaders { + let alter_table_request = alter_table::make_alter_region_request_for_peer( + logical_table_id, + &alter_table_expr, + full_table_metadata.table_info.ident.version, + peer, + physical_region_routes, + )?; + let datanode = node_manager.datanode(peer).await; + if let Err(err) = datanode.handle(alter_table_request).await { + failed_peers.push((peer.clone(), err)); + } + } + + Ok(failed_peers) + } + + async fn create_table_on_datanode( + &self, + create_table_expr: &CreateTableExpr, + logical_table_id: TableId, + physical_table_id: TableId, + peer: &Peer, + physical_region_routes: &[RegionRoute], + ) -> Result<()> { + let node_manager = self.node_manager.clone(); + let datanode = node_manager.datanode(peer).await; + let create_table_request = create_table::make_create_region_request_for_peer( + logical_table_id, + physical_table_id, + create_table_expr, + peer, + physical_region_routes, + )?; + + datanode + .handle(create_table_request) + .await + .with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?; + + Ok(()) + } + + async fn repair_table( + &self, + full_table_metadata: &FullTableMetadata, + physical_table_id: TableId, + physical_region_routes: &[RegionRoute], + ) -> Result<()> { + let full_table_name = full_table_metadata.full_table_name(); + // First we sends alter table requests to all datanodes with all columns. + let failed_peers = self + .alter_table_on_datanodes(full_table_metadata, physical_region_routes) + .await?; + + if failed_peers.is_empty() { + info!( + "All alter table requests sent successfully for table: {}", + full_table_name + ); + return Ok(()); + } + warn!( + "Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}", + full_table_name, + failed_peers.iter().map(|(peer, _)| peer.id).collect::>() + ); + + let create_table_expr = + create_table::generate_create_table_expr(&full_table_metadata.table_info)?; + + let mut errors = Vec::new(); + for (peer, err) in failed_peers { + if err.status_code() != StatusCode::RegionNotFound { + error!( + err; + "Sending alter table requests to datanode: {} for table: {} failed", + peer.id, + full_table_name, + ); + continue; + } + info!( + "Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode", + full_table_name, + peer.id + ); + + // If the alter table request fails for any datanode, we attempt to create the table on that datanode + // as a fallback mechanism to ensure table consistency across the cluster. + if let Err(err) = self + .create_table_on_datanode( + &create_table_expr, + full_table_metadata.table_id, + physical_table_id, + &peer, + physical_region_routes, + ) + .await + { + error!( + err; + "Failed to create table on datanode: {} for table: {}", + peer.id, full_table_name + ); + errors.push(err); + if self.fail_fast { + break; + } + } else { + info!( + "Created table on datanode: {} for table: {}", + peer.id, full_table_name + ); + } + } + + if !errors.is_empty() { + return UnexpectedSnafu { + msg: format!( + "Failed to create table on datanodes for table: {}", + full_table_name, + ), + } + .fail(); + } + + Ok(()) + } +} diff --git a/src/cli/src/metadata/repair/alter_table.rs b/src/cli/src/metadata/repair/alter_table.rs new file mode 100644 index 0000000000..adfdd95ef7 --- /dev/null +++ b/src/cli/src/metadata/repair/alter_table.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use client::api::v1::alter_table_expr::Kind; +use client::api::v1::region::{region_request, AlterRequests, RegionRequest, RegionRequestHeader}; +use client::api::v1::{AddColumn, AddColumns, AlterTableExpr}; +use common_meta::ddl::alter_logical_tables::make_alter_region_request; +use common_meta::peer::Peer; +use common_meta::rpc::router::{find_leader_regions, RegionRoute}; +use operator::expr_helper::column_schemas_to_defs; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; + +use crate::error::{CovertColumnSchemasToDefsSnafu, Result}; + +/// Generates alter table expression for all columns. +pub fn generate_alter_table_expr_for_all_columns( + table_info: &RawTableInfo, +) -> Result { + let schema = &table_info.meta.schema; + + let mut alter_table_expr = AlterTableExpr { + catalog_name: table_info.catalog_name.to_string(), + schema_name: table_info.schema_name.to_string(), + table_name: table_info.name.to_string(), + ..Default::default() + }; + + let primary_keys = table_info + .meta + .primary_key_indices + .iter() + .map(|i| schema.column_schemas[*i].name.clone()) + .collect::>(); + + let add_columns = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys) + .context(CovertColumnSchemasToDefsSnafu)?; + + alter_table_expr.kind = Some(Kind::AddColumns(AddColumns { + add_columns: add_columns + .into_iter() + .map(|col| AddColumn { + column_def: Some(col), + location: None, + add_if_not_exists: true, + }) + .collect(), + })); + + Ok(alter_table_expr) +} + +/// Makes an alter region request for a peer. +pub fn make_alter_region_request_for_peer( + logical_table_id: TableId, + alter_table_expr: &AlterTableExpr, + schema_version: u64, + peer: &Peer, + region_routes: &[RegionRoute], +) -> Result { + let regions_on_this_peer = find_leader_regions(region_routes, peer); + let mut requests = Vec::with_capacity(regions_on_this_peer.len()); + for region_number in ®ions_on_this_peer { + let region_id = RegionId::new(logical_table_id, *region_number); + let request = make_alter_region_request(region_id, alter_table_expr, schema_version); + requests.push(request); + } + + Ok(RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::Alters(AlterRequests { requests })), + }) +} diff --git a/src/cli/src/metadata/repair/create_table.rs b/src/cli/src/metadata/repair/create_table.rs new file mode 100644 index 0000000000..a2cda030d0 --- /dev/null +++ b/src/cli/src/metadata/repair/create_table.rs @@ -0,0 +1,89 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use client::api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use client::api::v1::CreateTableExpr; +use common_meta::ddl::create_logical_tables::create_region_request_builder; +use common_meta::ddl::utils::region_storage_path; +use common_meta::peer::Peer; +use common_meta::rpc::router::{find_leader_regions, RegionRoute}; +use operator::expr_helper::column_schemas_to_defs; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; + +use crate::error::{CovertColumnSchemasToDefsSnafu, Result}; + +/// Generates a `CreateTableExpr` from a `RawTableInfo`. +pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result { + let schema = &table_info.meta.schema; + let primary_keys = table_info + .meta + .primary_key_indices + .iter() + .map(|i| schema.column_schemas[*i].name.clone()) + .collect::>(); + + let timestamp_index = schema.timestamp_index.as_ref().unwrap(); + let time_index = schema.column_schemas[*timestamp_index].name.clone(); + let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys) + .context(CovertColumnSchemasToDefsSnafu)?; + let table_options = HashMap::from(&table_info.meta.options); + + Ok(CreateTableExpr { + catalog_name: table_info.catalog_name.to_string(), + schema_name: table_info.schema_name.to_string(), + table_name: table_info.name.to_string(), + desc: String::default(), + column_defs, + time_index, + primary_keys, + create_if_not_exists: true, + table_options, + table_id: None, + engine: table_info.meta.engine.to_string(), + }) +} + +/// Makes a create region request for a peer. +pub fn make_create_region_request_for_peer( + logical_table_id: TableId, + physical_table_id: TableId, + create_table_expr: &CreateTableExpr, + peer: &Peer, + region_routes: &[RegionRoute], +) -> Result { + let regions_on_this_peer = find_leader_regions(region_routes, peer); + let mut requests = Vec::with_capacity(regions_on_this_peer.len()); + let request_builder = + create_region_request_builder(create_table_expr, physical_table_id).unwrap(); + + let catalog = &create_table_expr.catalog_name; + let schema = &create_table_expr.schema_name; + let storage_path = region_storage_path(catalog, schema); + + for region_number in ®ions_on_this_peer { + let region_id = RegionId::new(logical_table_id, *region_number); + let region_request = + request_builder.build_one(region_id, storage_path.clone(), &HashMap::new()); + requests.push(region_request); + } + + Ok(RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::Creates(CreateRequests { requests })), + }) +} diff --git a/src/cli/src/metadata/utils.rs b/src/cli/src/metadata/utils.rs new file mode 100644 index 0000000000..e4c89b2342 --- /dev/null +++ b/src/cli/src/metadata/utils.rs @@ -0,0 +1,178 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +use async_stream::try_stream; +use common_catalog::consts::METRIC_ENGINE; +use common_catalog::format_full_table_name; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use futures::Stream; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; +use table::metadata::RawTableInfo; + +use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu}; + +/// The input for the iterator. +pub enum IteratorInput { + TableIds(VecDeque), + TableNames(VecDeque<(String, String, String)>), +} + +impl IteratorInput { + /// Creates a new iterator input from a list of table ids. + pub fn new_table_ids(table_ids: Vec) -> Self { + Self::TableIds(table_ids.into()) + } + + /// Creates a new iterator input from a list of table names. + pub fn new_table_names(table_names: Vec<(String, String, String)>) -> Self { + Self::TableNames(table_names.into()) + } +} + +/// An iterator for retrieving table metadata from the metadata store. +/// +/// This struct provides functionality to iterate over table metadata based on +/// either [`TableId`] and their associated regions or fully qualified table names. +pub struct TableMetadataIterator { + input: IteratorInput, + table_metadata_manager: TableMetadataManager, +} + +/// The full table metadata. +pub struct FullTableMetadata { + pub table_id: TableId, + pub table_info: RawTableInfo, + pub table_route: TableRouteValue, +} + +impl FullTableMetadata { + /// Returns true if it's [TableRouteValue::Physical]. + pub fn is_physical_table(&self) -> bool { + self.table_route.is_physical() + } + + /// Returns true if it's a metric engine table. + pub fn is_metric_engine(&self) -> bool { + self.table_info.meta.engine == METRIC_ENGINE + } + + /// Returns the full table name. + pub fn full_table_name(&self) -> String { + format_full_table_name( + &self.table_info.catalog_name, + &self.table_info.schema_name, + &self.table_info.name, + ) + } +} + +impl TableMetadataIterator { + pub fn new(kvbackend: KvBackendRef, input: IteratorInput) -> Self { + let table_metadata_manager = TableMetadataManager::new(kvbackend); + Self { + input, + table_metadata_manager, + } + } + + /// Returns the next table metadata. + /// + /// This method handles two types of inputs: + /// - TableIds: Returns metadata for a specific [`TableId`]. + /// - TableNames: Returns metadata for a table identified by its full name (catalog.schema.table). + /// + /// Returns `None` when there are no more tables to process. + pub async fn next(&mut self) -> Result> { + match &mut self.input { + IteratorInput::TableIds(table_ids) => { + if let Some(table_id) = table_ids.pop_front() { + let full_table_metadata = self.get_table_metadata(table_id).await?; + return Ok(Some(full_table_metadata)); + } + } + + IteratorInput::TableNames(table_names) => { + if let Some(full_table_name) = table_names.pop_front() { + let table_id = self.get_table_id_by_name(full_table_name).await?; + let full_table_metadata = self.get_table_metadata(table_id).await?; + return Ok(Some(full_table_metadata)); + } + } + } + + Ok(None) + } + + /// Converts the iterator into a stream of table metadata. + pub fn into_stream(mut self) -> impl Stream> { + try_stream!({ + while let Some(full_table_metadata) = self.next().await? { + yield full_table_metadata; + } + }) + } + + async fn get_table_id_by_name( + &mut self, + (catalog_name, schema_name, table_name): (String, String, String), + ) -> Result { + let key = TableNameKey::new(&catalog_name, &schema_name, &table_name); + let table_id = self + .table_metadata_manager + .table_name_manager() + .get(key) + .await + .context(TableMetadataSnafu)? + .with_context(|| UnexpectedSnafu { + msg: format!( + "Table not found: {}", + format_full_table_name(&catalog_name, &schema_name, &table_name) + ), + })? + .table_id(); + Ok(table_id) + } + + async fn get_table_metadata(&mut self, table_id: TableId) -> Result { + let (table_info, table_route) = self + .table_metadata_manager + .get_full_table_info(table_id) + .await + .context(TableMetadataSnafu)?; + + let table_info = table_info + .with_context(|| UnexpectedSnafu { + msg: format!("Table info not found for table id: {table_id}"), + })? + .into_inner() + .table_info; + let table_route = table_route + .with_context(|| UnexpectedSnafu { + msg: format!("Table route not found for table id: {table_id}"), + })? + .into_inner(); + + Ok(FullTableMetadata { + table_id, + table_info, + table_route, + }) + } +} diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 3cd26d7abb..5f009305f6 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -17,7 +17,7 @@ workspace = true anymap2 = "0.13.0" api.workspace = true async-recursion = "1.0" -async-stream = "0.3" +async-stream.workspace = true async-trait.workspace = true backon = { workspace = true, optional = true } base64.workspace = true diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 03e1bcbfec..68bbae6c41 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -25,6 +25,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{Context, LockKey, Procedure, Status}; use common_telemetry::{error, info, warn}; use futures_util::future; +pub use region_request::make_alter_region_request; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; diff --git a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs index f4e2dd7099..6bd1a12193 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs @@ -12,20 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1; use api::v1::alter_table_expr::Kind; use api::v1::region::{ alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests, RegionColumnDef, RegionRequest, RegionRequestHeader, }; +use api::v1::{self, AlterTableExpr}; use common_telemetry::tracing_context::TracingContext; use store_api::storage::RegionId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::error::Result; -use crate::key::table_info::TableInfoValue; use crate::peer::Peer; -use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{find_leader_regions, RegionRoute}; impl AlterLogicalTablesProcedure { @@ -62,34 +60,37 @@ impl AlterLogicalTablesProcedure { { for region_number in ®ions_on_this_peer { let region_id = RegionId::new(table.table_info.ident.table_id, *region_number); - let request = self.make_alter_region_request(region_id, task, table)?; + let request = make_alter_region_request( + region_id, + &task.alter_table, + table.table_info.ident.version, + ); requests.push(request); } } Ok(AlterRequests { requests }) } +} - fn make_alter_region_request( - &self, - region_id: RegionId, - task: &AlterTableTask, - table: &TableInfoValue, - ) -> Result { - let region_id = region_id.as_u64(); - let schema_version = table.table_info.ident.version; - let kind = match &task.alter_table.kind { - Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns( - to_region_add_columns(add_columns), - )), - _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks - }; +/// Makes an alter region request. +pub fn make_alter_region_request( + region_id: RegionId, + alter_table_expr: &AlterTableExpr, + schema_version: u64, +) -> AlterRequest { + let region_id = region_id.as_u64(); + let kind = match &alter_table_expr.kind { + Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns( + to_region_add_columns(add_columns), + )), + _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks + }; - Ok(AlterRequest { - region_id, - schema_version, - kind, - }) + AlterRequest { + region_id, + schema_version, + kind, } } diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 06bc247f89..d1f1ac37b6 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -25,6 +25,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::{debug, error, warn}; use futures::future; +pub use region_request::create_region_request_builder; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index 9448f67a14..ecf951cd06 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -15,16 +15,16 @@ use std::collections::HashMap; use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use api::v1::CreateTableExpr; use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, TableId}; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; use crate::ddl::utils::region_storage_path; use crate::error::Result; use crate::peer::Peer; -use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, RegionRoute}; impl CreateLogicalTablesProcedure { @@ -45,13 +45,15 @@ impl CreateLogicalTablesProcedure { let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; let logical_table_id = task.table_info.ident.table_id; + let physical_table_id = self.data.physical_table_id; let storage_path = region_storage_path(catalog, schema); - let request_builder = self.create_region_request_builder(task)?; + let request_builder = + create_region_request_builder(&task.create_table, physical_table_id)?; for region_number in ®ions_on_this_peer { let region_id = RegionId::new(logical_table_id, *region_number); let one_region_request = - request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?; + request_builder.build_one(region_id, storage_path.clone(), &HashMap::new()); requests.push(one_region_request); } } @@ -69,16 +71,13 @@ impl CreateLogicalTablesProcedure { body: Some(region_request::Body::Creates(CreateRequests { requests })), })) } - - fn create_region_request_builder( - &self, - task: &CreateTableTask, - ) -> Result { - let create_expr = &task.create_table; - let template = build_template(create_expr)?; - Ok(CreateRequestBuilder::new( - template, - Some(self.data.physical_table_id), - )) - } +} + +/// Creates a region request builder. +pub fn create_region_request_builder( + create_table_expr: &CreateTableExpr, + physical_table_id: TableId, +) -> Result { + let template = build_template(create_table_expr)?; + Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index ee0dfd6c18..1a62c8e716 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -218,11 +218,8 @@ impl CreateTableProcedure { let mut requests = Vec::with_capacity(regions.len()); for region_number in regions { let region_id = RegionId::new(self.table_id(), region_number); - let create_region_request = request_builder.build_one( - region_id, - storage_path.clone(), - region_wal_options, - )?; + let create_region_request = + request_builder.build_one(region_id, storage_path.clone(), region_wal_options); requests.push(PbRegionRequest::Create(create_region_request)); } diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table_template.rs index fefb47ce37..290fc33308 100644 --- a/src/common/meta/src/ddl/create_table_template.rs +++ b/src/common/meta/src/ddl/create_table_template.rs @@ -105,12 +105,12 @@ impl CreateRequestBuilder { &self.template } - pub(crate) fn build_one( + pub fn build_one( &self, region_id: RegionId, storage_path: String, region_wal_options: &HashMap, - ) -> Result { + ) -> CreateRequest { let mut request = self.template.clone(); request.region_id = region_id.as_u64(); @@ -130,6 +130,6 @@ impl CreateRequestBuilder { ); } - Ok(request) + request } }