feat: update topic-region map when create and drop table (#5423)

* feat: update topic-region map

* fix: parse topic correctly

* test: add unit test forraft engine wal

* Update src/common/meta/src/ddl/drop_table.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* test: fix unit tests

* test: fix unit tests

* chore: error handling and tests

* refactor: manage region-topic map in table_metadata_keys

* refactor: use WalOptions instead of String in deletion

* chore: revert unused change

* chore: follow review comments

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* chore: follow review comments

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Yohan Wal
2025-02-07 23:09:37 +08:00
committed by GitHub
parent 29218b5fe7
commit 059cb6fdc3
12 changed files with 442 additions and 46 deletions

View File

@@ -28,6 +28,7 @@ use common_meta::kv_backend::postgres::PgStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::info;
use common_wal::options::WalOptions;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
@@ -184,7 +185,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
region_routes
}
fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, WalOptions> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()

View File

@@ -49,7 +49,12 @@ impl TableMetadataBencher {
let regions: Vec<_> = (0..64).collect();
let region_routes = create_region_routes(regions.clone());
let region_wal_options = create_region_wal_options(regions);
let region_wal_options = create_region_wal_options(regions)
.into_iter()
.map(|(region_id, wal_options)| {
(region_id, serde_json::to_string(&wal_options).unwrap())
})
.collect();
let start = Instant::now();
@@ -109,9 +114,17 @@ impl TableMetadataBencher {
let table_info = table_info.unwrap();
let table_route = table_route.unwrap();
let table_id = table_info.table_info.ident.table_id;
let regions: Vec<_> = (0..64).collect();
let region_wal_options = create_region_wal_options(regions);
let _ = self
.table_metadata_manager
.delete_table_metadata(table_id, &table_info.table_name(), &table_route)
.delete_table_metadata(
table_id,
&table_info.table_name(),
&table_route,
&region_wal_options,
)
.await;
start.elapsed()
},

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::Status;
use common_telemetry::info;
@@ -25,6 +26,7 @@ use super::cursor::DropDatabaseCursor;
use super::{DropDatabaseContext, DropTableTarget};
use crate::ddl::drop_database::State;
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
@@ -107,8 +109,22 @@ impl State for DropDatabaseExecutor {
self.physical_table_id,
self.physical_region_routes.clone(),
);
// Deletes topic-region mapping if dropping physical table
let region_wal_options =
if let TableRouteValue::Physical(table_route_value) = &table_route_value {
let datanode_table_values = ddl_ctx
.table_metadata_manager
.datanode_table_manager()
.regions(self.physical_table_id, table_route_value)
.await?;
extract_region_wal_options(&datanode_table_values)?
} else {
HashMap::new()
};
executor
.on_destroy_metadata(ddl_ctx, &table_route_value)
.on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor

View File

@@ -15,6 +15,8 @@
pub(crate) mod executor;
mod metadata;
use std::collections::HashMap;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
@@ -24,8 +26,10 @@ use common_procedure::{
};
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::TableId;
use table::table_reference::TableReference;
@@ -131,7 +135,11 @@ impl DropTableProcedure {
);
// Deletes table metadata logically.
self.executor
.on_delete_metadata(&self.context, table_route_value)
.on_delete_metadata(
&self.context,
table_route_value,
&self.data.region_wal_options,
)
.await?;
info!("Deleted table metadata for table {table_id}");
self.data.state = DropTableState::InvalidateTableCache;
@@ -163,7 +171,11 @@ impl DropTableProcedure {
self.data.physical_region_routes.clone(),
);
self.executor
.on_delete_metadata_tombstone(&self.context, table_route_value)
.on_delete_metadata_tombstone(
&self.context,
table_route_value,
&self.data.region_wal_options,
)
.await?;
self.dropping_regions.clear();
@@ -243,7 +255,11 @@ impl Procedure for DropTableProcedure {
self.data.physical_region_routes.clone(),
);
self.executor
.on_restore_metadata(&self.context, table_route_value)
.on_restore_metadata(
&self.context,
table_route_value,
&self.data.region_wal_options,
)
.await
.map_err(ProcedureError::external)
}
@@ -257,6 +273,8 @@ pub struct DropTableData {
pub physical_region_routes: Vec<RegionRoute>,
pub physical_table_id: Option<TableId>,
#[serde(default)]
pub region_wal_options: HashMap<RegionNumber, WalOptions>,
#[serde(default)]
pub allow_rollback: bool,
}
@@ -268,6 +286,7 @@ impl DropTableData {
task,
physical_region_routes: vec![],
physical_table_id: None,
region_wal_options: HashMap::new(),
allow_rollback: false,
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{
region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
};
@@ -19,9 +21,10 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::ensure;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::table_name::TableName;
@@ -113,9 +116,15 @@ impl DropTableExecutor {
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<()> {
ctx.table_metadata_manager
.delete_table_metadata(self.table_id, &self.table, table_route_value)
.delete_table_metadata(
self.table_id,
&self.table,
table_route_value,
region_wal_options,
)
.await
}
@@ -124,9 +133,15 @@ impl DropTableExecutor {
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<u32, WalOptions>,
) -> Result<()> {
ctx.table_metadata_manager
.delete_table_metadata_tombstone(self.table_id, &self.table, table_route_value)
.delete_table_metadata_tombstone(
self.table_id,
&self.table,
table_route_value,
region_wal_options,
)
.await
}
@@ -135,9 +150,15 @@ impl DropTableExecutor {
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<u32, WalOptions>,
) -> Result<()> {
ctx.table_metadata_manager
.destroy_table_metadata(self.table_id, &self.table, table_route_value)
.destroy_table_metadata(
self.table_id,
&self.table,
table_route_value,
region_wal_options,
)
.await?;
let detecting_regions = if table_route_value.is_physical() {
@@ -156,9 +177,15 @@ impl DropTableExecutor {
&self,
ctx: &DdlContext,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<u32, WalOptions>,
) -> Result<()> {
ctx.table_metadata_manager
.restore_table_metadata(self.table_id, &self.table, table_route_value)
.restore_table_metadata(
self.table_id,
&self.table,
table_route_value,
region_wal_options,
)
.await
}

View File

@@ -17,6 +17,7 @@ use snafu::OptionExt;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::utils::extract_region_wal_options;
use crate::error::{self, Result};
impl DropTableProcedure {
@@ -30,9 +31,6 @@ impl DropTableProcedure {
.get_physical_table_route(task.table_id)
.await?;
self.data.physical_region_routes = physical_table_route_value.region_routes;
self.data.physical_table_id = Some(physical_table_id);
if physical_table_id == self.data.table_id() {
let table_info_value = self
.context
@@ -47,9 +45,21 @@ impl DropTableProcedure {
let engine = table_info_value.table_info.meta.engine;
// rollback only if dropping the metric physical table fails
self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME
self.data.allow_rollback = engine.as_str() == METRIC_ENGINE_NAME;
// Deletes topic-region mapping if dropping physical table
let datanode_table_values = self
.context
.table_metadata_manager
.datanode_table_manager()
.regions(physical_table_id, &physical_table_route_value)
.await?;
self.data.region_wal_options = extract_region_wal_options(&datanode_table_values)?;
}
self.data.physical_region_routes = physical_table_route_value.region_routes;
self.data.physical_table_id = Some(physical_table_id);
Ok(())
}
}

View File

@@ -12,16 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use table::table_reference::TableReference;
use crate::ddl::DetectingRegion;
use crate::error::{Error, OperateDatanodeSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::error::{
Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
};
use crate::key::datanode_table::DatanodeTableValue;
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
@@ -151,6 +158,32 @@ pub fn convert_region_routes_to_detecting_regions(
.collect::<Vec<_>>()
}
/// Parses [WalOptions] from serialized strings in hashmap.
pub fn parse_region_wal_options(
serialized_options: &HashMap<RegionNumber, String>,
) -> Result<HashMap<RegionNumber, WalOptions>> {
let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
for (region_number, wal_options) in serialized_options {
let wal_option = serde_json::from_str::<WalOptions>(wal_options)
.context(ParseWalOptionsSnafu { wal_options })?;
region_wal_options.insert(*region_number, wal_option);
}
Ok(region_wal_options)
}
/// Extracts region wal options from [DatanodeTableValue]s.
pub fn extract_region_wal_options(
datanode_table_values: &Vec<DatanodeTableValue>,
) -> Result<HashMap<RegionNumber, WalOptions>> {
let mut region_wal_options = HashMap::new();
for value in datanode_table_values {
let serialized_options = &value.region_info.region_wal_options;
let parsed_options = parse_region_wal_options(serialized_options)?;
region_wal_options.extend(parsed_options);
}
Ok(region_wal_options)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -710,6 +710,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse wal options: {}", wal_options))]
ParseWalOptions {
wal_options: String,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: serde_json::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -762,7 +771,8 @@ impl ErrorExt for Error {
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. } => StatusCode::Unexpected,
| MetadataCorruption { .. }
| ParseWalOptions { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

View File

@@ -57,7 +57,7 @@
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
// - The key is used to mark existing topics in kafka for WAL.
//! - The key is used to mark existing topics in kafka for WAL.
//!
//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
//! - Mapping {topic_name} to {region_id}
@@ -122,6 +122,7 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
use common_telemetry::warn;
use common_wal::options::WalOptions;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use flow::flow_route::FlowRouteValue;
use flow::table_flow::TableFlowValue;
@@ -136,6 +137,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use topic_region::{TopicRegionKey, TopicRegionManager};
use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
@@ -306,6 +308,7 @@ pub struct TableMetadataManager {
schema_manager: SchemaManager,
table_route_manager: TableRouteManager,
tombstone_manager: TombstoneManager,
topic_region_manager: TopicRegionManager,
kv_backend: KvBackendRef,
}
@@ -456,6 +459,7 @@ impl TableMetadataManager {
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new(kv_backend.clone()),
topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
kv_backend,
}
}
@@ -648,10 +652,15 @@ impl TableMetadataManager {
.table_route_storage()
.build_create_txn(table_id, &table_route_value)?;
let create_topic_region_txn = self
.topic_region_manager
.build_create_txn(table_id, &region_wal_options)?;
let mut txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_table_route_txn,
create_topic_region_txn,
]);
if let TableRouteValue::Physical(x) = &table_route_value {
@@ -785,6 +794,7 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<Vec<Vec<u8>>> {
// Builds keys
let datanode_ids = if table_route_value.is_physical() {
@@ -806,13 +816,22 @@ impl TableMetadataManager {
.into_iter()
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();
let topic_region_map = self
.topic_region_manager
.get_topic_region_mapping(table_id, region_wal_options);
let topic_region_keys = topic_region_map
.iter()
.map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
.collect::<Vec<_>>();
keys.push(table_name.to_bytes());
keys.push(table_info_key.to_bytes());
keys.push(table_route_key.to_bytes());
for key in &datanode_table_keys {
keys.push(key.to_bytes());
}
for key in topic_region_keys {
keys.push(key.to_bytes());
}
Ok(keys)
}
@@ -823,8 +842,10 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.create(keys).await
}
@@ -835,9 +856,11 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.delete(keys).await
let table_metadata_keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.delete(table_metadata_keys).await
}
/// Restores metadata for table.
@@ -847,8 +870,10 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.restore(keys).await
}
@@ -859,8 +884,10 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
region_wal_options: &HashMap<RegionNumber, WalOptions>,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
@@ -1309,8 +1336,9 @@ mod tests {
use bytes::Bytes;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::util::current_time_millis;
use common_wal::options::{KafkaWalOptions, WalOptions};
use futures::TryStreamExt;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableInfo};
use table::table_name::TableName;
@@ -1323,10 +1351,15 @@ mod tests {
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue};
use crate::key::{
DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, LeaderState, Region, RegionRoute};
use crate::rpc::store::RangeRequest;
use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocator};
#[test]
fn test_deserialized_value_with_bytes() {
@@ -1398,16 +1431,63 @@ mod tests {
table_metadata_manager: &TableMetadataManager,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
region_wal_options,
)
.await
}
fn create_mock_region_wal_options() -> HashMap<RegionNumber, WalOptions> {
let topics = (0..2)
.map(|i| format!("greptimedb_topic{}", i))
.collect::<Vec<_>>();
let wal_options = topics
.iter()
.map(|topic| {
WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
})
})
.collect::<Vec<_>>();
(0..16)
.enumerate()
.map(|(i, region_number)| (region_number, wal_options[i % wal_options.len()].clone()))
.collect()
}
#[tokio::test]
async fn test_raft_engine_topic_region_map() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options = allocate_region_wal_options(regions, &wal_allocator).unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
region_wal_options.clone(),
)
.await
.unwrap();
let topic_region_key = TOPIC_REGION_PREFIX.to_string();
let range_req = RangeRequest::new().with_prefix(topic_region_key);
let resp = mem_kv.range(range_req).await.unwrap();
// Should be empty because the topic region map is empty for raft engine.
assert!(resp.kvs.is_empty());
}
#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -1416,11 +1496,17 @@ mod tests {
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let region_wal_options = create_mock_region_wal_options()
.into_iter()
.map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
.collect::<HashMap<_, _>>();
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
region_wal_options.clone(),
)
.await
.unwrap();
@@ -1430,6 +1516,7 @@ mod tests {
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
region_wal_options.clone(),
)
.await
.is_ok());
@@ -1440,7 +1527,8 @@ mod tests {
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
modified_region_routes
modified_region_routes,
region_wal_options.clone(),
)
.await
.is_err());
@@ -1462,6 +1550,19 @@ mod tests {
.unwrap(),
region_routes
);
for i in 0..2 {
let region_number = i as u32;
let region_id = RegionId::new(table_info.ident.table_id, region_number);
let topic = format!("greptimedb_topic{}", i);
let regions = table_metadata_manager
.topic_region_manager
.regions(&topic)
.await
.unwrap();
assert_eq!(regions.len(), 8);
assert_eq!(regions[0], region_id);
}
}
#[tokio::test]
@@ -1557,12 +1658,18 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let region_wal_options = create_mock_region_wal_options();
let serialized_region_wal_options = region_wal_options
.iter()
.map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
.collect::<HashMap<_, _>>();
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
serialized_region_wal_options,
)
.await
.unwrap();
@@ -1575,12 +1682,22 @@ mod tests {
let table_route_value = &TableRouteValue::physical(region_routes.clone());
// deletes metadata.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, table_route_value)
.delete_table_metadata(
table_id,
&table_name,
table_route_value,
&region_wal_options,
)
.await
.unwrap();
// Should be ignored.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, table_route_value)
.delete_table_metadata(
table_id,
&table_name,
table_route_value,
&region_wal_options,
)
.await
.unwrap();
assert!(table_metadata_manager
@@ -1617,6 +1734,19 @@ mod tests {
.await
.unwrap();
assert!(table_route.is_none());
// Logical delete removes the topic region mapping as well.
let regions = table_metadata_manager
.topic_region_manager
.regions("greptimedb_topic0")
.await
.unwrap();
assert_eq!(regions.len(), 0);
let regions = table_metadata_manager
.topic_region_manager
.regions("greptimedb_topic1")
.await
.unwrap();
assert_eq!(regions.len(), 0);
}
#[tokio::test]
@@ -1633,6 +1763,7 @@ mod tests {
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();
@@ -1705,6 +1836,7 @@ mod tests {
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();
@@ -1790,6 +1922,7 @@ mod tests {
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();
@@ -1870,6 +2003,7 @@ mod tests {
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();
@@ -1980,7 +2114,11 @@ mod tests {
let table_id = 1025;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
let options = [(0, "test".to_string())].into();
let options = create_mock_region_wal_options();
let serialized_options = options
.iter()
.map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
.collect::<HashMap<_, _>>();
table_metadata_manager
.create_table_metadata(
task.table_info,
@@ -2007,7 +2145,7 @@ mod tests {
leader_down_since: None,
},
]),
options,
serialized_options,
)
.await
.unwrap();
@@ -2020,7 +2158,7 @@ mod tests {
.unwrap()
.unwrap();
table_metadata_manager
.destroy_table_metadata(table_id, &table_name, &table_route_value)
.destroy_table_metadata(table_id, &table_name, &table_route_value, &options)
.await
.unwrap();
assert!(mem_kv.is_empty());
@@ -2033,7 +2171,11 @@ mod tests {
let table_id = 1025;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
let options = [(0, "test".to_string())].into();
let options = create_mock_region_wal_options();
let serialized_options = options
.iter()
.map(|(k, v)| (*k, serde_json::to_string(v).unwrap()))
.collect::<HashMap<_, _>>();
table_metadata_manager
.create_table_metadata(
task.table_info,
@@ -2060,7 +2202,7 @@ mod tests {
leader_down_since: None,
},
]),
options,
serialized_options,
)
.await
.unwrap();
@@ -2076,18 +2218,18 @@ mod tests {
let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
let table_route_value = TableRouteValue::physical(region_routes.clone());
table_metadata_manager
.delete_table_metadata(table_id, &table_name, &table_route_value)
.delete_table_metadata(table_id, &table_name, &table_route_value, &options)
.await
.unwrap();
table_metadata_manager
.restore_table_metadata(table_id, &table_name, &table_route_value)
.restore_table_metadata(table_id, &table_name, &table_route_value, &options)
.await
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
// Should be ignored.
table_metadata_manager
.restore_table_metadata(table_id, &table_name, &table_route_value)
.restore_table_metadata(table_id, &table_name, &table_route_value, &options)
.await
.unwrap();
let kvs = mem_kv.dump();

View File

@@ -21,6 +21,7 @@ use snafu::OptionExt;
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use super::table_route::PhysicalTableRouteValue;
use super::MetadataKey;
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
use crate::key::{
@@ -29,7 +30,8 @@ use crate::key::{
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::router::region_distribution;
use crate::rpc::store::{BatchGetRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::DatanodeId;
@@ -172,6 +174,26 @@ impl DatanodeTableManager {
Box::pin(stream)
}
/// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue].
pub async fn regions(
&self,
table_id: TableId,
table_routes: &PhysicalTableRouteValue,
) -> Result<Vec<DatanodeTableValue>> {
let keys = region_distribution(&table_routes.region_routes)
.into_keys()
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<Vec<_>>();
let req = BatchGetRequest {
keys: keys.iter().map(|k| k.to_bytes()).collect(),
};
let resp = self.kv_backend.batch_get(req).await?;
resp.kvs
.into_iter()
.map(datanode_table_value_decoder)
.collect()
}
/// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
pub fn build_create_txn(
&self,

View File

@@ -26,18 +26,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{self, Display};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::ddl::utils::parse_region_wal_options;
use crate::error::{Error, InvalidMetadataSnafu, Result};
use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
use crate::rpc::KeyValue;
// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicRegionKey<'a> {
pub region_id: RegionId,
@@ -53,7 +60,7 @@ impl<'a> TopicRegionKey<'a> {
}
pub fn range_topic_key(topic: &str) -> String {
format!("{}/{}", TOPIC_REGION_PREFIX, topic)
format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
}
}
@@ -80,7 +87,7 @@ impl Display for TopicRegionKey<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{}",
"{}{}",
Self::range_topic_key(self.topic),
self.region_id.as_u64()
)
@@ -151,6 +158,24 @@ impl TopicRegionManager {
Ok(())
}
pub fn build_create_txn(
&self,
table_id: TableId,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Txn> {
let region_wal_options = parse_region_wal_options(region_wal_options)?;
let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
let topic_region_keys = topic_region_mapping
.iter()
.map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
.collect::<Vec<_>>();
let operations = topic_region_keys
.into_iter()
.map(|key| TxnOp::Put(key.to_bytes(), vec![]))
.collect::<Vec<_>>();
Ok(Txn::new().and_then(operations))
}
/// Returns the list of region ids using specified topic.
pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
let prefix = TopicRegionKey::range_topic_key(topic);
@@ -169,12 +194,49 @@ impl TopicRegionManager {
self.kv_backend.delete(&raw_key, false).await?;
Ok(())
}
pub async fn batch_delete(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
let req = BatchDeleteRequest {
keys: raw_keys,
prev_kv: false,
};
self.kv_backend.batch_delete(req).await?;
Ok(())
}
/// Retrieves a mapping of [`RegionId`]s to their corresponding topics name
/// based on the provided table ID and WAL options.
///
/// # Returns
/// A vector of tuples, where each tuple contains a [`RegionId`] and its corresponding topic name.
pub fn get_topic_region_mapping<'a>(
&self,
table_id: TableId,
region_wal_options: &'a HashMap<RegionNumber, WalOptions>,
) -> Vec<(RegionId, &'a str)> {
region_wal_options
.keys()
.filter_map(
|region_number| match region_wal_options.get(region_number) {
Some(WalOptions::Kafka(kafka)) => {
let region_id = RegionId::new(table_id, *region_number);
Some((region_id, kafka.topic.as_str()))
}
Some(WalOptions::RaftEngine) => None,
None => None,
},
)
.collect::<Vec<_>>()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_wal::options::KafkaWalOptions;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -220,4 +282,45 @@ mod tests {
key_values.sort_by_key(|id| id.as_u64());
assert_eq!(key_values, expected);
}
#[test]
fn test_topic_region_map() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let region_wal_options = (0..64)
.map(|i| {
let region_number = i;
let wal_options = if i % 2 == 0 {
WalOptions::Kafka(KafkaWalOptions {
topic: format!("topic_{}", i),
})
} else {
WalOptions::RaftEngine
};
(region_number, serde_json::to_string(&wal_options).unwrap())
})
.collect::<HashMap<_, _>>();
let region_wal_options = parse_region_wal_options(&region_wal_options).unwrap();
let mut topic_region_mapping =
manager.get_topic_region_mapping(table_id, &region_wal_options);
let mut expected = (0..64)
.filter_map(|i| {
if i % 2 == 0 {
Some((RegionId::new(table_id, i), format!("topic_{}", i)))
} else {
None
}
})
.collect::<Vec<_>>();
topic_region_mapping.sort_by_key(|(region_id, _)| region_id.as_u64());
let topic_region_map = topic_region_mapping
.iter()
.map(|(region_id, topic)| (*region_id, topic.to_string()))
.collect::<Vec<_>>();
expected.sort_by_key(|(region_id, _)| region_id.as_u64());
assert_eq!(topic_region_map, expected);
}
}

View File

@@ -13,9 +13,9 @@
// limitations under the License.
mod selector;
mod topic_creator;
pub(crate) mod topic_creator;
mod topic_manager;
mod topic_pool;
pub(crate) mod topic_pool;
use std::collections::HashMap;
use std::sync::Arc;