mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 22:40:40 +00:00
feat: remove support for logical tables in the create table procedure (#3592)
* feat: Remove support for logical tables in the create table procedure * chore: remove the redandent table ids alloc * chore: minor fix
This commit is contained in:
@@ -426,7 +426,6 @@ impl StartCommand {
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
wal_options_allocator.clone(),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
));
|
||||
|
||||
let ddl_task_executor = Self::create_ddl_task_executor(
|
||||
|
||||
@@ -22,7 +22,7 @@ use self::table_meta::TableMetadataAllocatorRef;
|
||||
use crate::cache_invalidator::CacheInvalidatorRef;
|
||||
use crate::datanode_manager::DatanodeManagerRef;
|
||||
use crate::error::Result;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::TableMetadataManagerRef;
|
||||
use crate::region_keeper::MemoryRegionKeeperRef;
|
||||
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
@@ -86,7 +86,7 @@ pub struct TableMetadata {
|
||||
/// Table id.
|
||||
pub table_id: TableId,
|
||||
/// Route information for each region of the table.
|
||||
pub table_route: TableRouteValue,
|
||||
pub table_route: PhysicalTableRouteValue,
|
||||
/// The encoded wal options for regions of the table.
|
||||
// If a region does not have an associated wal options, no key for the region would be found in the map.
|
||||
pub region_wal_options: HashMap<RegionNumber, String>,
|
||||
|
||||
@@ -35,9 +35,9 @@ use table::table_reference::TableReference;
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{self, Result, TableRouteNotFoundSnafu};
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
|
||||
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
|
||||
use crate::region_keeper::OperatingRegionGuard;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
@@ -69,7 +69,7 @@ impl CreateTableProcedure {
|
||||
};
|
||||
|
||||
// Only registers regions if the table route is allocated.
|
||||
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
|
||||
if let Some(x) = &creator.data.table_route {
|
||||
creator.opening_regions = creator
|
||||
.register_opening_regions(&context, &x.region_routes)
|
||||
.map_err(BoxedError::new)
|
||||
@@ -97,7 +97,7 @@ impl CreateTableProcedure {
|
||||
})
|
||||
}
|
||||
|
||||
fn table_route(&self) -> Result<&TableRouteValue> {
|
||||
fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
|
||||
self.creator
|
||||
.data
|
||||
.table_route
|
||||
@@ -111,7 +111,7 @@ impl CreateTableProcedure {
|
||||
pub fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
table_route: PhysicalTableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.creator
|
||||
@@ -192,31 +192,10 @@ impl CreateTableProcedure {
|
||||
/// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
|
||||
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
|
||||
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
|
||||
// Safety: the table route must be allocated.
|
||||
match self.table_route()?.clone() {
|
||||
TableRouteValue::Physical(x) => {
|
||||
let request_builder = self.new_region_request_builder(None)?;
|
||||
self.create_regions(&x.region_routes, request_builder).await
|
||||
}
|
||||
TableRouteValue::Logical(x) => {
|
||||
let physical_table_id = x.physical_table_id();
|
||||
|
||||
let physical_table_route = self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.try_get_physical_table_route(physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: physical_table_id,
|
||||
})?;
|
||||
let region_routes = &physical_table_route.region_routes;
|
||||
|
||||
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
|
||||
|
||||
self.create_regions(region_routes, request_builder).await
|
||||
}
|
||||
}
|
||||
let table_route = self.table_route()?.clone();
|
||||
let request_builder = self.new_region_request_builder(None)?;
|
||||
self.create_regions(&table_route.region_routes, request_builder)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn create_regions(
|
||||
@@ -224,15 +203,12 @@ impl CreateTableProcedure {
|
||||
region_routes: &[RegionRoute],
|
||||
request_builder: CreateRequestBuilder,
|
||||
) -> Result<Status> {
|
||||
// Safety: the table_route must be allocated.
|
||||
if self.table_route()?.is_physical() {
|
||||
// Registers opening regions
|
||||
let guards = self
|
||||
.creator
|
||||
.register_opening_regions(&self.context, region_routes)?;
|
||||
if !guards.is_empty() {
|
||||
self.creator.opening_regions = guards;
|
||||
}
|
||||
// Registers opening regions
|
||||
let guards = self
|
||||
.creator
|
||||
.register_opening_regions(&self.context, region_routes)?;
|
||||
if !guards.is_empty() {
|
||||
self.creator.opening_regions = guards;
|
||||
}
|
||||
|
||||
let create_table_data = &self.creator.data;
|
||||
@@ -303,7 +279,7 @@ impl CreateTableProcedure {
|
||||
// Safety: the region_wal_options must be allocated.
|
||||
let region_wal_options = self.region_wal_options()?.clone();
|
||||
// Safety: the table_route must be allocated.
|
||||
let table_route = self.table_route()?.clone();
|
||||
let table_route = TableRouteValue::Physical(self.table_route()?.clone());
|
||||
manager
|
||||
.create_table_metadata(raw_table_info, table_route, region_wal_options)
|
||||
.await?;
|
||||
@@ -400,7 +376,7 @@ impl TableCreator {
|
||||
fn set_allocated_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
table_route: TableRouteValue,
|
||||
table_route: PhysicalTableRouteValue,
|
||||
region_wal_options: HashMap<RegionNumber, String>,
|
||||
) {
|
||||
self.data.task.table_info.ident.table_id = table_id;
|
||||
@@ -424,7 +400,7 @@ pub struct CreateTableData {
|
||||
pub state: CreateTableState,
|
||||
pub task: CreateTableTask,
|
||||
/// None stands for not allocated yet.
|
||||
table_route: Option<TableRouteValue>,
|
||||
table_route: Option<PhysicalTableRouteValue>,
|
||||
/// None stands for not allocated yet.
|
||||
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
|
||||
pub cluster_id: ClusterId,
|
||||
|
||||
@@ -16,16 +16,13 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::METRIC_ENGINE;
|
||||
use common_telemetry::{debug, info};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
|
||||
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu};
|
||||
use crate::key::table_name::{TableNameKey, TableNameManager};
|
||||
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
|
||||
use crate::error::{self, Result, UnsupportedSnafu};
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::rpc::router::{Region, RegionRoute};
|
||||
@@ -38,7 +35,6 @@ pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
|
||||
pub struct TableMetadataAllocator {
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
}
|
||||
|
||||
@@ -46,12 +42,10 @@ impl TableMetadataAllocator {
|
||||
pub fn new(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_name_manager: TableNameManager,
|
||||
) -> Self {
|
||||
Self::with_peer_allocator(
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_name_manager,
|
||||
Arc::new(NoopPeerAllocator),
|
||||
)
|
||||
}
|
||||
@@ -59,13 +53,11 @@ impl TableMetadataAllocator {
|
||||
pub fn with_peer_allocator(
|
||||
table_id_sequence: SequenceRef,
|
||||
wal_options_allocator: WalOptionsAllocatorRef,
|
||||
table_name_manager: TableNameManager,
|
||||
peer_allocator: PeerAllocatorRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id_sequence,
|
||||
wal_options_allocator,
|
||||
table_name_manager,
|
||||
peer_allocator,
|
||||
}
|
||||
}
|
||||
@@ -102,19 +94,14 @@ impl TableMetadataAllocator {
|
||||
|
||||
fn create_wal_options(
|
||||
&self,
|
||||
table_route: &TableRouteValue,
|
||||
table_route: &PhysicalTableRouteValue,
|
||||
) -> Result<HashMap<RegionNumber, String>> {
|
||||
match table_route {
|
||||
TableRouteValue::Physical(x) => {
|
||||
let region_numbers = x
|
||||
.region_routes
|
||||
.iter()
|
||||
.map(|route| route.region.id.region_number())
|
||||
.collect();
|
||||
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
|
||||
}
|
||||
TableRouteValue::Logical(_) => Ok(HashMap::new()),
|
||||
}
|
||||
let region_numbers = table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.map(|route| route.region.id.region_number())
|
||||
.collect();
|
||||
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
|
||||
}
|
||||
|
||||
async fn create_table_route(
|
||||
@@ -122,7 +109,7 @@ impl TableMetadataAllocator {
|
||||
ctx: &TableMetadataAllocatorContext,
|
||||
table_id: TableId,
|
||||
task: &CreateTableTask,
|
||||
) -> Result<TableRouteValue> {
|
||||
) -> Result<PhysicalTableRouteValue> {
|
||||
let regions = task.partitions.len();
|
||||
ensure!(
|
||||
regions > 0,
|
||||
@@ -131,56 +118,29 @@ impl TableMetadataAllocator {
|
||||
}
|
||||
);
|
||||
|
||||
let table_route = if task.create_table.engine == METRIC_ENGINE
|
||||
&& let Some(physical_table_name) = task
|
||||
.create_table
|
||||
.table_options
|
||||
.get(LOGICAL_TABLE_METADATA_KEY)
|
||||
{
|
||||
let physical_table_id = self
|
||||
.table_name_manager
|
||||
.get(TableNameKey::new(
|
||||
&task.create_table.catalog_name,
|
||||
&task.create_table.schema_name,
|
||||
physical_table_name,
|
||||
))
|
||||
.await?
|
||||
.context(TableNotFoundSnafu {
|
||||
table_name: physical_table_name,
|
||||
})?
|
||||
.table_id();
|
||||
let peers = self.peer_allocator.alloc(ctx, regions).await?;
|
||||
let region_routes = task
|
||||
.partitions
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, partition)| {
|
||||
let region = Region {
|
||||
id: RegionId::new(table_id, i as u32),
|
||||
partition: Some(partition.clone().into()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let region_ids = (0..regions)
|
||||
.map(|i| RegionId::new(table_id, i as RegionNumber))
|
||||
.collect();
|
||||
let peer = peers[i % peers.len()].clone();
|
||||
|
||||
TableRouteValue::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
|
||||
} else {
|
||||
let peers = self.peer_allocator.alloc(ctx, regions).await?;
|
||||
RegionRoute {
|
||||
region,
|
||||
leader_peer: Some(peer),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let region_routes = task
|
||||
.partitions
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, partition)| {
|
||||
let region = Region {
|
||||
id: RegionId::new(table_id, i as u32),
|
||||
partition: Some(partition.clone().into()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let peer = peers[i % peers.len()].clone();
|
||||
|
||||
RegionRoute {
|
||||
region,
|
||||
leader_peer: Some(peer),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
|
||||
};
|
||||
Ok(table_route)
|
||||
Ok(PhysicalTableRouteValue::new(region_routes))
|
||||
}
|
||||
|
||||
pub async fn create(
|
||||
@@ -203,15 +163,6 @@ impl TableMetadataAllocator {
|
||||
region_wal_options,
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets table ids with all tasks.
|
||||
pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> {
|
||||
for task in tasks {
|
||||
let table_id = self.allocate_table_id(task).await?;
|
||||
task.table_info.ident.table_id = table_id;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
|
||||
|
||||
@@ -68,7 +68,7 @@ pub async fn create_physical_table(
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ async fn test_on_prepare() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
@@ -106,7 +106,7 @@ async fn test_on_prepare_logical_table_exists_err() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
@@ -152,7 +152,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
@@ -200,7 +200,7 @@ async fn test_on_prepare_part_logical_tables_exist() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
@@ -271,7 +271,7 @@ async fn test_on_create_metadata() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
@@ -321,7 +321,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
@@ -382,7 +382,7 @@ async fn test_on_create_metadata_err() {
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
TableRouteValue::Physical(table_route),
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
|
||||
@@ -541,7 +541,7 @@ async fn handle_create_table_task(
|
||||
async fn handle_create_logical_table_tasks(
|
||||
ddl_manager: &DdlManager,
|
||||
cluster_id: ClusterId,
|
||||
mut create_table_tasks: Vec<CreateTableTask>,
|
||||
create_table_tasks: Vec<CreateTableTask>,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
ensure!(
|
||||
!create_table_tasks.is_empty(),
|
||||
@@ -554,11 +554,6 @@ async fn handle_create_logical_table_tasks(
|
||||
&create_table_tasks,
|
||||
)
|
||||
.await?;
|
||||
// Sets table_ids on create_table_tasks
|
||||
ddl_manager
|
||||
.table_metadata_allocator
|
||||
.set_table_ids_on_logic_create(&mut create_table_tasks)
|
||||
.await?;
|
||||
let num_logical_tables = create_table_tasks.len();
|
||||
|
||||
let (id, output) = ddl_manager
|
||||
@@ -770,7 +765,6 @@ mod tests {
|
||||
Arc::new(TableMetadataAllocator::new(
|
||||
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
);
|
||||
|
||||
@@ -99,7 +99,6 @@ pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
|
||||
.build(),
|
||||
),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
table_metadata_manager,
|
||||
}
|
||||
|
||||
@@ -225,7 +225,6 @@ impl MetaSrvBuilder {
|
||||
Arc::new(TableMetadataAllocator::with_peer_allocator(
|
||||
sequence,
|
||||
wal_options_allocator.clone(),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
peer_allocator,
|
||||
))
|
||||
});
|
||||
|
||||
@@ -37,7 +37,7 @@ use common_meta::ddl::test_util::create_table::{
|
||||
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
|
||||
};
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
|
||||
use common_meta::key::DeserializedValueWithBytes;
|
||||
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask};
|
||||
use common_meta::rpc::router::{find_leaders, RegionRoute};
|
||||
@@ -115,7 +115,7 @@ fn test_region_request_builder() {
|
||||
|
||||
procedure.set_allocated_metadata(
|
||||
1024,
|
||||
TableRouteValue::physical(test_data::new_region_routes()),
|
||||
PhysicalTableRouteValue::new(test_data::new_region_routes()),
|
||||
HashMap::default(),
|
||||
);
|
||||
|
||||
@@ -210,7 +210,7 @@ async fn test_on_datanode_create_regions() {
|
||||
|
||||
procedure.set_allocated_metadata(
|
||||
42,
|
||||
TableRouteValue::physical(test_data::new_region_routes()),
|
||||
PhysicalTableRouteValue::new(test_data::new_region_routes()),
|
||||
HashMap::default(),
|
||||
);
|
||||
|
||||
|
||||
@@ -209,7 +209,6 @@ pub mod test_data {
|
||||
table_metadata_allocator: Arc::new(TableMetadataAllocator::new(
|
||||
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
|
||||
Arc::new(WalOptionsAllocator::default()),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +143,6 @@ impl GreptimeDbStandaloneBuilder {
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
wal_options_allocator.clone(),
|
||||
table_metadata_manager.table_name_manager().clone(),
|
||||
));
|
||||
|
||||
let ddl_task_executor = Arc::new(
|
||||
|
||||
Reference in New Issue
Block a user