fix: revert unfinished route table change (#3008)

* Revert "refactor: hide `RegionRoute` behind `TableRouteValue` (#2989)"

This reverts commit 1641fd572a.

* Revert "feat: MetricsEngine table route (part 1) (#2952)"

This reverts commit 6ac47e939c.
This commit is contained in:
Ruihang Xia
2023-12-26 17:56:49 +08:00
committed by GitHub
parent bf635a6c7c
commit 8ce8a8f3c7
34 changed files with 875 additions and 1037 deletions

View File

@@ -14,7 +14,6 @@
use std::time::Instant;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
@@ -54,11 +53,7 @@ impl TableMetadataBencher {
let start = Instant::now();
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.create_table_metadata(table_info, region_routes, region_wal_options)
.await
.unwrap();

View File

@@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue};
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
@@ -153,7 +153,7 @@ impl MigrateTableMetadata {
)
.unwrap();
let new_table_value = NextTableRouteValue::physical(table_route.region_routes);
let new_table_value = NextTableRouteValue::new(table_route.region_routes);
let table_id = table_route.table.id as u32;
let new_key = TableRouteKey::new(table_id);

View File

@@ -21,10 +21,10 @@ use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
pub mod alter_table;
pub mod create_table;
@@ -58,7 +58,7 @@ pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub table_route: TableRouteValue,
pub region_routes: Vec<RegionRoute>,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,

View File

@@ -45,6 +45,7 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
@@ -182,23 +183,25 @@ impl AlterTableProcedure {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
let table_route = self
let TableRouteValue { region_routes, .. } = self
.context
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.into_inner();
let region_routes = table_route.region_routes();
let leaders = find_leaders(region_routes);
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let regions = find_leader_regions(&region_routes, &datanode);
for region in regions {
let region_id = RegionId::new(table_id, region);

View File

@@ -18,8 +18,9 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, SemanticType};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_config::WAL_OPTIONS_KEY;
use common_error::ext::BoxedError;
use common_procedure::error::{
@@ -39,9 +40,8 @@ use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::error::{self, Result, TableInfoNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
@@ -60,13 +60,13 @@ impl CreateTableProcedure {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
table_route: TableRouteValue,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
}
}
@@ -78,12 +78,10 @@ impl CreateTableProcedure {
opening_regions: vec![],
};
if let TableRouteValue::Physical(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
creator
.register_opening_regions(&context)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(CreateTableProcedure { context, creator })
}
@@ -96,6 +94,10 @@ impl CreateTableProcedure {
self.table_info().ident.table_id
}
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.creator.data.region_routes
}
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}
@@ -130,10 +132,7 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let column_defs = create_table_expr
@@ -192,54 +191,16 @@ impl CreateTableProcedure {
options: create_table_expr.table_options.clone(),
};
Ok(CreateRequestBuilder {
template,
physical_table_id,
})
let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
Ok(builder)
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&region_routes, request_builder).await
}
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
self.create_regions(region_routes, request_builder).await
}
}
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}
self.creator.register_opening_regions(&self.context)?;
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;
let create_table_expr = &create_table_data.task.create_table;
@@ -247,6 +208,8 @@ impl CreateTableProcedure {
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let mut request_builder = self.new_region_request_builder()?;
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
@@ -258,7 +221,12 @@ impl CreateTableProcedure {
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(region_id, storage_path.clone(), region_wal_options)
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.await?;
requests.push(PbRegionRequest::Create(create_region_request));
@@ -302,13 +270,10 @@ impl CreateTableProcedure {
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");
@@ -364,7 +329,7 @@ impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
table_route: TableRouteValue,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
@@ -372,23 +337,21 @@ impl TableCreator {
state: CreateTableState::Prepare,
cluster_id,
task,
table_route,
region_routes,
region_wal_options,
},
opening_regions: vec![],
}
}
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
&self,
context: &DdlContext,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
/// Register opening regions if doesn't exist.
pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> {
let region_routes = &self.data.region_routes;
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(vec![]);
return Ok(());
}
let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
@@ -403,7 +366,9 @@ impl TableCreator {
})?;
opening_region_guards.push(guard);
}
Ok(opening_region_guards)
self.opening_regions = opening_region_guards;
Ok(())
}
}
@@ -421,7 +386,7 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
table_route: TableRouteValue,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}
@@ -434,18 +399,28 @@ impl CreateTableData {
/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
context: DdlContext,
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}
impl CreateRequestBuilder {
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
Self {
context,
template,
physical_table_id: None,
}
}
pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}
async fn build_one(
&self,
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
@@ -463,18 +438,49 @@ impl CreateRequestBuilder {
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
// For example, region 0 of logical table must resides with region 0 of physical table. So here we can
// simply concat the physical table id and the logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
.await?;
}
Ok(request)
}
async fn metric_engine_hook(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
request: &mut PbCreateRegionRequest,
) -> Result<()> {
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
let table_id = if let Some(table_id) = self.physical_table_id {
table_id
} else {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_key = TableNameKey::new(
&create_expr.catalog_name,
&create_expr.schema_name,
physical_table_name,
);
let table_id = table_name_manager
.get(table_name_key)
.await?
.context(TableInfoNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();
self.physical_table_id = Some(table_id);
table_id
};
// Concat physical table's table id and corresponding region number to get
// the physical region id.
let physical_region_id = RegionId::new(table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}
Ok(request)
Ok(())
}
}

View File

@@ -307,7 +307,7 @@ impl DropTableData {
}
fn region_routes(&self) -> &Vec<RegionRoute> {
self.table_route_value.region_routes()
&self.table_route_value.region_routes
}
fn table_info(&self) -> &RawTableInfo {

View File

@@ -177,7 +177,7 @@ impl DdlManager {
&self,
cluster_id: u64,
create_table_task: CreateTableTask,
table_route: TableRouteValue,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -185,7 +185,7 @@ impl DdlManager {
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
table_route,
region_routes,
region_wal_options,
context,
);
@@ -275,10 +275,11 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route = table_route_value.into_inner().region_routes().clone();
let table_route = table_route_value.into_inner().region_routes;
let id = ddl_manager
.submit_truncate_table_task(
@@ -355,8 +356,9 @@ async fn handle_drop_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let id = ddl_manager
.submit_drop_table_task(
@@ -390,7 +392,7 @@ async fn handle_create_table_task(
let TableMetadata {
table_id,
table_route,
region_routes,
region_wal_options,
} = table_meta;
@@ -400,7 +402,7 @@ async fn handle_create_table_task(
.submit_create_table_task(
cluster_id,
create_table_task,
table_route,
region_routes,
region_wal_options,
)
.await?;

View File

@@ -135,9 +135,9 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Failed to find table route for table id {}", table_id))]
#[snafu(display("Table route not found: {}", table_name))]
TableRouteNotFound {
table_id: TableId,
table_name: String,
location: Location,
},

View File

@@ -147,14 +147,6 @@ pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
Self: Sized;
fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
pub struct TableMetadataManager {
@@ -229,9 +221,7 @@ impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T
}
}
impl<'de, T: DeserializeOwned + Serialize + TableMetaValue> Deserialize<'de>
for DeserializedValueWithBytes<T>
{
impl<'de, T: DeserializeOwned + Serialize> Deserialize<'de> for DeserializedValueWithBytes<T> {
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
@@ -258,11 +248,11 @@ impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithByt
}
}
impl<T: Serialize + DeserializeOwned + TableMetaValue> DeserializedValueWithBytes<T> {
impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
/// Returns a struct containing a deserialized value and an original `bytes`.
/// It accepts original bytes of inner.
pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
let inner = T::try_from_raw_value(&bytes)?;
let inner = serde_json::from_slice(&bytes).context(error::SerdeJsonSnafu)?;
Ok(Self { bytes, inner })
}
@@ -383,10 +373,13 @@ impl TableMetadataManager {
pub async fn create_table_metadata(
&self,
mut table_info: RawTableInfo,
table_route_value: TableRouteValue,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = table_route_value.region_numbers();
let region_numbers = region_routes
.iter()
.map(|region| region.region.id.region_number())
.collect::<Vec<_>>();
table_info.meta.region_numbers = region_numbers;
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.clone();
@@ -410,28 +403,30 @@ impl TableMetadataManager {
.table_info_manager()
.build_create_txn(table_id, &table_info_value)?;
// Creates datanode table key value pairs.
let distribution = region_distribution(&region_routes)?;
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
table_id,
&engine,
&region_storage_path,
region_options,
region_wal_options,
distribution,
)?;
// Creates table route.
let table_route_value = TableRouteValue::new(region_routes);
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
let mut txn = Txn::merge_all(vec![
let txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_datanode_table_txn,
create_table_route_txn,
]);
if let TableRouteValue::Physical(x) = &table_route_value {
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
table_id,
&engine,
&region_storage_path,
region_options,
region_wal_options,
region_distribution(&x.region_routes)?,
)?;
txn = txn.merge(create_datanode_table_txn);
}
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already created.
@@ -483,7 +478,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;
// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes())?;
let distribution = region_distribution(&table_route_value.region_routes)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
@@ -608,7 +603,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(current_table_route_value.region_routes())?;
region_distribution(&current_table_route_value.region_routes)?;
let new_region_distribution = region_distribution(&new_region_routes)?;
let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
@@ -656,7 +651,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut new_region_routes = current_table_route_value.region_routes.clone();
let mut updated = 0;
for route in &mut new_region_routes {
@@ -716,12 +711,12 @@ impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey);
macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
$(
impl $crate::key::TableMetaValue for $val_ty {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
impl $val_ty {
pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
@@ -749,7 +744,8 @@ macro_rules! impl_optional_meta_value {
impl_table_meta_value! {
TableNameValue,
TableInfoValue,
DatanodeTableValue
DatanodeTableValue,
TableRouteValue
}
impl_optional_meta_value! {
@@ -769,7 +765,6 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -785,14 +780,14 @@ mod tests {
let region_routes = vec![region_route.clone()];
let expected_region_routes =
TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
TableRouteValue::new(vec![region_route.clone(), region_route.clone()]);
let expected = serde_json::to_vec(&expected_region_routes).unwrap();
// Serialize behaviors:
// The inner field will be ignored.
let value = DeserializedValueWithBytes {
// ignored
inner: TableRouteValue::physical(region_routes.clone()),
inner: TableRouteValue::new(region_routes.clone()),
bytes: Bytes::from(expected.clone()),
};
@@ -836,56 +831,43 @@ mod tests {
test_utils::new_test_table_info(10, region_numbers)
}
async fn create_physical_table_metadata(
table_metadata_manager: &TableMetadataManager,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) -> Result<()> {
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
}
#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
// if metadata was already created, it should be ok.
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.is_ok());
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(region_route.clone());
// if remote metadata was exists, it should return an error.
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
modified_region_routes
)
.await
.is_err());
assert!(table_metadata_manager
.create_table_metadata(
table_info.clone(),
modified_region_routes,
HashMap::default()
)
.await
.is_err());
let (remote_table_info, remote_table_route) = table_metadata_manager
.get_full_table_info(10)
@@ -897,7 +879,7 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes(),
remote_table_route.unwrap().into_inner().region_routes,
region_routes
);
}
@@ -907,23 +889,23 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::physical(
region_routes.clone(),
));
let table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
@@ -978,7 +960,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes(), region_routes);
assert_eq!(removed_table_route.region_routes, region_routes);
}
#[tokio::test]
@@ -991,14 +973,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let new_table_name = "another_name".to_string();
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
@@ -1063,14 +1045,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let mut new_table_info = table_info.clone();
new_table_info.name = "hi".to_string();
let current_table_info_value =
@@ -1141,18 +1123,17 @@ mod tests {
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |region_route| {
@@ -1173,11 +1154,11 @@ mod tests {
.unwrap();
assert_eq!(
updated_route_value.region_routes()[0].leader_status,
updated_route_value.region_routes[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes()[1].leader_status,
updated_route_value.region_routes[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
@@ -1212,19 +1193,17 @@ mod tests {
let engine = table_info.meta.engine.as_str();
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
let new_region_routes = vec![
new_region_route(1, 1),

View File

@@ -24,8 +24,7 @@ use table::metadata::TableId;
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::{
RegionDistribution, TableMetaKey, TableMetaValue, DATANODE_TABLE_KEY_PATTERN,
DATANODE_TABLE_KEY_PREFIX,
RegionDistribution, TableMetaKey, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::{DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use super::{DeserializedValueWithBytes, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{Error, InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;

View File

@@ -71,8 +71,8 @@ impl_table_meta_value! {TableRegionValue}
#[cfg(test)]
mod tests {
use super::*;
use crate::key::TableMetaValue;
#[test]
fn test_serde() {

View File

@@ -16,12 +16,11 @@ use std::collections::HashMap;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionId;
use table::metadata::TableId;
use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use super::DeserializedValueWithBytes;
use crate::error::Result;
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
@@ -39,120 +38,41 @@ impl TableRouteKey {
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct PhysicalTableRouteValue {
pub struct TableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
}
impl TableRouteValue {
pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
})
}
/// Returns the version.
///
/// For test purpose.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.physical_table_route().version
}
/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}
/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
}
fn physical_table_route(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
pub fn region_numbers(&self) -> Vec<RegionNumber> {
match self {
TableRouteValue::Physical(x) => x
.region_routes
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect::<Vec<_>>(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect::<Vec<_>>(),
}
}
}
impl TableMetaValue for TableRouteValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
let r = serde_json::from_slice::<TableRouteValue>(raw_value);
match r {
// Compatible with old TableRouteValue.
Err(e) if e.is_data() => Ok(Self::Physical(
serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
.context(SerdeJsonSnafu)?,
)),
Ok(x) => Ok(x),
Err(e) => Err(e).context(SerdeJsonSnafu),
}
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
impl PhysicalTableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: 0,
}
}
}
impl LogicalTableRouteValue {
pub fn physical_table_id(&self) -> TableId {
todo!()
/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: self.version + 1,
}
}
pub fn region_ids(&self) -> Vec<RegionId> {
todo!()
/// Returns the version.
///
/// For test purpose.
#[cfg(any(tets, feature = "testing"))]
pub fn version(&self) -> u64 {
self.version
}
/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}
}
@@ -349,24 +269,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(table_route.region_routes()))
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.transpose()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
let new_raw_v = format!("{:?}", v);
assert_eq!(
new_raw_v,
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }], version: 0 })"#
);
}
}

View File

@@ -18,14 +18,10 @@ use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::region::check_response_header;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -38,7 +34,7 @@ use common_telemetry::{debug, info, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber, TableId};
use store_api::storage::{RegionId, TableId};
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
@@ -155,29 +151,17 @@ impl StandaloneTableMetadataAllocator {
};
Ok(table_id)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
fn create_table_route(table_id: TableId, task: &CreateTableTask) -> TableRouteValue {
if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let region_routes = task
.partitions
.iter()
@@ -198,22 +182,13 @@ fn create_table_route(table_id: TableId, task: &CreateTableTask) -> TableRouteVa
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
}
}
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_route = create_table_route(table_id, task);
let region_wal_options = self.create_wal_options(&table_route)?;
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -222,8 +197,8 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
Ok(TableMetadata {
table_id,
table_route,
region_wal_options,
region_routes,
region_wal_options: HashMap::default(),
})
}
}

View File

@@ -104,7 +104,6 @@ mod test {
use std::sync::Arc;
use common_meta::distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -162,11 +161,7 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -308,11 +303,7 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -207,7 +207,7 @@ mod tests {
.unwrap();
let should_downgraded = table_route_value
.region_routes()
.region_routes
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();

View File

@@ -85,7 +85,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;
let mut new_region_routes = table_route_value.region_routes().clone();
let mut new_region_routes = table_route_value.region_routes.clone();
for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
@@ -233,8 +233,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner()
.region_routes()
.clone()
.region_routes
}
// Original region routes:
@@ -396,8 +395,8 @@ mod tests {
.unwrap()
.into_inner();
let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
@@ -416,7 +415,7 @@ mod tests {
.unwrap()
.into_inner();
let map = region_distribution(table_route_value.region_routes()).unwrap();
let map = region_distribution(&table_route_value.region_routes).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));

View File

@@ -84,7 +84,7 @@ impl RegionMigrationStart {
let table_route = ctx.get_table_route_value().await?;
let region_route = table_route
.region_routes()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
@@ -137,6 +137,7 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -186,8 +187,10 @@ mod tests {
..Default::default()
};
env.create_physical_table_metadata(table_info, vec![region_route])
.await;
env.table_metadata_manager()
.create_table_metadata(table_info, vec![region_route], HashMap::default())
.await
.unwrap();
let err = state
.retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
@@ -218,8 +221,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -249,8 +254,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -274,8 +281,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -187,7 +187,6 @@ mod tests {
use std::assert_matches::assert_matches;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -410,11 +409,7 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -22,7 +22,6 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -145,22 +144,6 @@ impl TestingEnv {
provider: Arc::new(MockContextProvider::default()),
}
}
// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) {
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}
}
/// Generates a [InstructionReply::OpenRegion] reply.
@@ -386,11 +369,7 @@ impl ProcedureMigrationTestSuite {
) {
self.env
.table_metadata_manager()
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
}
@@ -398,7 +377,7 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let table_route = self
let region_routes = self
.env
.table_metadata_manager
.table_route_manager()
@@ -406,25 +385,22 @@ impl ProcedureMigrationTestSuite {
.await
.unwrap()
.unwrap()
.into_inner();
let region_routes = table_route.region_routes();
.into_inner()
.region_routes;
let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;
let region_route = region_routes
.iter()
.into_iter()
.find(|route| route.region.id == region_id)
.unwrap();
assert!(!region_route.is_leader_downgraded());
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert!(!region_route
.follower_peers
.iter()
.into_iter()
.any(|route| route.id == removed_follower_id))
}
}

View File

@@ -74,6 +74,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -135,10 +136,12 @@ mod tests {
},
];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -187,10 +190,11 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -208,7 +212,7 @@ mod tests {
// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
@@ -229,10 +233,11 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -248,7 +253,7 @@ mod tests {
.unwrap()
.unwrap();
assert!(latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(latest_table_route.region_routes[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}

View File

@@ -59,6 +59,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -127,10 +128,12 @@ mod tests {
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let old_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -163,14 +166,15 @@ mod tests {
state.rollback_downgraded_region(&mut ctx).await.unwrap();
let table_route = table_metadata_manager
let region_routes = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
}
#[tokio::test]
@@ -210,10 +214,11 @@ mod tests {
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -224,13 +229,14 @@ mod tests {
assert!(ctx.volatile_ctx.table_route.is_none());
let table_route = table_metadata_manager
let region_routes = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
}
}

View File

@@ -33,7 +33,7 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let mut region_routes = table_route_value.region_routes().clone();
let mut region_routes = table_route_value.region_routes.clone();
let region_route = region_routes
.iter_mut()
.find(|route| route.region.id == region_id)
@@ -81,7 +81,7 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let region_routes = table_route_value.region_routes().clone();
let region_routes = table_route_value.region_routes.clone();
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
@@ -176,6 +176,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -224,8 +225,11 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -250,8 +254,11 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -278,8 +285,11 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let new_region_routes = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -316,10 +326,12 @@ mod tests {
},
];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -373,8 +385,11 @@ mod tests {
leader_status: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(!updated);
@@ -396,8 +411,11 @@ mod tests {
leader_status: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(updated);
@@ -419,8 +437,11 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
@@ -449,23 +470,24 @@ mod tests {
.unwrap();
ctx.volatile_ctx.opening_region_guard = Some(guard);
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
let table_route = table_metadata_manager
let region_routes = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner();
let region_routes = table_route.region_routes();
.into_inner()
.region_routes;
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(ctx.volatile_ctx.opening_region_guard.is_none());

View File

@@ -100,12 +100,12 @@ fn test_region_request_builder() {
let procedure = CreateTableProcedure::new(
1,
create_table_task(),
TableRouteValue::physical(test_data::new_region_routes()),
test_data::new_region_routes(),
HashMap::default(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);
let template = procedure.new_region_request_builder(None).unwrap();
let template = procedure.new_region_request_builder().unwrap();
let expected = PbCreateRegionRequest {
region_id: 0,
@@ -191,7 +191,7 @@ async fn test_on_datanode_create_regions() {
let mut procedure = CreateTableProcedure::new(
1,
create_table_task(),
TableRouteValue::physical(region_routes),
region_routes,
HashMap::default(),
test_data::new_ddl_context(datanode_manager),
);
@@ -247,7 +247,7 @@ async fn test_on_datanode_drop_regions() {
let procedure = DropTableProcedure::new(
1,
drop_table_task,
DeserializedValueWithBytes::from_inner(TableRouteValue::physical(region_routes)),
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes)),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
test_data::new_ddl_context(datanode_manager),
);
@@ -373,7 +373,7 @@ async fn test_submit_alter_region_requests() {
.table_metadata_manager
.create_table_metadata(
table_info.clone(),
TableRouteValue::physical(region_routes),
region_routes.clone(),
HashMap::default(),
)
.await

View File

@@ -188,7 +188,6 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -292,11 +291,7 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.await
.unwrap();
@@ -383,11 +378,7 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.await
.unwrap();

View File

@@ -143,7 +143,7 @@ async fn get_leader_peer_ids(
.context(error::TableMetadataManagerSnafu)
.map(|route| {
route.map_or_else(Vec::new, |route| {
find_leaders(route.region_routes())
find_leaders(&route.region_routes)
.into_iter()
.map(|peer| peer.id)
.collect()

View File

@@ -12,23 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_catalog::consts::METRIC_ENGINE;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
use common_meta::ClusterId;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber, TableId, MAX_REGION_SEQ};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
@@ -55,83 +49,6 @@ impl MetaSrvTableMetadataAllocator {
wal_options_allocator,
}
}
async fn create_table_route(
&self,
cluster_id: ClusterId,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let regions = task.partitions.len();
ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
let mut peers = self
.selector
.select(
cluster_id,
&self.ctx,
SelectorOptions {
min_required_items: regions,
allow_duplication: true,
},
)
.await?;
ensure!(
peers.len() >= regions,
error::NoEnoughAvailableDatanodeSnafu {
required: regions,
available: peers.len(),
}
);
peers.truncate(regions);
let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as RegionNumber),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
#[async_trait::async_trait]
@@ -141,15 +58,23 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.table_id_sequence.next().await? as TableId;
let (table_id, region_routes) = handle_create_region_routes(
ctx.cluster_id,
task,
&self.ctx,
&self.selector,
&self.table_id_sequence,
)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let table_route = self
.create_table_route(ctx.cluster_id, table_id, task)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let region_wal_options = self.create_wal_options(&table_route)?;
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -158,8 +83,84 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
Ok(TableMetadata {
table_id,
table_route,
region_routes,
region_wal_options,
})
}
}
/// pre-allocates create table's table id and region routes.
async fn handle_create_region_routes(
cluster_id: u64,
task: &CreateTableTask,
ctx: &SelectorContext,
selector: &SelectorRef,
table_id_sequence: &SequenceRef,
) -> Result<(TableId, Vec<RegionRoute>)> {
let table_info = &task.table_info;
let partitions = &task.partitions;
let mut peers = selector
.select(
cluster_id,
ctx,
SelectorOptions {
min_required_items: partitions.len(),
allow_duplication: true,
},
)
.await?;
if peers.len() < partitions.len() {
warn!(
"Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}",
format_full_table_name(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name
),
partitions.len(),
peers.len()
);
return error::NoEnoughAvailableDatanodeSnafu {
required: partitions.len(),
available: peers.len(),
}
.fail();
}
// We don't need to keep all peers, just truncate it to the number of partitions.
// If the peers are not enough, some peers will be used for multiple partitions.
peers.truncate(partitions.len());
let table_id = table_id_sequence
.next()
.await
.context(error::NextSequenceSnafu)? as u32;
ensure!(
partitions.len() <= MAX_REGION_SEQ as usize,
TooManyPartitionsSnafu
);
let region_routes = partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
follower_peers: vec![], // follower_peers is not supported at the moment
leader_status: None,
}
})
.collect::<Vec<_>>();
Ok((table_id, region_routes))
}

View File

@@ -17,7 +17,6 @@ use std::sync::Arc;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -146,11 +145,7 @@ pub(crate) async fn prepare_table_region_and_info_value(
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
}

View File

@@ -17,7 +17,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -115,7 +114,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", regions.clone().into_iter()).into(),
TableRouteValue::physical(vec![
vec![
RegionRoute {
region: Region {
id: 3.into(),
@@ -170,7 +169,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
]),
],
region_wal_options.clone(),
)
.await
@@ -179,7 +178,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", regions.clone().into_iter()).into(),
TableRouteValue::physical(vec![
vec![
RegionRoute {
region: Region {
id: 1.into(),
@@ -240,7 +239,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
]),
],
region_wal_options,
)
.await

View File

@@ -19,7 +19,7 @@ use api::v1::Rows;
use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoutes;
use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes};
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
@@ -76,7 +76,56 @@ impl PartitionRuleManager {
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
Ok(RegionRoutes(route.region_routes().clone()))
Ok(RegionRoutes(route.region_routes))
}
/// Find datanodes of corresponding regions of given table.
pub async fn find_region_datanodes(
&self,
table_id: TableId,
regions: Vec<RegionNumber>,
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut datanodes = HashMap::with_capacity(regions.len());
let region_map = convert_to_region_leader_map(&route.region_routes);
for region in regions.iter() {
let datanode = *region_map.get(region).context(error::FindDatanodeSnafu {
table_id,
region: *region,
})?;
datanodes
.entry(datanode.clone())
.or_insert_with(Vec::new)
.push(*region);
}
Ok(datanodes)
}
/// Find all leader peers of given table.
pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result<Vec<Peer>> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut peers = Vec::with_capacity(route.region_routes.len());
for peer in &route.region_routes {
peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu {
region_id: peer.region.id,
table_id,
})?);
}
Ok(peers)
}
pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
@@ -87,15 +136,13 @@ impl PartitionRuleManager {
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let region_routes = route.region_routes();
ensure!(
!region_routes.is_empty(),
!route.region_routes.is_empty(),
error::FindTableRoutesSnafu { table_id }
);
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
let mut partitions = Vec::with_capacity(route.region_routes.len());
for r in route.region_routes.iter() {
let partition = r
.region
.partition