From a858f552578d4f5f06226d1cd1c60faae63271c2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 9 Jul 2025 14:48:27 +0800 Subject: [PATCH] refactor(meta): separate validation and execution logic in alter logical tables procedure (#6478) * refactor(meta): separate validation and execution logic in alter logical tables procedure Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/cli/src/metadata/repair.rs | 1 - src/cli/src/metadata/repair/alter_table.rs | 3 +- .../meta/src/ddl/alter_logical_tables.rs | 147 +++++---- .../src/ddl/alter_logical_tables/check.rs | 136 --------- .../src/ddl/alter_logical_tables/executor.rs | 216 +++++++++++++ .../src/ddl/alter_logical_tables/metadata.rs | 158 ---------- .../alter_logical_tables/region_request.rs | 113 ------- .../alter_logical_tables/table_cache_keys.rs | 50 --- .../alter_logical_tables/update_metadata.rs | 31 +- .../src/ddl/alter_logical_tables/validator.rs | 284 ++++++++++++++++++ src/common/meta/src/ddl/utils.rs | 2 + src/common/meta/src/ddl/utils/table_id.rs | 46 +++ src/common/meta/src/ddl/utils/table_info.rs | 44 +++ src/common/meta/src/key/table_name.rs | 20 ++ 14 files changed, 717 insertions(+), 534 deletions(-) delete mode 100644 src/common/meta/src/ddl/alter_logical_tables/check.rs create mode 100644 src/common/meta/src/ddl/alter_logical_tables/executor.rs delete mode 100644 src/common/meta/src/ddl/alter_logical_tables/metadata.rs delete mode 100644 src/common/meta/src/ddl/alter_logical_tables/region_request.rs delete mode 100644 src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs create mode 100644 src/common/meta/src/ddl/alter_logical_tables/validator.rs create mode 100644 src/common/meta/src/ddl/utils/table_id.rs create mode 100644 src/common/meta/src/ddl/utils/table_info.rs diff --git a/src/cli/src/metadata/repair.rs b/src/cli/src/metadata/repair.rs index 212a52d6e1..98b2f8fee9 100644 --- a/src/cli/src/metadata/repair.rs +++ b/src/cli/src/metadata/repair.rs @@ -241,7 +241,6 @@ impl RepairTool { let alter_table_request = alter_table::make_alter_region_request_for_peer( logical_table_id, &alter_table_expr, - full_table_metadata.table_info.ident.version, peer, physical_region_routes, )?; diff --git a/src/cli/src/metadata/repair/alter_table.rs b/src/cli/src/metadata/repair/alter_table.rs index adfdd95ef7..53827d0b42 100644 --- a/src/cli/src/metadata/repair/alter_table.rs +++ b/src/cli/src/metadata/repair/alter_table.rs @@ -66,7 +66,6 @@ pub fn generate_alter_table_expr_for_all_columns( pub fn make_alter_region_request_for_peer( logical_table_id: TableId, alter_table_expr: &AlterTableExpr, - schema_version: u64, peer: &Peer, region_routes: &[RegionRoute], ) -> Result { @@ -74,7 +73,7 @@ pub fn make_alter_region_request_for_peer( let mut requests = Vec::with_capacity(regions_on_this_peer.len()); for region_number in ®ions_on_this_peer { let region_id = RegionId::new(logical_table_id, *region_number); - let request = make_alter_region_request(region_id, alter_table_expr, schema_version); + let request = make_alter_region_request(region_id, alter_table_expr); requests.push(request); } diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 61f4196b54..ca1d4bf5af 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -12,20 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod check; -mod metadata; -mod region_request; -mod table_cache_keys; +mod executor; mod update_metadata; +mod validator; use api::region::RegionResponse; use async_trait::async_trait; use common_catalog::format_full_table_name; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context, LockKey, Procedure, Status}; -use common_telemetry::{error, info, warn}; -use futures_util::future; -pub use region_request::make_alter_region_request; +use common_telemetry::{debug, error, info, warn}; +pub use executor::make_alter_region_request; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; @@ -33,10 +30,12 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use table::metadata::TableId; -use crate::ddl::utils::{ - add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error, - sync_follower_regions, +use crate::cache_invalidator::Context as CacheContext; +use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor; +use crate::ddl::alter_logical_tables::validator::{ + retain_unskipped, AlterLogicalTableValidator, ValidatorResult, }; +use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions}; use crate::ddl::DdlContext; use crate::error::Result; use crate::instruction::CacheIdent; @@ -46,13 +45,38 @@ use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::rpc::ddl::AlterTableTask; -use crate::rpc::router::{find_leaders, RegionRoute}; +use crate::rpc::router::RegionRoute; pub struct AlterLogicalTablesProcedure { pub context: DdlContext, pub data: AlterTablesData, } +/// Builds the validator from the [`AlterTablesData`]. +fn build_validator_from_alter_table_data<'a>( + data: &'a AlterTablesData, +) -> AlterLogicalTableValidator<'a> { + let phsycial_table_id = data.physical_table_id; + let alters = data + .tasks + .iter() + .map(|task| &task.alter_table) + .collect::>(); + AlterLogicalTableValidator::new(phsycial_table_id, alters) +} + +/// Builds the executor from the [`AlterTablesData`]. +fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> { + debug_assert_eq!(data.tasks.len(), data.table_info_values.len()); + let alters = data + .tasks + .iter() + .zip(data.table_info_values.iter()) + .map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table)) + .collect::>(); + AlterLogicalTablesExecutor::new(alters) +} + impl AlterLogicalTablesProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables"; @@ -82,35 +106,44 @@ impl AlterLogicalTablesProcedure { } pub(crate) async fn on_prepare(&mut self) -> Result { - // Checks all the tasks - self.check_input_tasks()?; - // Fills the table info values - self.fill_table_info_values().await?; - // Checks the physical table, must after [fill_table_info_values] - self.check_physical_table().await?; - // Fills the physical table info - self.fill_physical_table_info().await?; - // Filter the finished tasks - let finished_tasks = self.check_finished_tasks()?; - let already_finished_count = finished_tasks - .iter() - .map(|x| if *x { 1 } else { 0 }) - .sum::(); - let apply_tasks_count = self.data.tasks.len(); - if already_finished_count == apply_tasks_count { + let validator = build_validator_from_alter_table_data(&self.data); + let ValidatorResult { + num_skipped, + skip_alter, + table_info_values, + physical_table_info, + physical_table_route, + } = validator + .validate(&self.context.table_metadata_manager) + .await?; + + let num_tasks = self.data.tasks.len(); + if num_skipped == num_tasks { info!("All the alter tasks are finished, will skip the procedure."); + let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys( + &physical_table_info, + &table_info_values + .iter() + .map(|v| v.get_inner_ref()) + .collect::>(), + ); + self.data.table_cache_keys_to_invalidate = cache_ident_keys; // Re-invalidate the table cache self.data.state = AlterTablesState::InvalidateTableCache; return Ok(Status::executing(true)); - } else if already_finished_count > 0 { + } else if num_skipped > 0 { info!( "There are {} alter tasks, {} of them were already finished.", - apply_tasks_count, already_finished_count + num_tasks, num_skipped ); } - self.filter_task(&finished_tasks)?; - // Next state + // Updates the procedure state. + retain_unskipped(&mut self.data.tasks, &skip_alter); + self.data.physical_table_info = Some(physical_table_info); + self.data.physical_table_route = Some(physical_table_route); + self.data.table_info_values = table_info_values; + debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len()); self.data.state = AlterTablesState::SubmitAlterRegionRequests; Ok(Status::executing(true)) } @@ -118,25 +151,13 @@ impl AlterLogicalTablesProcedure { pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result { // Safety: we have checked the state in on_prepare let physical_table_route = &self.data.physical_table_route.as_ref().unwrap(); - let leaders = find_leaders(&physical_table_route.region_routes); - let mut alter_region_tasks = Vec::with_capacity(leaders.len()); - - for peer in leaders { - let requester = self.context.node_manager.datanode(&peer).await; - let request = self.make_request(&peer, &physical_table_route.region_routes)?; - - alter_region_tasks.push(async move { - requester - .handle(request) - .await - .map_err(add_peer_context_if_needed(peer)) - }); - } - - let mut results = future::join_all(alter_region_tasks) - .await - .into_iter() - .collect::>>()?; + let executor = build_executor_from_alter_expr(&self.data); + let mut results = executor + .on_alter_regions( + &self.context.node_manager, + &physical_table_route.region_routes, + ) + .await?; if let Some(column_metadatas) = extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)? @@ -177,7 +198,18 @@ impl AlterLogicalTablesProcedure { self.update_physical_table_metadata().await?; self.update_logical_tables_metadata().await?; - self.data.build_cache_keys_to_invalidate(); + let logical_table_info_values = self + .data + .table_info_values + .iter() + .map(|v| v.get_inner_ref()) + .collect::>(); + + let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys( + self.data.physical_table_info.as_ref().unwrap(), + &logical_table_info_values, + ); + self.data.table_cache_keys_to_invalidate = cache_ident_keys; self.data.clear_metadata_fields(); self.data.state = AlterTablesState::InvalidateTableCache; @@ -187,9 +219,16 @@ impl AlterLogicalTablesProcedure { pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result { let to_invalidate = &self.data.table_cache_keys_to_invalidate; + let ctx = CacheContext { + subject: Some(format!( + "Invalidate table cache by altering logical tables, physical_table_id: {}", + self.data.physical_table_id, + )), + }; + self.context .cache_invalidator - .invalidate(&Default::default(), to_invalidate) + .invalidate(&ctx, to_invalidate) .await?; Ok(Status::done()) } @@ -209,6 +248,10 @@ impl Procedure for AlterLogicalTablesProcedure { let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE .with_label_values(&[step]) .start_timer(); + debug!( + "Executing alter logical tables procedure, state: {:?}", + state + ); match state { AlterTablesState::Prepare => self.on_prepare().await, diff --git a/src/common/meta/src/ddl/alter_logical_tables/check.rs b/src/common/meta/src/ddl/alter_logical_tables/check.rs deleted file mode 100644 index a80ef3cd8c..0000000000 --- a/src/common/meta/src/ddl/alter_logical_tables/check.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; - -use api::v1::alter_table_expr::Kind; -use snafu::{ensure, OptionExt}; - -use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result}; -use crate::key::table_info::TableInfoValue; -use crate::key::table_route::TableRouteValue; -use crate::rpc::ddl::AlterTableTask; - -impl AlterLogicalTablesProcedure { - pub(crate) fn check_input_tasks(&self) -> Result<()> { - self.check_schema()?; - self.check_alter_kind()?; - Ok(()) - } - - pub(crate) async fn check_physical_table(&self) -> Result<()> { - let table_route_manager = self.context.table_metadata_manager.table_route_manager(); - let table_ids = self - .data - .table_info_values - .iter() - .map(|v| v.table_info.ident.table_id) - .collect::>(); - let table_routes = table_route_manager - .table_route_storage() - .batch_get(&table_ids) - .await?; - let physical_table_id = self.data.physical_table_id; - let is_same_physical_table = table_routes.iter().all(|r| { - if let Some(TableRouteValue::Logical(r)) = r { - r.physical_table_id() == physical_table_id - } else { - false - } - }); - - ensure!( - is_same_physical_table, - AlterLogicalTablesInvalidArgumentsSnafu { - err_msg: "All the tasks should have the same physical table id" - } - ); - - Ok(()) - } - - pub(crate) fn check_finished_tasks(&self) -> Result> { - let task = &self.data.tasks; - let table_info_values = &self.data.table_info_values; - - Ok(task - .iter() - .zip(table_info_values.iter()) - .map(|(task, table)| Self::check_finished_task(task, table)) - .collect()) - } - - // Checks if the schemas of the tasks are the same - fn check_schema(&self) -> Result<()> { - let is_same_schema = self.data.tasks.windows(2).all(|pair| { - pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name - && pair[0].alter_table.schema_name == pair[1].alter_table.schema_name - }); - - ensure!( - is_same_schema, - AlterLogicalTablesInvalidArgumentsSnafu { - err_msg: "Schemas of the tasks are not the same" - } - ); - - Ok(()) - } - - fn check_alter_kind(&self) -> Result<()> { - for task in &self.data.tasks { - let kind = task.alter_table.kind.as_ref().context( - AlterLogicalTablesInvalidArgumentsSnafu { - err_msg: "Alter kind is missing", - }, - )?; - let Kind::AddColumns(_) = kind else { - return AlterLogicalTablesInvalidArgumentsSnafu { - err_msg: "Only support add columns operation", - } - .fail(); - }; - } - - Ok(()) - } - - fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool { - let columns = table - .table_info - .meta - .schema - .column_schemas - .iter() - .map(|c| &c.name) - .collect::>(); - - let Some(kind) = task.alter_table.kind.as_ref() else { - return true; // Never get here since we have checked it in `check_alter_kind` - }; - let Kind::AddColumns(add_columns) = kind else { - return true; // Never get here since we have checked it in `check_alter_kind` - }; - - // We only check that all columns have been finished. That is to say, - // if one part is finished but another part is not, it will be considered - // unfinished. - add_columns - .add_columns - .iter() - .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name)) - .all(|column| column.map(|c| columns.contains(c)).unwrap_or(false)) - } -} diff --git a/src/common/meta/src/ddl/alter_logical_tables/executor.rs b/src/common/meta/src/ddl/alter_logical_tables/executor.rs new file mode 100644 index 0000000000..fde4a334ca --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/executor.rs @@ -0,0 +1,216 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::region::RegionResponse; +use api::v1::alter_table_expr::Kind; +use api::v1::region::{ + alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests, + RegionColumnDef, RegionRequest, RegionRequestHeader, +}; +use api::v1::{self, AlterTableExpr}; +use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{debug, warn}; +use futures::future; +use store_api::metadata::ColumnMetadata; +use store_api::storage::{RegionId, RegionNumber, TableId}; + +use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info}; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef}; +use crate::node_manager::NodeManagerRef; +use crate::rpc::router::{find_leaders, region_distribution, RegionRoute}; + +/// [AlterLogicalTablesExecutor] performs: +/// - Alters logical regions on the datanodes. +/// - Updates table metadata for alter table operation. +pub struct AlterLogicalTablesExecutor<'a> { + /// The alter table expressions. + /// + /// The first element is the logical table id, the second element is the alter table expression. + alters: Vec<(TableId, &'a AlterTableExpr)>, +} + +impl<'a> AlterLogicalTablesExecutor<'a> { + pub fn new(alters: Vec<(TableId, &'a AlterTableExpr)>) -> Self { + Self { alters } + } + + /// Alters logical regions on the datanodes. + pub(crate) async fn on_alter_regions( + &self, + node_manager: &NodeManagerRef, + region_routes: &[RegionRoute], + ) -> Result> { + let region_distribution = region_distribution(region_routes); + let leaders = find_leaders(region_routes) + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + let mut alter_region_tasks = Vec::with_capacity(leaders.len()); + for (datanode_id, region_role_set) in region_distribution { + if region_role_set.leader_regions.is_empty() { + continue; + } + // Safety: must exists. + let peer = leaders.get(&datanode_id).unwrap(); + let requester = node_manager.datanode(peer).await; + let requests = self.make_alter_region_request(®ion_role_set.leader_regions); + let requester = requester.clone(); + let peer = peer.clone(); + + debug!("Sending alter region requests to datanode {}", peer); + alter_region_tasks.push(async move { + requester + .handle(make_request(requests)) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + + future::join_all(alter_region_tasks) + .await + .into_iter() + .collect::>>() + } + + fn make_alter_region_request(&self, region_numbers: &[RegionNumber]) -> AlterRequests { + let mut requests = Vec::with_capacity(region_numbers.len() * self.alters.len()); + for (table_id, alter) in self.alters.iter() { + for region_number in region_numbers { + let region_id = RegionId::new(*table_id, *region_number); + let request = make_alter_region_request(region_id, alter); + requests.push(request); + } + } + + AlterRequests { requests } + } + + /// Updates table metadata for alter table operation. + /// + /// ## Panic: + /// - If the region distribution is not set when updating table metadata. + pub(crate) async fn on_alter_metadata( + physical_table_id: TableId, + table_metadata_manager: &TableMetadataManagerRef, + current_table_info_value: &DeserializedValueWithBytes, + region_distribution: RegionDistribution, + physical_columns: &[ColumnMetadata], + ) -> Result<()> { + if physical_columns.is_empty() { + warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables"); + return Ok(()); + } + + let table_ref = current_table_info_value.table_ref(); + let table_id = physical_table_id; + + // Generates new table info + let old_raw_table_info = current_table_info_value.table_info.clone(); + let new_raw_table_info = + raw_table_info::build_new_physical_table_info(old_raw_table_info, physical_columns); + + debug!( + "Starting update table: {} metadata, table_id: {}, new table info: {:?}", + table_ref, table_id, new_raw_table_info + ); + + table_metadata_manager + .update_table_info( + current_table_info_value, + Some(region_distribution), + new_raw_table_info, + ) + .await?; + + Ok(()) + } + + /// Builds the cache ident keys for the alter logical tables. + /// + /// The cache ident keys are: + /// - The table id of the logical tables. + /// - The table name of the logical tables. + /// - The table id of the physical table. + pub(crate) fn build_cache_ident_keys( + physical_table_info: &TableInfoValue, + logical_table_info_values: &[&TableInfoValue], + ) -> Vec { + let mut cache_keys = Vec::with_capacity(logical_table_info_values.len() * 2 + 2); + cache_keys.extend(logical_table_info_values.iter().flat_map(|table| { + vec![ + CacheIdent::TableId(table.table_info.ident.table_id), + CacheIdent::TableName(table.table_name()), + ] + })); + cache_keys.push(CacheIdent::TableId( + physical_table_info.table_info.ident.table_id, + )); + cache_keys.push(CacheIdent::TableName(physical_table_info.table_name())); + + cache_keys + } +} + +fn make_request(alter_requests: AlterRequests) -> RegionRequest { + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Alters(alter_requests)), + } +} + +/// Makes an alter region request. +pub fn make_alter_region_request( + region_id: RegionId, + alter_table_expr: &AlterTableExpr, +) -> AlterRequest { + let region_id = region_id.as_u64(); + let kind = match &alter_table_expr.kind { + Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns( + to_region_add_columns(add_columns), + )), + _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks + }; + + AlterRequest { + region_id, + schema_version: 0, + kind, + } +} + +fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns { + let add_columns = add_columns + .add_columns + .iter() + .map(|add_column| { + let region_column_def = RegionColumnDef { + column_def: add_column.column_def.clone(), + ..Default::default() // other fields are not used in alter logical table + }; + AddColumn { + column_def: Some(region_column_def), + ..Default::default() // other fields are not used in alter logical table + } + }) + .collect(); + AddColumns { add_columns } +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs deleted file mode 100644 index 8734b4ef37..0000000000 --- a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_catalog::format_full_table_name; -use snafu::OptionExt; -use table::metadata::TableId; - -use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::error::{ - AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu, - TableRouteNotFoundSnafu, -}; -use crate::key::table_info::TableInfoValue; -use crate::key::table_name::TableNameKey; -use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; -use crate::rpc::ddl::AlterTableTask; - -impl AlterLogicalTablesProcedure { - pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> { - debug_assert_eq!(finished_tasks.len(), self.data.tasks.len()); - debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len()); - self.data.tasks = self - .data - .tasks - .drain(..) - .zip(finished_tasks.iter()) - .filter_map(|(task, finished)| if *finished { None } else { Some(task) }) - .collect(); - self.data.table_info_values = self - .data - .table_info_values - .drain(..) - .zip(finished_tasks.iter()) - .filter_map(|(table_info_value, finished)| { - if *finished { - None - } else { - Some(table_info_value) - } - }) - .collect(); - - Ok(()) - } - - pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> { - let (physical_table_info, physical_table_route) = self - .context - .table_metadata_manager - .get_full_table_info(self.data.physical_table_id) - .await?; - - let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu { - table: format!("table id - {}", self.data.physical_table_id), - })?; - let physical_table_route = physical_table_route - .context(TableRouteNotFoundSnafu { - table_id: self.data.physical_table_id, - })? - .into_inner(); - - self.data.physical_table_info = Some(physical_table_info); - let TableRouteValue::Physical(physical_table_route) = physical_table_route else { - return AlterLogicalTablesInvalidArgumentsSnafu { - err_msg: format!( - "expected a physical table but got a logical table: {:?}", - self.data.physical_table_id - ), - } - .fail(); - }; - self.data.physical_table_route = Some(physical_table_route); - - Ok(()) - } - - pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> { - let table_ids = self.get_all_table_ids().await?; - let table_info_values = self.get_all_table_info_values(&table_ids).await?; - debug_assert_eq!(table_info_values.len(), self.data.tasks.len()); - self.data.table_info_values = table_info_values; - - Ok(()) - } - - async fn get_all_table_info_values( - &self, - table_ids: &[TableId], - ) -> Result>> { - let table_info_manager = self.context.table_metadata_manager.table_info_manager(); - let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?; - let mut table_info_values = Vec::with_capacity(table_ids.len()); - for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) { - let table_info_value = - table_info_map - .remove(table_id) - .with_context(|| TableInfoNotFoundSnafu { - table: extract_table_name(task), - })?; - table_info_values.push(table_info_value); - } - - Ok(table_info_values) - } - - async fn get_all_table_ids(&self) -> Result> { - let table_name_manager = self.context.table_metadata_manager.table_name_manager(); - let table_name_keys = self - .data - .tasks - .iter() - .map(|task| extract_table_name_key(task)) - .collect(); - - let table_name_values = table_name_manager.batch_get(table_name_keys).await?; - let mut table_ids = Vec::with_capacity(table_name_values.len()); - for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) { - let table_id = value - .with_context(|| TableNotFoundSnafu { - table_name: extract_table_name(task), - })? - .table_id(); - table_ids.push(table_id); - } - - Ok(table_ids) - } -} - -#[inline] -fn extract_table_name(task: &AlterTableTask) -> String { - format_full_table_name( - &task.alter_table.catalog_name, - &task.alter_table.schema_name, - &task.alter_table.table_name, - ) -} - -#[inline] -fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey { - TableNameKey::new( - &task.alter_table.catalog_name, - &task.alter_table.schema_name, - &task.alter_table.table_name, - ) -} diff --git a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs deleted file mode 100644 index 6bd1a12193..0000000000 --- a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::alter_table_expr::Kind; -use api::v1::region::{ - alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests, - RegionColumnDef, RegionRequest, RegionRequestHeader, -}; -use api::v1::{self, AlterTableExpr}; -use common_telemetry::tracing_context::TracingContext; -use store_api::storage::RegionId; - -use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::error::Result; -use crate::peer::Peer; -use crate::rpc::router::{find_leader_regions, RegionRoute}; - -impl AlterLogicalTablesProcedure { - pub(crate) fn make_request( - &self, - peer: &Peer, - region_routes: &[RegionRoute], - ) -> Result { - let alter_requests = self.make_alter_region_requests(peer, region_routes)?; - let request = RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(region_request::Body::Alters(alter_requests)), - }; - - Ok(request) - } - - fn make_alter_region_requests( - &self, - peer: &Peer, - region_routes: &[RegionRoute], - ) -> Result { - let tasks = &self.data.tasks; - let regions_on_this_peer = find_leader_regions(region_routes, peer); - let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len()); - for (task, table) in self - .data - .tasks - .iter() - .zip(self.data.table_info_values.iter()) - { - for region_number in ®ions_on_this_peer { - let region_id = RegionId::new(table.table_info.ident.table_id, *region_number); - let request = make_alter_region_request( - region_id, - &task.alter_table, - table.table_info.ident.version, - ); - requests.push(request); - } - } - - Ok(AlterRequests { requests }) - } -} - -/// Makes an alter region request. -pub fn make_alter_region_request( - region_id: RegionId, - alter_table_expr: &AlterTableExpr, - schema_version: u64, -) -> AlterRequest { - let region_id = region_id.as_u64(); - let kind = match &alter_table_expr.kind { - Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns( - to_region_add_columns(add_columns), - )), - _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks - }; - - AlterRequest { - region_id, - schema_version, - kind, - } -} - -fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns { - let add_columns = add_columns - .add_columns - .iter() - .map(|add_column| { - let region_column_def = RegionColumnDef { - column_def: add_column.column_def.clone(), - ..Default::default() // other fields are not used in alter logical table - }; - AddColumn { - column_def: Some(region_column_def), - ..Default::default() // other fields are not used in alter logical table - } - }) - .collect(); - AddColumns { add_columns } -} diff --git a/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs b/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs deleted file mode 100644 index 2c839da0fd..0000000000 --- a/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use table::metadata::RawTableInfo; -use table::table_name::TableName; - -use crate::ddl::alter_logical_tables::AlterTablesData; -use crate::instruction::CacheIdent; - -impl AlterTablesData { - pub(crate) fn build_cache_keys_to_invalidate(&mut self) { - let mut cache_keys = self - .table_info_values - .iter() - .flat_map(|table| { - vec![ - CacheIdent::TableId(table.table_info.ident.table_id), - CacheIdent::TableName(extract_table_name(&table.table_info)), - ] - }) - .collect::>(); - cache_keys.push(CacheIdent::TableId(self.physical_table_id)); - // Safety: physical_table_info already filled in previous steps - let physical_table_info = &self.physical_table_info.as_ref().unwrap().table_info; - cache_keys.push(CacheIdent::TableName(extract_table_name( - physical_table_info, - ))); - - self.table_cache_keys_to_invalidate = cache_keys; - } -} - -fn extract_table_name(table_info: &RawTableInfo) -> TableName { - TableName::new( - &table_info.catalog_name, - &table_info.schema_name, - &table_info.name, - ) -} diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index bc75ba1f7e..f730f0cda6 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -13,13 +13,12 @@ // limitations under the License. use common_grpc_expr::alter_expr_to_request; -use common_telemetry::warn; use itertools::Itertools; use snafu::ResultExt; use table::metadata::{RawTableInfo, TableInfo}; +use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::ddl::utils::raw_table_info; use crate::error; use crate::error::{ConvertAlterTableRequestSnafu, Result}; use crate::key::table_info::TableInfoValue; @@ -29,32 +28,20 @@ use crate::rpc::router::region_distribution; impl AlterLogicalTablesProcedure { pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { - if self.data.physical_columns.is_empty() { - warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables"); - return Ok(()); - } - // Safety: must exist. let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); let physical_table_route = self.data.physical_table_route.as_ref().unwrap(); let region_distribution = region_distribution(&physical_table_route.region_routes); - // Generates new table info - let old_raw_table_info = physical_table_info.table_info.clone(); - let new_raw_table_info = raw_table_info::build_new_physical_table_info( - old_raw_table_info, - &self.data.physical_columns, - ); - // Updates physical table's metadata. - self.context - .table_metadata_manager - .update_table_info( - physical_table_info, - Some(region_distribution), - new_raw_table_info, - ) - .await?; + AlterLogicalTablesExecutor::on_alter_metadata( + self.data.physical_table_id, + &self.context.table_metadata_manager, + physical_table_info, + region_distribution, + &self.data.physical_columns, + ) + .await?; Ok(()) } diff --git a/src/common/meta/src/ddl/alter_logical_tables/validator.rs b/src/common/meta/src/ddl/alter_logical_tables/validator.rs new file mode 100644 index 0000000000..e7295e2cd2 --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/validator.rs @@ -0,0 +1,284 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use api::v1::alter_table_expr::Kind; +use api::v1::AlterTableExpr; +use snafu::{ensure, OptionExt}; +use store_api::storage::TableId; +use table::table_reference::TableReference; + +use crate::ddl::utils::table_id::get_all_table_ids_by_names; +use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids; +use crate::error::{ + AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, + TableRouteNotFoundSnafu, +}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue}; +use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; + +/// [AlterLogicalTableValidator] validates the alter logical expressions. +pub struct AlterLogicalTableValidator<'a> { + physical_table_id: TableId, + alters: Vec<&'a AlterTableExpr>, +} + +impl<'a> AlterLogicalTableValidator<'a> { + pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self { + Self { + physical_table_id, + alters, + } + } + + /// Validates all alter table expressions have the same schema and catalog. + fn validate_schema(&self) -> Result<()> { + let is_same_schema = self.alters.windows(2).all(|pair| { + pair[0].catalog_name == pair[1].catalog_name + && pair[0].schema_name == pair[1].schema_name + }); + + ensure!( + is_same_schema, + AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Schemas of the alter table expressions are not the same" + } + ); + + Ok(()) + } + + /// Validates that all alter table expressions are of the supported kind. + /// Currently only supports `AddColumns` operations. + fn validate_alter_kind(&self) -> Result<()> { + for alter in &self.alters { + let kind = alter + .kind + .as_ref() + .context(AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Alter kind is missing", + })?; + + let Kind::AddColumns(_) = kind else { + return AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Only support add columns operation", + } + .fail(); + }; + } + + Ok(()) + } + + fn table_names(&self) -> Vec { + self.alters + .iter() + .map(|alter| { + TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name) + }) + .collect() + } + + /// Validates that the physical table info and route exist. + /// + /// This method performs the following validations: + /// 1. Retrieves the full table info and route for the given physical table id + /// 2. Ensures the table info and table route exists + /// 3. Verifies that the table route is actually a physical table route, not a logical one + /// + /// Returns a tuple containing the validated table info and physical table route. + async fn validate_physical_table( + &self, + table_metadata_manager: &TableMetadataManagerRef, + ) -> Result<( + DeserializedValueWithBytes, + PhysicalTableRouteValue, + )> { + let (table_info, table_route) = table_metadata_manager + .get_full_table_info(self.physical_table_id) + .await?; + + let table_info = table_info.with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.physical_table_id), + })?; + + let physical_table_route = table_route + .context(TableRouteNotFoundSnafu { + table_id: self.physical_table_id, + })? + .into_inner(); + + let TableRouteValue::Physical(table_route) = physical_table_route else { + return AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: format!( + "expected a physical table but got a logical table: {:?}", + self.physical_table_id + ), + } + .fail(); + }; + + Ok((table_info, table_route)) + } + + /// Validates that all logical table routes have the same physical table id. + /// + /// This method performs the following validations: + /// 1. Retrieves table routes for all the given table ids. + /// 2. Ensures that all retrieved routes are logical table routes (not physical) + /// 3. Verifies that all logical table routes reference the same physical table id. + /// 4. Returns an error if any route is not logical or references a different physical table. + async fn validate_logical_table_routes( + &self, + table_route_manager: &TableRouteManager, + table_ids: &[TableId], + ) -> Result<()> { + let table_routes = table_route_manager + .table_route_storage() + .batch_get(table_ids) + .await?; + + let physical_table_id = self.physical_table_id; + + let is_same_physical_table = table_routes.iter().all(|r| { + if let Some(TableRouteValue::Logical(r)) = r { + r.physical_table_id() == physical_table_id + } else { + false + } + }); + + ensure!( + is_same_physical_table, + AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "All the tasks should have the same physical table id" + } + ); + + Ok(()) + } + + /// Validates the alter logical expressions. + /// + /// This method performs the following validations: + /// 1. Validates that all alter table expressions have the same schema and catalog. + /// 2. Validates that all alter table expressions are of the supported kind. + /// 3. Validates that the physical table info and route exist. + /// 4. Validates that all logical table routes have the same physical table id. + /// + /// Returns a [ValidatorResult] containing the validation results. + pub async fn validate( + &self, + table_metadata_manager: &TableMetadataManagerRef, + ) -> Result { + self.validate_schema()?; + self.validate_alter_kind()?; + let (physical_table_info, physical_table_route) = + self.validate_physical_table(table_metadata_manager).await?; + let table_names = self.table_names(); + let table_ids = + get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names) + .await?; + let mut table_info_values = get_all_table_info_values_by_table_ids( + table_metadata_manager.table_info_manager(), + &table_ids, + &table_names, + ) + .await?; + self.validate_logical_table_routes( + table_metadata_manager.table_route_manager(), + &table_ids, + ) + .await?; + let skip_alter = self + .alters + .iter() + .zip(table_info_values.iter()) + .map(|(task, table)| skip_alter_logical_region(task, table)) + .collect::>(); + retain_unskipped(&mut table_info_values, &skip_alter); + let num_skipped = skip_alter.iter().filter(|&&x| x).count(); + + Ok(ValidatorResult { + num_skipped, + skip_alter, + table_info_values, + physical_table_info, + physical_table_route, + }) + } +} + +/// The result of the validator. +pub(crate) struct ValidatorResult { + pub(crate) num_skipped: usize, + pub(crate) skip_alter: Vec, + pub(crate) table_info_values: Vec>, + pub(crate) physical_table_info: DeserializedValueWithBytes, + pub(crate) physical_table_route: PhysicalTableRouteValue, +} + +/// Retains the elements that are not skipped. +pub(crate) fn retain_unskipped(target: &mut Vec, skipped: &[bool]) { + debug_assert_eq!(target.len(), skipped.len()); + let mut iter = skipped.iter(); + target.retain(|_| !iter.next().unwrap()); +} + +/// Returns true if does not required to alter the logical region. +fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool { + let existing_columns = table + .table_info + .meta + .schema + .column_schemas + .iter() + .map(|c| &c.name) + .collect::>(); + + let Some(kind) = alter.kind.as_ref() else { + return true; // Never get here since we have checked it in `validate_alter_kind` + }; + let Kind::AddColumns(add_columns) = kind else { + return true; // Never get here since we have checked it in `validate_alter_kind` + }; + + // We only check that all columns have been finished. That is to say, + // if one part is finished but another part is not, it will be considered + // unfinished. + add_columns + .add_columns + .iter() + .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name)) + .all(|column| { + column + .map(|c| existing_columns.contains(c)) + .unwrap_or(false) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_retain_unskipped() { + let mut target = vec![1, 2, 3, 4, 5]; + let skipped = vec![false, true, false, true, false]; + retain_unskipped(&mut target, &skipped); + assert_eq!(target, vec![1, 3, 5]); + } +} diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 7ddf5ebba4..c0e8bd96e0 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -13,6 +13,8 @@ // limitations under the License. pub(crate) mod raw_table_info; +pub(crate) mod table_id; +pub(crate) mod table_info; use std::collections::HashMap; use std::fmt::Debug; diff --git a/src/common/meta/src/ddl/utils/table_id.rs b/src/common/meta/src/ddl/utils/table_id.rs new file mode 100644 index 0000000000..e0f62de818 --- /dev/null +++ b/src/common/meta/src/ddl/utils/table_id.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::OptionExt; +use store_api::storage::TableId; +use table::table_reference::TableReference; + +use crate::error::{Result, TableNotFoundSnafu}; +use crate::key::table_name::{TableNameKey, TableNameManager}; + +/// Get all the table ids from the table names. +/// +/// Returns an error if any table does not exist. +pub(crate) async fn get_all_table_ids_by_names<'a>( + table_name_manager: &TableNameManager, + table_names: &[TableReference<'a>], +) -> Result> { + let table_name_keys = table_names + .iter() + .map(TableNameKey::from) + .collect::>(); + let table_name_values = table_name_manager.batch_get(table_name_keys).await?; + let mut table_ids = Vec::with_capacity(table_name_values.len()); + for (value, table_name) in table_name_values.into_iter().zip(table_names) { + let value = value + .with_context(|| TableNotFoundSnafu { + table_name: table_name.to_string(), + })? + .table_id(); + + table_ids.push(value); + } + + Ok(table_ids) +} diff --git a/src/common/meta/src/ddl/utils/table_info.rs b/src/common/meta/src/ddl/utils/table_info.rs new file mode 100644 index 0000000000..155e466797 --- /dev/null +++ b/src/common/meta/src/ddl/utils/table_info.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::OptionExt; +use store_api::storage::TableId; +use table::table_reference::TableReference; + +use crate::error::{Result, TableInfoNotFoundSnafu}; +use crate::key::table_info::{TableInfoManager, TableInfoValue}; +use crate::key::DeserializedValueWithBytes; + +/// Get all table info values by table ids. +/// +/// Returns an error if any table does not exist. +pub(crate) async fn get_all_table_info_values_by_table_ids<'a>( + table_info_manager: &TableInfoManager, + table_ids: &[TableId], + table_names: &[TableReference<'a>], +) -> Result>> { + let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?; + let mut table_info_values = Vec::with_capacity(table_ids.len()); + for (table_id, table_name) in table_ids.iter().zip(table_names) { + let table_info_value = + table_info_map + .remove(table_id) + .with_context(|| TableInfoNotFoundSnafu { + table: table_name.to_string(), + })?; + table_info_values.push(table_info_value); + } + + Ok(table_info_values) +} diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 9dbe405aec..2cb8586293 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -103,6 +103,26 @@ pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> { Ok((table_name_key.table.to_string(), table_name_value)) } +impl<'a> From<&TableReference<'a>> for TableNameKey<'a> { + fn from(value: &TableReference<'a>) -> Self { + Self { + catalog: value.catalog, + schema: value.schema, + table: value.table, + } + } +} + +impl<'a> From> for TableNameKey<'a> { + fn from(value: TableReference<'a>) -> Self { + Self { + catalog: value.catalog, + schema: value.schema, + table: value.table, + } + } +} + impl<'a> From<&'a TableName> for TableNameKey<'a> { fn from(value: &'a TableName) -> Self { Self {