From 059cb6fdc3bd73f2dcdab54e99387893f495d636 Mon Sep 17 00:00:00 2001 From: Yohan Wal Date: Fri, 7 Feb 2025 23:09:37 +0800 Subject: [PATCH] feat: update topic-region map when create and drop table (#5423) * feat: update topic-region map * fix: parse topic correctly * test: add unit test forraft engine wal * Update src/common/meta/src/ddl/drop_table.rs Co-authored-by: Weny Xu * test: fix unit tests * test: fix unit tests * chore: error handling and tests * refactor: manage region-topic map in table_metadata_keys * refactor: use WalOptions instead of String in deletion * chore: revert unused change * chore: follow review comments * Apply suggestions from code review Co-authored-by: jeremyhi * chore: follow review comments --------- Co-authored-by: Weny Xu Co-authored-by: jeremyhi --- src/cli/src/bench.rs | 3 +- src/cli/src/bench/metadata.rs | 17 +- .../meta/src/ddl/drop_database/executor.rs | 18 +- src/common/meta/src/ddl/drop_table.rs | 25 ++- .../meta/src/ddl/drop_table/executor.rs | 37 +++- .../meta/src/ddl/drop_table/metadata.rs | 18 +- src/common/meta/src/ddl/utils.rs | 35 +++- src/common/meta/src/error.rs | 12 +- src/common/meta/src/key.rs | 184 ++++++++++++++++-- src/common/meta/src/key/datanode_table.rs | 24 ++- src/common/meta/src/key/topic_region.rs | 111 ++++++++++- src/common/meta/src/wal_options_allocator.rs | 4 +- 12 files changed, 442 insertions(+), 46 deletions(-) diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index c948859b52..a0a9f008e6 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -28,6 +28,7 @@ use common_meta::kv_backend::postgres::PgStore; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_telemetry::info; +use common_wal::options::WalOptions; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use rand::Rng; @@ -184,7 +185,7 @@ fn create_region_routes(regions: Vec) -> Vec { region_routes } -fn create_region_wal_options(regions: Vec) -> HashMap { +fn create_region_wal_options(regions: Vec) -> HashMap { // TODO(niebayes): construct region wal options for benchmark. let _ = regions; HashMap::default() diff --git a/src/cli/src/bench/metadata.rs b/src/cli/src/bench/metadata.rs index 28343232a1..68dfb5441a 100644 --- a/src/cli/src/bench/metadata.rs +++ b/src/cli/src/bench/metadata.rs @@ -49,7 +49,12 @@ impl TableMetadataBencher { let regions: Vec<_> = (0..64).collect(); let region_routes = create_region_routes(regions.clone()); - let region_wal_options = create_region_wal_options(regions); + let region_wal_options = create_region_wal_options(regions) + .into_iter() + .map(|(region_id, wal_options)| { + (region_id, serde_json::to_string(&wal_options).unwrap()) + }) + .collect(); let start = Instant::now(); @@ -109,9 +114,17 @@ impl TableMetadataBencher { let table_info = table_info.unwrap(); let table_route = table_route.unwrap(); let table_id = table_info.table_info.ident.table_id; + + let regions: Vec<_> = (0..64).collect(); + let region_wal_options = create_region_wal_options(regions); let _ = self .table_metadata_manager - .delete_table_metadata(table_id, &table_info.table_name(), &table_route) + .delete_table_metadata( + table_id, + &table_info.table_name(), + &table_route, + ®ion_wal_options, + ) .await; start.elapsed() }, diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index a8d8ed9d1f..f840c51b48 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::collections::HashMap; use common_procedure::Status; use common_telemetry::info; @@ -25,6 +26,7 @@ use super::cursor::DropDatabaseCursor; use super::{DropDatabaseContext, DropTableTarget}; use crate::ddl::drop_database::State; use crate::ddl::drop_table::executor::DropTableExecutor; +use crate::ddl::utils::extract_region_wal_options; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_route::TableRouteValue; @@ -107,8 +109,22 @@ impl State for DropDatabaseExecutor { self.physical_table_id, self.physical_region_routes.clone(), ); + + // Deletes topic-region mapping if dropping physical table + let region_wal_options = + if let TableRouteValue::Physical(table_route_value) = &table_route_value { + let datanode_table_values = ddl_ctx + .table_metadata_manager + .datanode_table_manager() + .regions(self.physical_table_id, table_route_value) + .await?; + extract_region_wal_options(&datanode_table_values)? + } else { + HashMap::new() + }; + executor - .on_destroy_metadata(ddl_ctx, &table_route_value) + .on_destroy_metadata(ddl_ctx, &table_route_value, ®ion_wal_options) .await?; executor.invalidate_table_cache(ddl_ctx).await?; executor diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 7720c7ee8e..968fa65cf2 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -15,6 +15,8 @@ pub(crate) mod executor; mod metadata; +use std::collections::HashMap; + use async_trait::async_trait; use common_error::ext::BoxedError; use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu}; @@ -24,8 +26,10 @@ use common_procedure::{ }; use common_telemetry::info; use common_telemetry::tracing::warn; +use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionNumber; use strum::AsRefStr; use table::metadata::TableId; use table::table_reference::TableReference; @@ -131,7 +135,11 @@ impl DropTableProcedure { ); // Deletes table metadata logically. self.executor - .on_delete_metadata(&self.context, table_route_value) + .on_delete_metadata( + &self.context, + table_route_value, + &self.data.region_wal_options, + ) .await?; info!("Deleted table metadata for table {table_id}"); self.data.state = DropTableState::InvalidateTableCache; @@ -163,7 +171,11 @@ impl DropTableProcedure { self.data.physical_region_routes.clone(), ); self.executor - .on_delete_metadata_tombstone(&self.context, table_route_value) + .on_delete_metadata_tombstone( + &self.context, + table_route_value, + &self.data.region_wal_options, + ) .await?; self.dropping_regions.clear(); @@ -243,7 +255,11 @@ impl Procedure for DropTableProcedure { self.data.physical_region_routes.clone(), ); self.executor - .on_restore_metadata(&self.context, table_route_value) + .on_restore_metadata( + &self.context, + table_route_value, + &self.data.region_wal_options, + ) .await .map_err(ProcedureError::external) } @@ -257,6 +273,8 @@ pub struct DropTableData { pub physical_region_routes: Vec, pub physical_table_id: Option, #[serde(default)] + pub region_wal_options: HashMap, + #[serde(default)] pub allow_rollback: bool, } @@ -268,6 +286,7 @@ impl DropTableData { task, physical_region_routes: vec![], physical_table_id: None, + region_wal_options: HashMap::new(), allow_rollback: false, } } diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 3848eeb4fc..5b4f9bd5fb 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::region::{ region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader, }; @@ -19,9 +21,10 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; +use common_wal::options::WalOptions; use futures::future::join_all; use snafu::ensure; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use table::table_name::TableName; @@ -113,9 +116,15 @@ impl DropTableExecutor { &self, ctx: &DdlContext, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { ctx.table_metadata_manager - .delete_table_metadata(self.table_id, &self.table, table_route_value) + .delete_table_metadata( + self.table_id, + &self.table, + table_route_value, + region_wal_options, + ) .await } @@ -124,9 +133,15 @@ impl DropTableExecutor { &self, ctx: &DdlContext, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { ctx.table_metadata_manager - .delete_table_metadata_tombstone(self.table_id, &self.table, table_route_value) + .delete_table_metadata_tombstone( + self.table_id, + &self.table, + table_route_value, + region_wal_options, + ) .await } @@ -135,9 +150,15 @@ impl DropTableExecutor { &self, ctx: &DdlContext, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { ctx.table_metadata_manager - .destroy_table_metadata(self.table_id, &self.table, table_route_value) + .destroy_table_metadata( + self.table_id, + &self.table, + table_route_value, + region_wal_options, + ) .await?; let detecting_regions = if table_route_value.is_physical() { @@ -156,9 +177,15 @@ impl DropTableExecutor { &self, ctx: &DdlContext, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { ctx.table_metadata_manager - .restore_table_metadata(self.table_id, &self.table, table_route_value) + .restore_table_metadata( + self.table_id, + &self.table, + table_route_value, + region_wal_options, + ) .await } diff --git a/src/common/meta/src/ddl/drop_table/metadata.rs b/src/common/meta/src/ddl/drop_table/metadata.rs index 5e182720fe..c1a8a90d4e 100644 --- a/src/common/meta/src/ddl/drop_table/metadata.rs +++ b/src/common/meta/src/ddl/drop_table/metadata.rs @@ -17,6 +17,7 @@ use snafu::OptionExt; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::utils::extract_region_wal_options; use crate::error::{self, Result}; impl DropTableProcedure { @@ -30,9 +31,6 @@ impl DropTableProcedure { .get_physical_table_route(task.table_id) .await?; - self.data.physical_region_routes = physical_table_route_value.region_routes; - self.data.physical_table_id = Some(physical_table_id); - if physical_table_id == self.data.table_id() { let table_info_value = self .context @@ -47,9 +45,21 @@ impl DropTableProcedure { let engine = table_info_value.table_info.meta.engine; // rollback only if dropping the metric physical table fails - self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME + self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME; + + // Deletes topic-region mapping if dropping physical table + let datanode_table_values = self + .context + .table_metadata_manager + .datanode_table_manager() + .regions(physical_table_id, &physical_table_route_value) + .await?; + self.data.region_wal_options = extract_region_wal_options(&datanode_table_values)?; } + self.data.physical_region_routes = physical_table_route_value.region_routes; + self.data.physical_table_id = Some(physical_table_id); + Ok(()) } } diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index b1608d40e3..f6852db753 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use common_catalog::consts::METRIC_ENGINE; use common_error::ext::BoxedError; use common_procedure::error::Error as ProcedureError; +use common_wal::options::WalOptions; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use store_api::storage::RegionNumber; use table::metadata::TableId; use table::table_reference::TableReference; use crate::ddl::DetectingRegion; -use crate::error::{Error, OperateDatanodeSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu}; +use crate::error::{ + Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu, +}; +use crate::key::datanode_table::DatanodeTableValue; use crate::key::table_name::TableNameKey; use crate::key::TableMetadataManagerRef; use crate::peer::Peer; @@ -151,6 +158,32 @@ pub fn convert_region_routes_to_detecting_regions( .collect::>() } +/// Parses [WalOptions] from serialized strings in hashmap. +pub fn parse_region_wal_options( + serialized_options: &HashMap, +) -> Result> { + let mut region_wal_options = HashMap::with_capacity(serialized_options.len()); + for (region_number, wal_options) in serialized_options { + let wal_option = serde_json::from_str::(wal_options) + .context(ParseWalOptionsSnafu { wal_options })?; + region_wal_options.insert(*region_number, wal_option); + } + Ok(region_wal_options) +} + +/// Extracts region wal options from [DatanodeTableValue]s. +pub fn extract_region_wal_options( + datanode_table_values: &Vec, +) -> Result> { + let mut region_wal_options = HashMap::new(); + for value in datanode_table_values { + let serialized_options = &value.region_info.region_wal_options; + let parsed_options = parse_region_wal_options(serialized_options)?; + region_wal_options.extend(parsed_options); + } + Ok(region_wal_options) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index bccdb258a9..7324847294 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -710,6 +710,15 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to parse wal options: {}", wal_options))] + ParseWalOptions { + wal_options: String, + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: serde_json::Error, + }, } pub type Result = std::result::Result; @@ -762,7 +771,8 @@ impl ErrorExt for Error { | UnexpectedLogicalRouteTable { .. } | ProcedureOutput { .. } | FromUtf8 { .. } - | MetadataCorruption { .. } => StatusCode::Unexpected, + | MetadataCorruption { .. } + | ParseWalOptions { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 9690b885ae..5226a1f6c7 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -57,7 +57,7 @@ //! - This key is mainly used in constructing the view in Datanode and Frontend. //! //! 12. Kafka topic key: `__topic_name/kafka/{topic_name}` -//! - The key is used to mark existing topics in kafka for WAL. +//! - The key is used to mark existing topics in kafka for WAL. //! //! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}` //! - Mapping {topic_name} to {region_id} @@ -122,6 +122,7 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, }; use common_telemetry::warn; +use common_wal::options::WalOptions; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use flow::flow_route::FlowRouteValue; use flow::table_flow::TableFlowValue; @@ -136,6 +137,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_name::TableName; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; +use topic_region::{TopicRegionKey, TopicRegionManager}; use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; @@ -306,6 +308,7 @@ pub struct TableMetadataManager { schema_manager: SchemaManager, table_route_manager: TableRouteManager, tombstone_manager: TombstoneManager, + topic_region_manager: TopicRegionManager, kv_backend: KvBackendRef, } @@ -456,6 +459,7 @@ impl TableMetadataManager { schema_manager: SchemaManager::new(kv_backend.clone()), table_route_manager: TableRouteManager::new(kv_backend.clone()), tombstone_manager: TombstoneManager::new(kv_backend.clone()), + topic_region_manager: TopicRegionManager::new(kv_backend.clone()), kv_backend, } } @@ -648,10 +652,15 @@ impl TableMetadataManager { .table_route_storage() .build_create_txn(table_id, &table_route_value)?; + let create_topic_region_txn = self + .topic_region_manager + .build_create_txn(table_id, ®ion_wal_options)?; + let mut txn = Txn::merge_all(vec![ create_table_name_txn, create_table_info_txn, create_table_route_txn, + create_topic_region_txn, ]); if let TableRouteValue::Physical(x) = &table_route_value { @@ -785,6 +794,7 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result>> { // Builds keys let datanode_ids = if table_route_value.is_physical() { @@ -806,13 +816,22 @@ impl TableMetadataManager { .into_iter() .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) .collect::>(); - + let topic_region_map = self + .topic_region_manager + .get_topic_region_mapping(table_id, region_wal_options); + let topic_region_keys = topic_region_map + .iter() + .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic)) + .collect::>(); keys.push(table_name.to_bytes()); keys.push(table_info_key.to_bytes()); keys.push(table_route_key.to_bytes()); for key in &datanode_table_keys { keys.push(key.to_bytes()); } + for key in topic_region_keys { + keys.push(key.to_bytes()); + } Ok(keys) } @@ -823,8 +842,10 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { - let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + let keys = + self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; self.tombstone_manager.create(keys).await } @@ -835,9 +856,11 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { - let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; - self.tombstone_manager.delete(keys).await + let table_metadata_keys = + self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; + self.tombstone_manager.delete(table_metadata_keys).await } /// Restores metadata for table. @@ -847,8 +870,10 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { - let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + let keys = + self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; self.tombstone_manager.restore(keys).await } @@ -859,8 +884,10 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, + region_wal_options: &HashMap, ) -> Result<()> { - let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + let keys = + self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; let _ = self .kv_backend .batch_delete(BatchDeleteRequest::new().with_keys(keys)) @@ -1309,8 +1336,9 @@ mod tests { use bytes::Bytes; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::util::current_time_millis; + use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::TryStreamExt; - use store_api::storage::RegionId; + use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{RawTableInfo, TableInfo}; use table::table_name::TableName; @@ -1323,10 +1351,15 @@ mod tests { use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; - use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue}; + use crate::key::{ + DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX, + }; use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; use crate::peer::Peer; use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute}; + use crate::rpc::store::RangeRequest; + use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator}; #[test] fn test_deserialized_value_with_bytes() { @@ -1398,16 +1431,63 @@ mod tests { table_metadata_manager: &TableMetadataManager, table_info: RawTableInfo, region_routes: Vec, + region_wal_options: HashMap, ) -> Result<()> { table_metadata_manager .create_table_metadata( table_info, TableRouteValue::physical(region_routes), - HashMap::default(), + region_wal_options, ) .await } + fn create_mock_region_wal_options() -> HashMap { + let topics = (0..2) + .map(|i| format!("greptimedb_topic{}", i)) + .collect::>(); + let wal_options = topics + .iter() + .map(|topic| { + WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + }) + }) + .collect::>(); + + (0..16) + .enumerate() + .map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone())) + .collect() + } + + #[tokio::test] + async fn test_raft_engine_topic_region_map() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let region_route = new_test_region_route(); + let region_routes = &vec![region_route.clone()]; + let table_info: RawTableInfo = + new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let wal_allocator = WalOptionsAllocator::RaftEngine; + let regions = (0..16).collect(); + let region_wal_options = allocate_region_wal_options(regions, &wal_allocator).unwrap(); + create_physical_table_metadata( + &table_metadata_manager, + table_info.clone(), + region_routes.clone(), + region_wal_options.clone(), + ) + .await + .unwrap(); + + let topic_region_key = TOPIC_REGION_PREFIX.to_string(); + let range_req = RangeRequest::new().with_prefix(topic_region_key); + let resp = mem_kv.range(range_req).await.unwrap(); + // Should be empty because the topic region map is empty for raft engine. + assert!(resp.kvs.is_empty()); + } + #[tokio::test] async fn test_create_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); @@ -1416,11 +1496,17 @@ mod tests { let region_routes = &vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let region_wal_options = create_mock_region_wal_options() + .into_iter() + .map(|(k, v)| (k, serde_json::to_string(&v).unwrap())) + .collect::>(); + // creates metadata. create_physical_table_metadata( &table_metadata_manager, table_info.clone(), region_routes.clone(), + region_wal_options.clone(), ) .await .unwrap(); @@ -1430,6 +1516,7 @@ mod tests { &table_metadata_manager, table_info.clone(), region_routes.clone(), + region_wal_options.clone(), ) .await .is_ok()); @@ -1440,7 +1527,8 @@ mod tests { assert!(create_physical_table_metadata( &table_metadata_manager, table_info.clone(), - modified_region_routes + modified_region_routes, + region_wal_options.clone(), ) .await .is_err()); @@ -1462,6 +1550,19 @@ mod tests { .unwrap(), region_routes ); + + for i in 0..2 { + let region_number = i as u32; + let region_id = RegionId::new(table_info.ident.table_id, region_number); + let topic = format!("greptimedb_topic{}", i); + let regions = table_metadata_manager + .topic_region_manager + .regions(&topic) + .await + .unwrap(); + assert_eq!(regions.len(), 8); + assert_eq!(regions[0], region_id); + } } #[tokio::test] @@ -1557,12 +1658,18 @@ mod tests { new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; let datanode_id = 2; + let region_wal_options = create_mock_region_wal_options(); + let serialized_region_wal_options = region_wal_options + .iter() + .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) + .collect::>(); // creates metadata. create_physical_table_metadata( &table_metadata_manager, table_info.clone(), region_routes.clone(), + serialized_region_wal_options, ) .await .unwrap(); @@ -1575,12 +1682,22 @@ mod tests { let table_route_value = &TableRouteValue::physical(region_routes.clone()); // deletes metadata. table_metadata_manager - .delete_table_metadata(table_id, &table_name, table_route_value) + .delete_table_metadata( + table_id, + &table_name, + table_route_value, + ®ion_wal_options, + ) .await .unwrap(); // Should be ignored. table_metadata_manager - .delete_table_metadata(table_id, &table_name, table_route_value) + .delete_table_metadata( + table_id, + &table_name, + table_route_value, + ®ion_wal_options, + ) .await .unwrap(); assert!(table_metadata_manager @@ -1617,6 +1734,19 @@ mod tests { .await .unwrap(); assert!(table_route.is_none()); + // Logical delete removes the topic region mapping as well. + let regions = table_metadata_manager + .topic_region_manager + .regions("greptimedb_topic0") + .await + .unwrap(); + assert_eq!(regions.len(), 0); + let regions = table_metadata_manager + .topic_region_manager + .regions("greptimedb_topic1") + .await + .unwrap(); + assert_eq!(regions.len(), 0); } #[tokio::test] @@ -1633,6 +1763,7 @@ mod tests { &table_metadata_manager, table_info.clone(), region_routes.clone(), + HashMap::new(), ) .await .unwrap(); @@ -1705,6 +1836,7 @@ mod tests { &table_metadata_manager, table_info.clone(), region_routes.clone(), + HashMap::new(), ) .await .unwrap(); @@ -1790,6 +1922,7 @@ mod tests { &table_metadata_manager, table_info.clone(), region_routes.clone(), + HashMap::new(), ) .await .unwrap(); @@ -1870,6 +2003,7 @@ mod tests { &table_metadata_manager, table_info.clone(), region_routes.clone(), + HashMap::new(), ) .await .unwrap(); @@ -1980,7 +2114,11 @@ mod tests { let table_id = 1025; let table_name = "foo"; let task = test_create_table_task(table_name, table_id); - let options = [(0, "test".to_string())].into(); + let options = create_mock_region_wal_options(); + let serialized_options = options + .iter() + .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) + .collect::>(); table_metadata_manager .create_table_metadata( task.table_info, @@ -2007,7 +2145,7 @@ mod tests { leader_down_since: None, }, ]), - options, + serialized_options, ) .await .unwrap(); @@ -2020,7 +2158,7 @@ mod tests { .unwrap() .unwrap(); table_metadata_manager - .destroy_table_metadata(table_id, &table_name, &table_route_value) + .destroy_table_metadata(table_id, &table_name, &table_route_value, &options) .await .unwrap(); assert!(mem_kv.is_empty()); @@ -2033,7 +2171,11 @@ mod tests { let table_id = 1025; let table_name = "foo"; let task = test_create_table_task(table_name, table_id); - let options = [(0, "test".to_string())].into(); + let options = create_mock_region_wal_options(); + let serialized_options = options + .iter() + .map(|(k, v)| (*k, serde_json::to_string(v).unwrap())) + .collect::>(); table_metadata_manager .create_table_metadata( task.table_info, @@ -2060,7 +2202,7 @@ mod tests { leader_down_since: None, }, ]), - options, + serialized_options, ) .await .unwrap(); @@ -2076,18 +2218,18 @@ mod tests { let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); let table_route_value = TableRouteValue::physical(region_routes.clone()); table_metadata_manager - .delete_table_metadata(table_id, &table_name, &table_route_value) + .delete_table_metadata(table_id, &table_name, &table_route_value, &options) .await .unwrap(); table_metadata_manager - .restore_table_metadata(table_id, &table_name, &table_route_value) + .restore_table_metadata(table_id, &table_name, &table_route_value, &options) .await .unwrap(); let kvs = mem_kv.dump(); assert_eq!(kvs, expected_result); // Should be ignored. table_metadata_manager - .restore_table_metadata(table_id, &table_name, &table_route_value) + .restore_table_metadata(table_id, &table_name, &table_route_value, &options) .await .unwrap(); let kvs = mem_kv.dump(); diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index a8226e631b..c64ee89b85 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -21,6 +21,7 @@ use snafu::OptionExt; use store_api::storage::RegionNumber; use table::metadata::TableId; +use super::table_route::PhysicalTableRouteValue; use super::MetadataKey; use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result}; use crate::key::{ @@ -29,7 +30,8 @@ use crate::key::{ use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::RangeRequest; +use crate::rpc::router::region_distribution; +use crate::rpc::store::{BatchGetRequest, RangeRequest}; use crate::rpc::KeyValue; use crate::DatanodeId; @@ -172,6 +174,26 @@ impl DatanodeTableManager { Box::pin(stream) } + /// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue]. + pub async fn regions( + &self, + table_id: TableId, + table_routes: &PhysicalTableRouteValue, + ) -> Result> { + let keys = region_distribution(&table_routes.region_routes) + .into_keys() + .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) + .collect::>(); + let req = BatchGetRequest { + keys: keys.iter().map(|k| k.to_bytes()).collect(), + }; + let resp = self.kv_backend.batch_get(req).await?; + resp.kvs + .into_iter() + .map(datanode_table_value_decoder) + .collect() + } + /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes. pub fn build_create_txn( &self, diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index 52103ff6ba..b1bf6c8699 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -26,18 +26,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{self, Display}; +use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::OptionExt; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::TableId; +use crate::ddl::utils::parse_region_wal_options; use crate::error::{Error, InvalidMetadataSnafu, Result}; use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX}; +use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{BatchPutRequest, PutRequest, RangeRequest}; +use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest}; use crate::rpc::KeyValue; +// The TopicRegionKey is a key for the topic-region mapping in the kvbackend. +// The layout of the key is `__topic_region/{topic_name}/{region_id}`. #[derive(Debug, Clone, PartialEq)] pub struct TopicRegionKey<'a> { pub region_id: RegionId, @@ -53,7 +60,7 @@ impl<'a> TopicRegionKey<'a> { } pub fn range_topic_key(topic: &str) -> String { - format!("{}/{}", TOPIC_REGION_PREFIX, topic) + format!("{}/{}/", TOPIC_REGION_PREFIX, topic) } } @@ -80,7 +87,7 @@ impl Display for TopicRegionKey<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{}/{}", + "{}{}", Self::range_topic_key(self.topic), self.region_id.as_u64() ) @@ -151,6 +158,24 @@ impl TopicRegionManager { Ok(()) } + pub fn build_create_txn( + &self, + table_id: TableId, + region_wal_options: &HashMap, + ) -> Result { + let region_wal_options = parse_region_wal_options(region_wal_options)?; + let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options); + let topic_region_keys = topic_region_mapping + .iter() + .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id)) + .collect::>(); + let operations = topic_region_keys + .into_iter() + .map(|key| TxnOp::Put(key.to_bytes(), vec![])) + .collect::>(); + Ok(Txn::new().and_then(operations)) + } + /// Returns the list of region ids using specified topic. pub async fn regions(&self, topic: &str) -> Result> { let prefix = TopicRegionKey::range_topic_key(topic); @@ -169,12 +194,49 @@ impl TopicRegionManager { self.kv_backend.delete(&raw_key, false).await?; Ok(()) } + + pub async fn batch_delete(&self, keys: Vec>) -> Result<()> { + let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::>(); + let req = BatchDeleteRequest { + keys: raw_keys, + prev_kv: false, + }; + self.kv_backend.batch_delete(req).await?; + Ok(()) + } + + /// Retrieves a mapping of [`RegionId`]s to their corresponding topics name + /// based on the provided table ID and WAL options. + /// + /// # Returns + /// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name. + pub fn get_topic_region_mapping<'a>( + &self, + table_id: TableId, + region_wal_options: &'a HashMap, + ) -> Vec<(RegionId, &'a str)> { + region_wal_options + .keys() + .filter_map( + |region_number| match region_wal_options.get(region_number) { + Some(WalOptions::Kafka(kafka)) => { + let region_id = RegionId::new(table_id, *region_number); + Some((region_id, kafka.topic.as_str())) + } + Some(WalOptions::RaftEngine) => None, + None => None, + }, + ) + .collect::>() + } } #[cfg(test)] mod tests { use std::sync::Arc; + use common_wal::options::KafkaWalOptions; + use super::*; use crate::kv_backend::memory::MemoryKvBackend; @@ -220,4 +282,45 @@ mod tests { key_values.sort_by_key(|id| id.as_u64()); assert_eq!(key_values, expected); } + + #[test] + fn test_topic_region_map() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + + let table_id = 1; + let region_wal_options = (0..64) + .map(|i| { + let region_number = i; + let wal_options = if i % 2 == 0 { + WalOptions::Kafka(KafkaWalOptions { + topic: format!("topic_{}", i), + }) + } else { + WalOptions::RaftEngine + }; + (region_number, serde_json::to_string(&wal_options).unwrap()) + }) + .collect::>(); + + let region_wal_options = parse_region_wal_options(®ion_wal_options).unwrap(); + let mut topic_region_mapping = + manager.get_topic_region_mapping(table_id, ®ion_wal_options); + let mut expected = (0..64) + .filter_map(|i| { + if i % 2 == 0 { + Some((RegionId::new(table_id, i), format!("topic_{}", i))) + } else { + None + } + }) + .collect::>(); + topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64()); + let topic_region_map = topic_region_mapping + .iter() + .map(|(region_id, topic)| (*region_id, topic.to_string())) + .collect::>(); + expected.sort_by_key(|(region_id, _)| region_id.as_u64()); + assert_eq!(topic_region_map, expected); + } } diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 9177fd0756..cbcc30f0b1 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -13,9 +13,9 @@ // limitations under the License. mod selector; -mod topic_creator; +pub(crate) mod topic_creator; mod topic_manager; -mod topic_pool; +pub(crate) mod topic_pool; use std::collections::HashMap; use std::sync::Arc;