chore: "fix: revert unfinished route table change" (#3009)

Revert "fix: revert unfinished route table change (#3008)"

This reverts commit 8ce8a8f3c7.
This commit is contained in:
LFC
2023-12-27 10:40:59 +08:00
committed by GitHub
parent 7c5c75568d
commit eadde72973
34 changed files with 1039 additions and 877 deletions

716
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -14,6 +14,7 @@
use std::time::Instant;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
@@ -53,7 +54,11 @@ impl TableMetadataBencher {
let start = Instant::now();
self.table_metadata_manager
.create_table_metadata(table_info, region_routes, region_wal_options)
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();

View File

@@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
@@ -153,7 +153,7 @@ impl MigrateTableMetadata {
)
.unwrap();
let new_table_value = NextTableRouteValue::new(table_route.region_routes);
let new_table_value = NextTableRouteValue::physical(table_route.region_routes);
let table_id = table_route.table.id as u32;
let new_key = TableRouteKey::new(table_id);

View File

@@ -21,10 +21,10 @@ use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
pub mod alter_table;
pub mod create_table;
@@ -58,7 +58,7 @@ pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
pub table_route: TableRouteValue,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,

View File

@@ -45,7 +45,6 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
@@ -183,25 +182,23 @@ impl AlterTableProcedure {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
let TableRouteValue { region_routes, .. } = self
let table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();
let leaders = find_leaders(&region_routes);
let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
let regions = find_leader_regions(region_routes, &datanode);
for region in regions {
let region_id = RegionId::new(table_id, region);

View File

@@ -18,9 +18,8 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_config::WAL_OPTIONS_KEY;
use common_error::ext::BoxedError;
use common_procedure::error::{
@@ -40,8 +39,9 @@ use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableInfoNotFoundSnafu};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
@@ -60,13 +60,13 @@ impl CreateTableProcedure {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
}
}
@@ -78,10 +78,12 @@ impl CreateTableProcedure {
opening_regions: vec![],
};
creator
.register_opening_regions(&context)
if let TableRouteValue::Physical(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
Ok(CreateTableProcedure { context, creator })
}
@@ -94,10 +96,6 @@ impl CreateTableProcedure {
self.table_info().ident.table_id
}
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.creator.data.region_routes
}
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}
@@ -132,7 +130,10 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}
pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let column_defs = create_table_expr
@@ -191,16 +192,54 @@ impl CreateTableProcedure {
options: create_table_expr.table_options.clone(),
};
let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
Ok(builder)
Ok(CreateRequestBuilder {
template,
physical_table_id,
})
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&region_routes, request_builder).await
}
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
self.create_regions(region_routes, request_builder).await
}
}
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Registers opening regions
self.creator.register_opening_regions(&self.context)?;
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;
let create_table_expr = &create_table_data.task.create_table;
@@ -208,8 +247,6 @@ impl CreateTableProcedure {
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let mut request_builder = self.new_region_request_builder()?;
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
@@ -221,12 +258,7 @@ impl CreateTableProcedure {
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.build_one(region_id, storage_path.clone(), region_wal_options)
.await?;
requests.push(PbRegionRequest::Create(create_region_request));
@@ -270,10 +302,13 @@ impl CreateTableProcedure {
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.await?;
info!("Created table metadata for table {table_id}");
@@ -329,7 +364,7 @@ impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
@@ -337,21 +372,23 @@ impl TableCreator {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
table_route,
region_wal_options,
},
opening_regions: vec![],
}
}
/// Register opening regions if doesn't exist.
pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> {
let region_routes = &self.data.region_routes;
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
&self,
context: &DdlContext,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(());
return Ok(vec![]);
}
let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
@@ -366,9 +403,7 @@ impl TableCreator {
})?;
opening_region_guards.push(guard);
}
self.opening_regions = opening_region_guards;
Ok(())
Ok(opening_region_guards)
}
}
@@ -386,7 +421,7 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}
@@ -399,28 +434,18 @@ impl CreateTableData {
/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
context: DdlContext,
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}
impl CreateRequestBuilder {
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
Self {
context,
template,
physical_table_id: None,
}
}
pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}
async fn build_one(
&mut self,
create_expr: &CreateTableExpr,
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
@@ -438,49 +463,18 @@ impl CreateRequestBuilder {
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
.await?;
}
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
// For example, region 0 of logical table must resides with region 0 of physical table. So here we can
// simply concat the physical table id and the logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
Ok(request)
}
async fn metric_engine_hook(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
request: &mut PbCreateRegionRequest,
) -> Result<()> {
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
let table_id = if let Some(table_id) = self.physical_table_id {
table_id
} else {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_key = TableNameKey::new(
&create_expr.catalog_name,
&create_expr.schema_name,
physical_table_name,
);
let table_id = table_name_manager
.get(table_name_key)
.await?
.context(TableInfoNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();
self.physical_table_id = Some(table_id);
table_id
};
// Concat physical table's table id and corresponding region number to get
// the physical region id.
let physical_region_id = RegionId::new(table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}
Ok(())
Ok(request)
}
}

View File

@@ -307,7 +307,7 @@ impl DropTableData {
}
fn region_routes(&self) -> &Vec<RegionRoute> {
&self.table_route_value.region_routes
self.table_route_value.region_routes()
}
fn table_info(&self) -> &RawTableInfo {

View File

@@ -177,7 +177,7 @@ impl DdlManager {
&self,
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -185,7 +185,7 @@ impl DdlManager {
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
region_routes,
table_route,
region_wal_options,
context,
);
@@ -275,11 +275,10 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let table_route = table_route_value.into_inner().region_routes;
let table_route = table_route_value.into_inner().region_routes().clone();
let id = ddl_manager
.submit_truncate_table_task(
@@ -356,9 +355,8 @@ async fn handle_drop_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let id = ddl_manager
.submit_drop_table_task(
@@ -392,7 +390,7 @@ async fn handle_create_table_task(
let TableMetadata {
table_id,
region_routes,
table_route,
region_wal_options,
} = table_meta;
@@ -402,7 +400,7 @@ async fn handle_create_table_task(
.submit_create_table_task(
cluster_id,
create_table_task,
region_routes,
table_route,
region_wal_options,
)
.await?;

View File

@@ -135,9 +135,9 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Table route not found: {}", table_name))]
#[snafu(display("Failed to find table route for table id {}", table_id))]
TableRouteNotFound {
table_name: String,
table_id: TableId,
location: Location,
},

View File

@@ -147,6 +147,14 @@ pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
Self: Sized;
fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
pub struct TableMetadataManager {
@@ -221,7 +229,9 @@ impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T
}
}
impl<'de, T: DeserializeOwned + Serialize> Deserialize<'de> for DeserializedValueWithBytes<T> {
impl<'de, T: DeserializeOwned + Serialize + TableMetaValue> Deserialize<'de>
for DeserializedValueWithBytes<T>
{
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
@@ -248,11 +258,11 @@ impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithByt
}
}
impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
impl<T: Serialize + DeserializeOwned + TableMetaValue> DeserializedValueWithBytes<T> {
/// Returns a struct containing a deserialized value and an original `bytes`.
/// It accepts original bytes of inner.
pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
let inner = serde_json::from_slice(&bytes).context(error::SerdeJsonSnafu)?;
let inner = T::try_from_raw_value(&bytes)?;
Ok(Self { bytes, inner })
}
@@ -373,13 +383,10 @@ impl TableMetadataManager {
pub async fn create_table_metadata(
&self,
mut table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
table_route_value: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = region_routes
.iter()
.map(|region| region.region.id.region_number())
.collect::<Vec<_>>();
let region_numbers = table_route_value.region_numbers();
table_info.meta.region_numbers = region_numbers;
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.clone();
@@ -403,29 +410,27 @@ impl TableMetadataManager {
.table_info_manager()
.build_create_txn(table_id, &table_info_value)?;
// Creates datanode table key value pairs.
let distribution = region_distribution(&region_routes)?;
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
let mut txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_table_route_txn,
]);
if let TableRouteValue::Physical(x) = &table_route_value {
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
table_id,
&engine,
&region_storage_path,
region_options,
region_wal_options,
distribution,
region_distribution(&x.region_routes)?,
)?;
// Creates table route.
let table_route_value = TableRouteValue::new(region_routes);
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
let txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_datanode_table_txn,
create_table_route_txn,
]);
txn = txn.merge(create_datanode_table_txn);
}
let r = self.kv_backend.txn(txn).await?;
@@ -478,7 +483,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;
// Deletes datanode table key value pairs.
let distribution = region_distribution(&table_route_value.region_routes)?;
let distribution = region_distribution(table_route_value.region_routes())?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
@@ -603,7 +608,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(&current_table_route_value.region_routes)?;
region_distribution(current_table_route_value.region_routes())?;
let new_region_distribution = region_distribution(&new_region_routes)?;
let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
@@ -651,7 +656,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes.clone();
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut updated = 0;
for route in &mut new_region_routes {
@@ -711,12 +716,12 @@ impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey);
macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
$(
impl $val_ty {
pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
impl $crate::key::TableMetaValue for $val_ty {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
}
pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
@@ -744,8 +749,7 @@ macro_rules! impl_optional_meta_value {
impl_table_meta_value! {
TableNameValue,
TableInfoValue,
DatanodeTableValue,
TableRouteValue
DatanodeTableValue
}
impl_optional_meta_value! {
@@ -765,6 +769,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -780,14 +785,14 @@ mod tests {
let region_routes = vec![region_route.clone()];
let expected_region_routes =
TableRouteValue::new(vec![region_route.clone(), region_route.clone()]);
TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
let expected = serde_json::to_vec(&expected_region_routes).unwrap();
// Serialize behaviors:
// The inner field will be ignored.
let value = DeserializedValueWithBytes {
// ignored
inner: TableRouteValue::new(region_routes.clone()),
inner: TableRouteValue::physical(region_routes.clone()),
bytes: Bytes::from(expected.clone()),
};
@@ -831,40 +836,53 @@ mod tests {
test_utils::new_test_table_info(10, region_numbers)
}
async fn create_physical_table_metadata(
table_metadata_manager: &TableMetadataManager,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) -> Result<()> {
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
}
#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
// if metadata was already created, it should be ok.
table_metadata_manager
.create_table_metadata(
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
.is_ok());
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(region_route.clone());
// if remote metadata was exists, it should return an error.
assert!(table_metadata_manager
.create_table_metadata(
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
modified_region_routes,
HashMap::default()
modified_region_routes
)
.await
.is_err());
@@ -879,7 +897,7 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes,
remote_table_route.unwrap().into_inner().region_routes(),
region_routes
);
}
@@ -889,20 +907,20 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::physical(
region_routes.clone(),
));
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
@@ -960,7 +978,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes, region_routes);
assert_eq!(removed_table_route.region_routes(), region_routes);
}
#[tokio::test]
@@ -973,14 +991,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let new_table_name = "another_name".to_string();
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
@@ -1045,14 +1063,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let mut new_table_info = table_info.clone();
new_table_info.name = "hi".to_string();
let current_table_info_value =
@@ -1123,14 +1141,15 @@ mod tests {
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
@@ -1154,11 +1173,11 @@ mod tests {
.unwrap();
assert_eq!(
updated_route_value.region_routes[0].leader_status,
updated_route_value.region_routes()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes[1].leader_status,
updated_route_value.region_routes()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
@@ -1193,17 +1212,19 @@ mod tests {
let engine = table_info.meta.engine.as_str();
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
// creates metadata.
table_metadata_manager
.create_table_metadata(
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
let new_region_routes = vec![
new_region_route(1, 1),

View File

@@ -24,7 +24,8 @@ use table::metadata::TableId;
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::{
RegionDistribution, TableMetaKey, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX,
RegionDistribution, TableMetaKey, TableMetaValue, DATANODE_TABLE_KEY_PATTERN,
DATANODE_TABLE_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::{DeserializedValueWithBytes, TABLE_INFO_KEY_PREFIX};
use super::{DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{Error, InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;

View File

@@ -71,8 +71,8 @@ impl_table_meta_value! {TableRegionValue}
#[cfg(test)]
mod tests {
use super::*;
use crate::key::TableMetaValue;
#[test]
fn test_serde() {

View File

@@ -16,11 +16,12 @@ use std::collections::HashMap;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use super::DeserializedValueWithBytes;
use crate::error::Result;
use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
@@ -38,41 +39,120 @@ impl TableRouteKey {
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TableRouteValue {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct PhysicalTableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
}
impl TableRouteValue {
pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
})
}
/// Returns the version.
///
/// For test purpose.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.physical_table_route().version
}
/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}
/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
}
fn physical_table_route(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
pub fn region_numbers(&self) -> Vec<RegionNumber> {
match self {
TableRouteValue::Physical(x) => x
.region_routes
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect::<Vec<_>>(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect::<Vec<_>>(),
}
}
}
impl TableMetaValue for TableRouteValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
let r = serde_json::from_slice::<TableRouteValue>(raw_value);
match r {
// Compatible with old TableRouteValue.
Err(e) if e.is_data() => Ok(Self::Physical(
serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
.context(SerdeJsonSnafu)?,
)),
Ok(x) => Ok(x),
Err(e) => Err(e).context(SerdeJsonSnafu),
}
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
impl PhysicalTableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: 0,
}
}
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: self.version + 1,
}
}
/// Returns the version.
///
/// For test purpose.
#[cfg(any(tets, feature = "testing"))]
pub fn version(&self) -> u64 {
self.version
impl LogicalTableRouteValue {
pub fn physical_table_id(&self) -> TableId {
todo!()
}
/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
pub fn region_ids(&self) -> Vec<RegionId> {
todo!()
}
}
@@ -269,7 +349,24 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.map(|table_route| region_distribution(table_route.region_routes()))
.transpose()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
let new_raw_v = format!("{:?}", v);
assert_eq!(
new_raw_v,
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }], version: 0 })"#
);
}
}

View File

@@ -18,10 +18,14 @@ use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::region::check_response_header;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -34,7 +38,7 @@ use common_telemetry::{debug, info, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
@@ -151,17 +155,29 @@ impl StandaloneTableMetadataAllocator {
};
Ok(table_id)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
fn create_table_route(table_id: TableId, task: &CreateTableTask) -> TableRouteValue {
if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let region_routes = task
.partitions
.iter()
@@ -182,13 +198,22 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
}
}
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_route = create_table_route(table_id, task);
let region_wal_options = self.create_wal_options(&table_route)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -197,8 +222,8 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
Ok(TableMetadata {
table_id,
region_routes,
region_wal_options: HashMap::default(),
table_route,
region_wal_options,
})
}
}

View File

@@ -104,6 +104,7 @@ mod test {
use std::sync::Arc;
use common_meta::distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -161,7 +162,11 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
@@ -303,7 +308,11 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -207,7 +207,7 @@ mod tests {
.unwrap();
let should_downgraded = table_route_value
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();

View File

@@ -85,7 +85,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;
let mut new_region_routes = table_route_value.region_routes.clone();
let mut new_region_routes = table_route_value.region_routes().clone();
for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
@@ -233,7 +233,8 @@ mod tests {
.unwrap()
.unwrap()
.into_inner()
.region_routes
.region_routes()
.clone()
}
// Original region routes:
@@ -395,8 +396,8 @@ mod tests {
.unwrap()
.into_inner();
let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
@@ -415,7 +416,7 @@ mod tests {
.unwrap()
.into_inner();
let map = region_distribution(&table_route_value.region_routes).unwrap();
let map = region_distribution(table_route_value.region_routes()).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));

View File

@@ -84,7 +84,7 @@ impl RegionMigrationStart {
let table_route = ctx.get_table_route_value().await?;
let region_route = table_route
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id == region_id)
.cloned()
@@ -137,7 +137,6 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -187,10 +186,8 @@ mod tests {
..Default::default()
};
env.table_metadata_manager()
.create_table_metadata(table_info, vec![region_route], HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, vec![region_route])
.await;
let err = state
.retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
@@ -221,10 +218,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -254,10 +249,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -281,10 +274,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -187,6 +187,7 @@ mod tests {
use std::assert_matches::assert_matches;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -409,7 +410,11 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -22,6 +22,7 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -144,6 +145,22 @@ impl TestingEnv {
provider: Arc::new(MockContextProvider::default()),
}
}
// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) {
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}
}
/// Generates a [InstructionReply::OpenRegion] reply.
@@ -369,7 +386,11 @@ impl ProcedureMigrationTestSuite {
) {
self.env
.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}
@@ -377,7 +398,7 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let region_routes = self
let table_route = self
.env
.table_metadata_manager
.table_route_manager()
@@ -385,22 +406,25 @@ impl ProcedureMigrationTestSuite {
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
.into_inner();
let region_routes = table_route.region_routes();
let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;
let region_route = region_routes
.into_iter()
.iter()
.find(|route| route.region.id == region_id)
.unwrap();
assert!(!region_route.is_leader_downgraded());
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert!(!region_route
.follower_peers
.into_iter()
.iter()
.any(|route| route.id == removed_follower_id))
}
}

View File

@@ -74,7 +74,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -136,12 +135,10 @@ mod tests {
},
];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -190,11 +187,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -212,7 +208,7 @@ mod tests {
// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(!latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
@@ -233,11 +229,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -253,7 +248,7 @@ mod tests {
.unwrap()
.unwrap();
assert!(latest_table_route.region_routes[0].is_leader_downgraded());
assert!(latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}

View File

@@ -59,7 +59,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -128,12 +127,10 @@ mod tests {
region_routes
};
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let old_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -166,15 +163,14 @@ mod tests {
state.rollback_downgraded_region(&mut ctx).await.unwrap();
let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}
#[tokio::test]
@@ -214,11 +210,10 @@ mod tests {
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -229,14 +224,13 @@ mod tests {
assert!(ctx.volatile_ctx.table_route.is_none());
let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}
}

View File

@@ -33,7 +33,7 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let mut region_routes = table_route_value.region_routes.clone();
let mut region_routes = table_route_value.region_routes().clone();
let region_route = region_routes
.iter_mut()
.find(|route| route.region.id == region_id)
@@ -81,7 +81,7 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let region_routes = table_route_value.region_routes.clone();
let region_routes = table_route_value.region_routes().clone();
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
@@ -176,7 +176,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -225,11 +224,8 @@ mod tests {
..Default::default()
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -254,11 +250,8 @@ mod tests {
..Default::default()
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -285,11 +278,8 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let new_region_routes = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -326,12 +316,10 @@ mod tests {
},
];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -385,11 +373,8 @@ mod tests {
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(!updated);
@@ -411,11 +396,8 @@ mod tests {
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(updated);
@@ -437,11 +419,8 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
@@ -470,24 +449,23 @@ mod tests {
.unwrap();
ctx.volatile_ctx.opening_region_guard = Some(guard);
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
.into_inner();
let region_routes = table_route.region_routes();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(ctx.volatile_ctx.opening_region_guard.is_none());

View File

@@ -100,12 +100,12 @@ fn test_region_request_builder() {
let procedure = CreateTableProcedure::new(
1,
create_table_task(),
test_data::new_region_routes(),
TableRouteValue::physical(test_data::new_region_routes()),
HashMap::default(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);
let template = procedure.new_region_request_builder().unwrap();
let template = procedure.new_region_request_builder(None).unwrap();
let expected = PbCreateRegionRequest {
region_id: 0,
@@ -191,7 +191,7 @@ async fn test_on_datanode_create_regions() {
let mut procedure = CreateTableProcedure::new(
1,
create_table_task(),
region_routes,
TableRouteValue::physical(region_routes),
HashMap::default(),
test_data::new_ddl_context(datanode_manager),
);
@@ -247,7 +247,7 @@ async fn test_on_datanode_drop_regions() {
let procedure = DropTableProcedure::new(
1,
drop_table_task,
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes)),
DeserializedValueWithBytes::from_inner(TableRouteValue::physical(region_routes)),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
test_data::new_ddl_context(datanode_manager),
);
@@ -373,7 +373,7 @@ async fn test_submit_alter_region_requests() {
.table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await

View File

@@ -188,6 +188,7 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -291,7 +292,11 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
@@ -378,7 +383,11 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -143,7 +143,7 @@ async fn get_leader_peer_ids(
.context(error::TableMetadataManagerSnafu)
.map(|route| {
route.map_or_else(Vec::new, |route| {
find_leaders(&route.region_routes)
find_leaders(route.region_routes())
.into_iter()
.map(|peer| peer.id)
.collect()

View File

@@ -12,17 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use std::collections::HashMap;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
use common_telemetry::{debug, warn};
use common_meta::ClusterId;
use common_telemetry::debug;
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
use store_api::storage::{RegionId, RegionNumber, TableId, MAX_REGION_SEQ};
use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
@@ -49,6 +55,83 @@ impl MetaSrvTableMetadataAllocator {
wal_options_allocator,
}
}
async fn create_table_route(
&self,
cluster_id: ClusterId,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let regions = task.partitions.len();
ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
let mut peers = self
.selector
.select(
cluster_id,
&self.ctx,
SelectorOptions {
min_required_items: regions,
allow_duplication: true,
},
)
.await?;
ensure!(
peers.len() >= regions,
error::NoEnoughAvailableDatanodeSnafu {
required: regions,
available: peers.len(),
}
);
peers.truncate(regions);
let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as RegionNumber),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
#[async_trait::async_trait]
@@ -58,23 +141,15 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let (table_id, region_routes) = handle_create_region_routes(
ctx.cluster_id,
task,
&self.ctx,
&self.selector,
&self.table_id_sequence,
)
let table_id = self.table_id_sequence.next().await? as TableId;
let table_route = self
.create_table_route(ctx.cluster_id, table_id, task)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
.context(ExternalSnafu)?;
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
let region_wal_options = self.create_wal_options(&table_route)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -83,84 +158,8 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
Ok(TableMetadata {
table_id,
region_routes,
table_route,
region_wal_options,
})
}
}
/// pre-allocates create table's table id and region routes.
async fn handle_create_region_routes(
cluster_id: u64,
task: &CreateTableTask,
ctx: &SelectorContext,
selector: &SelectorRef,
table_id_sequence: &SequenceRef,
) -> Result<(TableId, Vec<RegionRoute>)> {
let table_info = &task.table_info;
let partitions = &task.partitions;
let mut peers = selector
.select(
cluster_id,
ctx,
SelectorOptions {
min_required_items: partitions.len(),
allow_duplication: true,
},
)
.await?;
if peers.len() < partitions.len() {
warn!(
"Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}",
format_full_table_name(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name
),
partitions.len(),
peers.len()
);
return error::NoEnoughAvailableDatanodeSnafu {
required: partitions.len(),
available: peers.len(),
}
.fail();
}
// We don't need to keep all peers, just truncate it to the number of partitions.
// If the peers are not enough, some peers will be used for multiple partitions.
peers.truncate(partitions.len());
let table_id = table_id_sequence
.next()
.await
.context(error::NextSequenceSnafu)? as u32;
ensure!(
partitions.len() <= MAX_REGION_SEQ as usize,
TooManyPartitionsSnafu
);
let region_routes = partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
follower_peers: vec![], // follower_peers is not supported at the moment
leader_status: None,
}
})
.collect::<Vec<_>>();
Ok((table_id, region_routes))
}

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -145,7 +146,11 @@ pub(crate) async fn prepare_table_region_and_info_value(
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}

View File

@@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -114,7 +115,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", regions.clone().into_iter()).into(),
vec![
TableRouteValue::physical(vec![
RegionRoute {
region: Region {
id: 3.into(),
@@ -169,7 +170,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
],
]),
region_wal_options.clone(),
)
.await
@@ -178,7 +179,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", regions.clone().into_iter()).into(),
vec![
TableRouteValue::physical(vec![
RegionRoute {
region: Region {
id: 1.into(),
@@ -239,7 +240,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
],
]),
region_wal_options,
)
.await

View File

@@ -19,7 +19,7 @@ use api::v1::Rows;
use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes};
use common_meta::rpc::router::RegionRoutes;
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
@@ -76,56 +76,7 @@ impl PartitionRuleManager {
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
Ok(RegionRoutes(route.region_routes))
}
/// Find datanodes of corresponding regions of given table.
pub async fn find_region_datanodes(
&self,
table_id: TableId,
regions: Vec<RegionNumber>,
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut datanodes = HashMap::with_capacity(regions.len());
let region_map = convert_to_region_leader_map(&route.region_routes);
for region in regions.iter() {
let datanode = *region_map.get(region).context(error::FindDatanodeSnafu {
table_id,
region: *region,
})?;
datanodes
.entry(datanode.clone())
.or_insert_with(Vec::new)
.push(*region);
}
Ok(datanodes)
}
/// Find all leader peers of given table.
pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result<Vec<Peer>> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut peers = Vec::with_capacity(route.region_routes.len());
for peer in &route.region_routes {
peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu {
region_id: peer.region.id,
table_id,
})?);
}
Ok(peers)
Ok(RegionRoutes(route.region_routes().clone()))
}
pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
@@ -136,13 +87,15 @@ impl PartitionRuleManager {
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let region_routes = route.region_routes();
ensure!(
!route.region_routes.is_empty(),
!region_routes.is_empty(),
error::FindTableRoutesSnafu { table_id }
);
let mut partitions = Vec::with_capacity(route.region_routes.len());
for r in route.region_routes.iter() {
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
let partition = r
.region
.partition

View File

@@ -521,7 +521,7 @@ CREATE TABLE {table_name} (
.unwrap()
.into_inner();
let region_to_dn_map = region_distribution(&table_route_value.region_routes)
let region_to_dn_map = region_distribution(table_route_value.region_routes())
.unwrap()
.iter()
.map(|(k, v)| (v[0], *k))

View File

@@ -216,7 +216,7 @@ mod tests {
.unwrap()
.into_inner();
let region_to_dn_map = region_distribution(&table_route_value.region_routes)
let region_to_dn_map = region_distribution(table_route_value.region_routes())
.unwrap()
.iter()
.map(|(k, v)| (v[0], *k))