refactor: hide RegionRoute behind TableRouteValue (#2989)

* refactor: hide `RegionRoute` behind `TableRouteValue` (Metric Engine table route, part 2)

* Update src/common/meta/src/ddl/create_table.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/meta-srv/src/procedure/region_migration/test_util.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: resolve PR comments

* fix: rustfmt

* compatible with the old TableRouteValue

* Update src/common/meta/src/key/table_route.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
LFC
2023-12-25 19:37:50 +08:00
committed by GitHub
parent 89129c99c8
commit 1641fd572a
27 changed files with 958 additions and 779 deletions

754
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -14,6 +14,7 @@
use std::time::Instant;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
@@ -53,7 +54,11 @@ impl TableMetadataBencher {
let start = Instant::now();
self.table_metadata_manager
.create_table_metadata(table_info, region_routes, region_wal_options)
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();

View File

@@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
@@ -153,7 +153,7 @@ impl MigrateTableMetadata {
)
.unwrap();
let new_table_value = NextTableRouteValue::new(table_route.region_routes);
let new_table_value = NextTableRouteValue::physical(table_route.region_routes);
let table_id = table_route.table.id as u32;
let new_key = TableRouteKey::new(table_id);

View File

@@ -21,10 +21,10 @@ use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
pub mod alter_table;
pub mod create_table;
@@ -58,7 +58,7 @@ pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
pub table_route: TableRouteValue,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,

View File

@@ -182,7 +182,6 @@ impl AlterTableProcedure {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
let table_route = self
.context
@@ -190,9 +189,7 @@ impl AlterTableProcedure {
.table_route_manager()
.get(table_id)
.await?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();

View File

@@ -18,9 +18,8 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_config::WAL_OPTIONS_KEY;
use common_error::ext::BoxedError;
use common_procedure::error::{
@@ -40,8 +39,9 @@ use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableInfoNotFoundSnafu};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
@@ -60,13 +60,13 @@ impl CreateTableProcedure {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
}
}
@@ -78,10 +78,12 @@ impl CreateTableProcedure {
opening_regions: vec![],
};
creator
.register_opening_regions(&context)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if let TableRouteValue::Physical(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
Ok(CreateTableProcedure { context, creator })
}
@@ -94,10 +96,6 @@ impl CreateTableProcedure {
self.table_info().ident.table_id
}
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.creator.data.region_routes
}
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}
@@ -132,7 +130,10 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}
pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let column_defs = create_table_expr
@@ -191,16 +192,54 @@ impl CreateTableProcedure {
options: create_table_expr.table_options.clone(),
};
let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
Ok(builder)
Ok(CreateRequestBuilder {
template,
physical_table_id,
})
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&region_routes, request_builder).await
}
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
self.create_regions(region_routes, request_builder).await
}
}
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Registers opening regions
self.creator.register_opening_regions(&self.context)?;
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;
let create_table_expr = &create_table_data.task.create_table;
@@ -208,8 +247,6 @@ impl CreateTableProcedure {
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let mut request_builder = self.new_region_request_builder()?;
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
@@ -221,12 +258,7 @@ impl CreateTableProcedure {
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.build_one(region_id, storage_path.clone(), region_wal_options)
.await?;
requests.push(PbRegionRequest::Create(create_region_request));
@@ -270,10 +302,13 @@ impl CreateTableProcedure {
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.await?;
info!("Created table metadata for table {table_id}");
@@ -329,7 +364,7 @@ impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
@@ -337,21 +372,23 @@ impl TableCreator {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
table_route,
region_wal_options,
},
opening_regions: vec![],
}
}
/// Register opening regions if doesn't exist.
pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> {
let region_routes = &self.data.region_routes;
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
&self,
context: &DdlContext,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(());
return Ok(vec![]);
}
let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
@@ -366,9 +403,7 @@ impl TableCreator {
})?;
opening_region_guards.push(guard);
}
self.opening_regions = opening_region_guards;
Ok(())
Ok(opening_region_guards)
}
}
@@ -386,7 +421,7 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}
@@ -399,28 +434,18 @@ impl CreateTableData {
/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
context: DdlContext,
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}
impl CreateRequestBuilder {
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
Self {
context,
template,
physical_table_id: None,
}
}
pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}
async fn build_one(
&mut self,
create_expr: &CreateTableExpr,
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
@@ -438,49 +463,18 @@ impl CreateRequestBuilder {
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
.await?;
}
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
// For example, region 0 of logical table must resides with region 0 of physical table. So here we can
// simply concat the physical table id and the logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
Ok(request)
}
async fn metric_engine_hook(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
request: &mut PbCreateRegionRequest,
) -> Result<()> {
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
let table_id = if let Some(table_id) = self.physical_table_id {
table_id
} else {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_key = TableNameKey::new(
&create_expr.catalog_name,
&create_expr.schema_name,
physical_table_name,
);
let table_id = table_name_manager
.get(table_name_key)
.await?
.context(TableInfoNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();
self.physical_table_id = Some(table_id);
table_id
};
// Concat physical table's table id and corresponding region number to get
// the physical region id.
let physical_region_id = RegionId::new(table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}
Ok(())
Ok(request)
}
}

View File

@@ -177,7 +177,7 @@ impl DdlManager {
&self,
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -185,7 +185,7 @@ impl DdlManager {
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
region_routes,
table_route,
region_wal_options,
context,
);
@@ -275,9 +275,8 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let table_route = table_route_value.into_inner().region_routes().clone();
@@ -356,9 +355,8 @@ async fn handle_drop_table_task(
table_name: table_ref.to_string(),
})?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let id = ddl_manager
.submit_drop_table_task(
@@ -392,7 +390,7 @@ async fn handle_create_table_task(
let TableMetadata {
table_id,
region_routes,
table_route,
region_wal_options,
} = table_meta;
@@ -402,7 +400,7 @@ async fn handle_create_table_task(
.submit_create_table_task(
cluster_id,
create_table_task,
region_routes,
table_route,
region_wal_options,
)
.await?;

View File

@@ -135,9 +135,9 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Table route not found: {}", table_name))]
#[snafu(display("Failed to find table route for table id {}", table_id))]
TableRouteNotFound {
table_name: String,
table_id: TableId,
location: Location,
},

View File

@@ -147,6 +147,14 @@ pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}
pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
Self: Sized;
fn try_as_raw_value(&self) -> Result<Vec<u8>>;
}
pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
pub struct TableMetadataManager {
@@ -221,7 +229,9 @@ impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T
}
}
impl<'de, T: DeserializeOwned + Serialize> Deserialize<'de> for DeserializedValueWithBytes<T> {
impl<'de, T: DeserializeOwned + Serialize + TableMetaValue> Deserialize<'de>
for DeserializedValueWithBytes<T>
{
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
@@ -248,11 +258,11 @@ impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithByt
}
}
impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
impl<T: Serialize + DeserializeOwned + TableMetaValue> DeserializedValueWithBytes<T> {
/// Returns a struct containing a deserialized value and an original `bytes`.
/// It accepts original bytes of inner.
pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
let inner = serde_json::from_slice(&bytes).context(error::SerdeJsonSnafu)?;
let inner = T::try_from_raw_value(&bytes)?;
Ok(Self { bytes, inner })
}
@@ -373,13 +383,10 @@ impl TableMetadataManager {
pub async fn create_table_metadata(
&self,
mut table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
table_route_value: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = region_routes
.iter()
.map(|region| region.region.id.region_number())
.collect::<Vec<_>>();
let region_numbers = table_route_value.region_numbers();
table_info.meta.region_numbers = region_numbers;
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.clone();
@@ -403,30 +410,28 @@ impl TableMetadataManager {
.table_info_manager()
.build_create_txn(table_id, &table_info_value)?;
// Creates datanode table key value pairs.
let distribution = region_distribution(&region_routes)?;
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
table_id,
&engine,
&region_storage_path,
region_options,
region_wal_options,
distribution,
)?;
// Creates table route.
let table_route_value = TableRouteValue::new(region_routes);
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
let txn = Txn::merge_all(vec![
let mut txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_datanode_table_txn,
create_table_route_txn,
]);
if let TableRouteValue::Physical(x) = &table_route_value {
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
table_id,
&engine,
&region_storage_path,
region_options,
region_wal_options,
region_distribution(&x.region_routes)?,
)?;
txn = txn.merge(create_datanode_table_txn);
}
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already created.
@@ -711,12 +716,12 @@ impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey);
macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
$(
impl $val_ty {
pub fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
impl $crate::key::TableMetaValue for $val_ty {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
}
pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
@@ -744,8 +749,7 @@ macro_rules! impl_optional_meta_value {
impl_table_meta_value! {
TableNameValue,
TableInfoValue,
DatanodeTableValue,
TableRouteValue
DatanodeTableValue
}
impl_optional_meta_value! {
@@ -765,6 +769,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -780,14 +785,14 @@ mod tests {
let region_routes = vec![region_route.clone()];
let expected_region_routes =
TableRouteValue::new(vec![region_route.clone(), region_route.clone()]);
TableRouteValue::physical(vec![region_route.clone(), region_route.clone()]);
let expected = serde_json::to_vec(&expected_region_routes).unwrap();
// Serialize behaviors:
// The inner field will be ignored.
let value = DeserializedValueWithBytes {
// ignored
inner: TableRouteValue::new(region_routes.clone()),
inner: TableRouteValue::physical(region_routes.clone()),
bytes: Bytes::from(expected.clone()),
};
@@ -831,6 +836,20 @@ mod tests {
test_utils::new_test_table_info(10, region_numbers)
}
async fn create_physical_table_metadata(
table_metadata_manager: &TableMetadataManager,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) -> Result<()> {
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
}
#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -840,34 +859,33 @@ mod tests {
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
// if metadata was already created, it should be ok.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.is_ok());
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(region_route.clone());
// if remote metadata was exists, it should return an error.
assert!(table_metadata_manager
.create_table_metadata(
table_info.clone(),
modified_region_routes,
HashMap::default()
)
.await
.is_err());
assert!(create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
modified_region_routes
)
.await
.is_err());
let (remote_table_info, remote_table_route) = table_metadata_manager
.get_full_table_info(10)
@@ -894,18 +912,18 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::physical(
region_routes.clone(),
));
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
@@ -973,14 +991,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
let new_table_name = "another_name".to_string();
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
@@ -1045,14 +1063,14 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
let mut new_table_info = table_info.clone();
new_table_info.name = "hi".to_string();
let current_table_info_value =
@@ -1123,17 +1141,18 @@ mod tests {
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |region_route| {
@@ -1193,17 +1212,19 @@ mod tests {
let engine = table_info.meta.engine.as_str();
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
// creates metadata.
table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
let new_region_routes = vec![
new_region_route(1, 1),

View File

@@ -24,7 +24,8 @@ use table::metadata::TableId;
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::{
RegionDistribution, TableMetaKey, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX,
RegionDistribution, TableMetaKey, TableMetaValue, DATANODE_TABLE_KEY_PATTERN,
DATANODE_TABLE_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::{DeserializedValueWithBytes, TABLE_INFO_KEY_PREFIX};
use super::{DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};

View File

@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use super::{TableMetaValue, TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{Error, InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;

View File

@@ -71,8 +71,8 @@ impl_table_meta_value! {TableRegionValue}
#[cfg(test)]
mod tests {
use super::*;
use crate::key::TableMetaValue;
#[test]
fn test_serde() {

View File

@@ -16,11 +16,12 @@ use std::collections::HashMap;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use super::DeserializedValueWithBytes;
use crate::error::Result;
use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
@@ -38,6 +39,7 @@ impl TableRouteKey {
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
@@ -55,11 +57,8 @@ pub struct LogicalTableRouteValue {
}
impl TableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: 0,
})
pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
@@ -102,6 +101,59 @@ impl TableRouteValue {
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
pub fn region_numbers(&self) -> Vec<RegionNumber> {
match self {
TableRouteValue::Physical(x) => x
.region_routes
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect::<Vec<_>>(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect::<Vec<_>>(),
}
}
}
impl TableMetaValue for TableRouteValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
let r = serde_json::from_slice::<TableRouteValue>(raw_value);
match r {
// Compatible with old TableRouteValue.
Err(e) if e.is_data() => Ok(Self::Physical(
serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
.context(SerdeJsonSnafu)?,
)),
Ok(x) => Ok(x),
Err(e) => Err(e).context(SerdeJsonSnafu),
}
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
impl PhysicalTableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: 0,
}
}
}
impl LogicalTableRouteValue {
pub fn physical_table_id(&self) -> TableId {
todo!()
}
pub fn region_ids(&self) -> Vec<RegionId> {
todo!()
}
}
impl TableMetaKey for TableRouteKey {
@@ -301,3 +353,20 @@ impl TableRouteManager {
.transpose()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
let new_raw_v = format!("{:?}", v);
assert_eq!(
new_raw_v,
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }], version: 0 })"#
);
}
}

View File

@@ -18,10 +18,14 @@ use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::region::check_response_header;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -34,7 +38,7 @@ use common_telemetry::{debug, info, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
@@ -151,17 +155,29 @@ impl StandaloneTableMetadataAllocator {
};
Ok(table_id)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
fn create_table_route(table_id: TableId, task: &CreateTableTask) -> TableRouteValue {
if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let region_routes = task
.partitions
.iter()
@@ -182,13 +198,22 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
}
}
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_route = create_table_route(table_id, task);
let region_wal_options = self.create_wal_options(&table_route)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -197,8 +222,8 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
Ok(TableMetadata {
table_id,
region_routes,
region_wal_options: HashMap::default(),
table_route,
region_wal_options,
})
}
}

View File

@@ -104,6 +104,7 @@ mod test {
use std::sync::Arc;
use common_meta::distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -161,7 +162,11 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
@@ -303,7 +308,11 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -137,7 +137,6 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -187,10 +186,8 @@ mod tests {
..Default::default()
};
env.table_metadata_manager()
.create_table_metadata(table_info, vec![region_route], HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, vec![region_route])
.await;
let err = state
.retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
@@ -221,10 +218,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -254,10 +249,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -281,10 +274,8 @@ mod tests {
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -187,6 +187,7 @@ mod tests {
use std::assert_matches::assert_matches;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -409,7 +410,11 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -22,6 +22,7 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -144,6 +145,22 @@ impl TestingEnv {
provider: Arc::new(MockContextProvider::default()),
}
}
// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) {
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}
}
/// Generates a [InstructionReply::OpenRegion] reply.
@@ -369,7 +386,11 @@ impl ProcedureMigrationTestSuite {
) {
self.env
.table_metadata_manager()
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}

View File

@@ -74,7 +74,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -136,12 +135,10 @@ mod tests {
},
];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -190,11 +187,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
@@ -233,11 +229,10 @@ mod tests {
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -59,7 +59,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -128,12 +127,10 @@ mod tests {
region_routes
};
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let old_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -213,11 +210,10 @@ mod tests {
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -176,7 +176,6 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -225,11 +224,8 @@ mod tests {
..Default::default()
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -254,11 +250,8 @@ mod tests {
..Default::default()
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -285,11 +278,8 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let new_region_routes = state
.build_upgrade_candidate_region_metadata(&mut ctx)
@@ -326,12 +316,10 @@ mod tests {
},
];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
@@ -385,11 +373,8 @@ mod tests {
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(!updated);
@@ -411,11 +396,8 @@ mod tests {
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(updated);
@@ -437,11 +419,8 @@ mod tests {
leader_status: Some(RegionStatus::Downgraded),
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
@@ -470,11 +449,10 @@ mod tests {
.unwrap();
ctx.volatile_ctx.opening_region_guard = Some(guard);
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();

View File

@@ -100,12 +100,12 @@ fn test_region_request_builder() {
let procedure = CreateTableProcedure::new(
1,
create_table_task(),
test_data::new_region_routes(),
TableRouteValue::physical(test_data::new_region_routes()),
HashMap::default(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);
let template = procedure.new_region_request_builder().unwrap();
let template = procedure.new_region_request_builder(None).unwrap();
let expected = PbCreateRegionRequest {
region_id: 0,
@@ -191,7 +191,7 @@ async fn test_on_datanode_create_regions() {
let mut procedure = CreateTableProcedure::new(
1,
create_table_task(),
region_routes,
TableRouteValue::physical(region_routes),
HashMap::default(),
test_data::new_ddl_context(datanode_manager),
);
@@ -247,7 +247,7 @@ async fn test_on_datanode_drop_regions() {
let procedure = DropTableProcedure::new(
1,
drop_table_task,
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes)),
DeserializedValueWithBytes::from_inner(TableRouteValue::physical(region_routes)),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
test_data::new_ddl_context(datanode_manager),
);
@@ -373,7 +373,7 @@ async fn test_submit_alter_region_requests() {
.table_metadata_manager
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await

View File

@@ -188,6 +188,7 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -291,7 +292,11 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
@@ -378,7 +383,11 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -12,17 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use std::collections::HashMap;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
use common_telemetry::{debug, warn};
use common_meta::ClusterId;
use common_telemetry::debug;
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
use store_api::storage::{RegionId, RegionNumber, TableId, MAX_REGION_SEQ};
use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
@@ -49,6 +55,83 @@ impl MetaSrvTableMetadataAllocator {
wal_options_allocator,
}
}
async fn create_table_route(
&self,
cluster_id: ClusterId,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let regions = task.partitions.len();
ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
let mut peers = self
.selector
.select(
cluster_id,
&self.ctx,
SelectorOptions {
min_required_items: regions,
allow_duplication: true,
},
)
.await?;
ensure!(
peers.len() >= regions,
error::NoEnoughAvailableDatanodeSnafu {
required: regions,
available: peers.len(),
}
);
peers.truncate(regions);
let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as RegionNumber),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
}
fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> MetaResult<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}
}
#[async_trait::async_trait]
@@ -58,23 +141,15 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let (table_id, region_routes) = handle_create_region_routes(
ctx.cluster_id,
task,
&self.ctx,
&self.selector,
&self.table_id_sequence,
)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let table_id = self.table_id_sequence.next().await? as TableId;
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
let table_route = self
.create_table_route(ctx.cluster_id, table_id, task)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let region_wal_options = self.create_wal_options(&table_route)?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -83,84 +158,8 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
Ok(TableMetadata {
table_id,
region_routes,
table_route,
region_wal_options,
})
}
}
/// pre-allocates create table's table id and region routes.
async fn handle_create_region_routes(
cluster_id: u64,
task: &CreateTableTask,
ctx: &SelectorContext,
selector: &SelectorRef,
table_id_sequence: &SequenceRef,
) -> Result<(TableId, Vec<RegionRoute>)> {
let table_info = &task.table_info;
let partitions = &task.partitions;
let mut peers = selector
.select(
cluster_id,
ctx,
SelectorOptions {
min_required_items: partitions.len(),
allow_duplication: true,
},
)
.await?;
if peers.len() < partitions.len() {
warn!(
"Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}",
format_full_table_name(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name
),
partitions.len(),
peers.len()
);
return error::NoEnoughAvailableDatanodeSnafu {
required: partitions.len(),
available: peers.len(),
}
.fail();
}
// We don't need to keep all peers, just truncate it to the number of partitions.
// If the peers are not enough, some peers will be used for multiple partitions.
peers.truncate(partitions.len());
let table_id = table_id_sequence
.next()
.await
.context(error::NextSequenceSnafu)? as u32;
ensure!(
partitions.len() <= MAX_REGION_SEQ as usize,
TooManyPartitionsSnafu
);
let region_routes = partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer.into()),
follower_peers: vec![], // follower_peers is not supported at the moment
leader_status: None,
}
})
.collect::<Vec<_>>();
Ok((table_id, region_routes))
}

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
@@ -145,7 +146,11 @@ pub(crate) async fn prepare_table_region_and_info_value(
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes, HashMap::default())
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}

View File

@@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -114,7 +115,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", regions.clone().into_iter()).into(),
vec![
TableRouteValue::physical(vec![
RegionRoute {
region: Region {
id: 3.into(),
@@ -169,7 +170,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
],
]),
region_wal_options.clone(),
)
.await
@@ -178,7 +179,7 @@ pub(crate) async fn create_partition_rule_manager(
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", regions.clone().into_iter()).into(),
vec![
TableRouteValue::physical(vec![
RegionRoute {
region: Region {
id: 1.into(),
@@ -239,7 +240,7 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_status: None,
},
],
]),
region_wal_options,
)
.await