diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index aca3e6e29c..e14f75fd56 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -68,8 +68,8 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to start wal options allocator"))] - StartWalOptionsAllocator { + #[snafu(display("Failed to start wal provider"))] + StartWalProvider { #[snafu(implicit)] location: Location, source: common_meta::error::Error, @@ -343,7 +343,7 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), - Error::StartWalOptionsAllocator { source, .. } => source.status_code(), + Error::StartWalProvider { source, .. } => source.status_code(), Error::HttpQuerySql { .. } => StatusCode::Internal, Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => { source.status_code() diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index fbff2d42e0..b80828d13a 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -64,8 +64,8 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to start wal options allocator"))] - StartWalOptionsAllocator { + #[snafu(display("Failed to start wal provider"))] + StartWalProvider { #[snafu(implicit)] location: Location, source: common_meta::error::Error, @@ -289,8 +289,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to build wal options allocator"))] - BuildWalOptionsAllocator { + #[snafu(display("Failed to build wal provider"))] + BuildWalProvider { #[snafu(implicit)] location: Location, source: common_meta::error::Error, @@ -350,8 +350,9 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), - Error::BuildWalOptionsAllocator { source, .. } - | Error::StartWalOptionsAllocator { source, .. } => source.status_code(), + Error::BuildWalProvider { source, .. } | Error::StartWalProvider { source, .. } => { + source.status_code() + } Error::HttpQuerySql { .. } => StatusCode::Internal, Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => { source.status_code() diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8f978f9278..9569dfb1e8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -40,7 +40,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; -use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator}; +use common_meta::wal_provider::{WalProviderRef, build_wal_provider}; use common_procedure::ProcedureManagerRef; use common_query::prelude::set_default_prefix; use common_telemetry::info; @@ -120,7 +120,7 @@ pub struct Instance { frontend: Frontend, flownode: FlownodeInstance, procedure_manager: ProcedureManagerRef, - wal_options_allocator: WalOptionsAllocatorRef, + wal_provider: WalProviderRef, // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } @@ -146,10 +146,10 @@ impl App for Instance { .await .context(error::StartProcedureManagerSnafu)?; - self.wal_options_allocator + self.wal_provider .start() .await - .context(error::StartWalOptionsAllocatorSnafu)?; + .context(error::StartWalProviderSnafu)?; plugins::start_frontend_plugins(self.frontend.instance.plugins().clone()) .await @@ -468,7 +468,7 @@ impl StartCommand { flow_server: flownode.flow_engine(), }); - let table_id_sequence = Arc::new( + let table_id_allocator = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) @@ -485,13 +485,13 @@ impl StartCommand { .clone() .try_into() .context(error::InvalidWalProviderSnafu)?; - let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone()) + let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone()) .await - .context(error::BuildWalOptionsAllocatorSnafu)?; - let wal_options_allocator = Arc::new(wal_options_allocator); + .context(error::BuildWalProviderSnafu)?; + let wal_provider = Arc::new(wal_provider); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( - table_id_sequence, - wal_options_allocator.clone(), + table_id_allocator, + wal_provider.clone(), )); let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( flow_id_sequence, @@ -585,7 +585,7 @@ impl StartCommand { frontend, flownode, procedure_manager, - wal_options_allocator, + wal_provider, _guard: guard, }) } diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index aeab51190c..8fc647433a 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::region_registry::LeaderRegionRegistryRef; +pub mod allocator; pub mod alter_database; pub mod alter_logical_tables; pub mod alter_table; diff --git a/src/common/meta/src/ddl/allocator.rs b/src/common/meta/src/ddl/allocator.rs new file mode 100644 index 0000000000..5a3de33b34 --- /dev/null +++ b/src/common/meta/src/ddl/allocator.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod region_routes; +pub mod resource_id; +pub mod wal_options; diff --git a/src/common/meta/src/ddl/allocator/region_routes.rs b/src/common/meta/src/ddl/allocator/region_routes.rs new file mode 100644 index 0000000000..04665c5a31 --- /dev/null +++ b/src/common/meta/src/ddl/allocator/region_routes.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::debug; +use store_api::storage::{RegionId, RegionNumber, TableId}; + +use crate::error::Result; +use crate::peer::PeerAllocator; +use crate::rpc::router::{Region, RegionRoute}; + +pub type RegionRoutesAllocatorRef = Arc; + +#[async_trait::async_trait] +pub trait RegionRoutesAllocator: Send + Sync { + async fn allocate( + &self, + table_id: TableId, + regions_and_partitions: &[(RegionNumber, &str)], + ) -> Result>; +} + +#[async_trait::async_trait] +impl RegionRoutesAllocator for T { + async fn allocate( + &self, + table_id: TableId, + regions_and_partitions: &[(RegionNumber, &str)], + ) -> Result> { + let regions = regions_and_partitions.len().max(1); + let peers = self.alloc(regions).await?; + debug!("Allocated peers {:?} for table {}", peers, table_id,); + + let mut region_routes = regions_and_partitions + .iter() + .enumerate() + .map(|(i, (region_number, partition))| { + let region = Region { + id: RegionId::new(table_id, *region_number), + partition_expr: partition.to_string(), + ..Default::default() + }; + + let peer = peers[i % peers.len()].clone(); + + RegionRoute { + region, + leader_peer: Some(peer), + ..Default::default() + } + }) + .collect::>(); + + // If the table has no partitions, we need to create a default region. + if region_routes.is_empty() { + region_routes.push(RegionRoute { + region: Region { + id: RegionId::new(table_id, 0), + ..Default::default() + }, + leader_peer: Some(peers[0].clone()), + ..Default::default() + }); + } + + Ok(region_routes) + } +} diff --git a/src/common/meta/src/ddl/allocator/resource_id.rs b/src/common/meta/src/ddl/allocator/resource_id.rs new file mode 100644 index 0000000000..e0d9f6229e --- /dev/null +++ b/src/common/meta/src/ddl/allocator/resource_id.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use crate::error::Result; + +pub type ResourceIdAllocatorRef = Arc; + +#[async_trait::async_trait] +pub trait ResourceIdAllocator: Send + Sync { + /// Returns the next value and increments the sequence. + async fn next(&self) -> Result; + + /// Returns the current value stored in the remote storage without incrementing the sequence. + async fn peek(&self) -> Result; + + /// Jumps to the given value. + async fn jump_to(&self, next: u64) -> Result<()>; + + /// Returns the range of available sequences. + async fn min_max(&self) -> Range; +} diff --git a/src/common/meta/src/ddl/allocator/wal_options.rs b/src/common/meta/src/ddl/allocator/wal_options.rs new file mode 100644 index 0000000000..3b902d9bf1 --- /dev/null +++ b/src/common/meta/src/ddl/allocator/wal_options.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use store_api::storage::RegionNumber; + +use crate::error::Result; + +pub type WalOptionsAllocatorRef = Arc; + +#[async_trait::async_trait] +pub trait WalOptionsAllocator: Send + Sync { + async fn allocate( + &self, + region_numbers: &[RegionNumber], + skip_wal: bool, + ) -> Result>; +} diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index 1c53bc2e4e..4f1b0da7d8 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -97,7 +97,7 @@ pub fn create_region_request_builder( /// Builds a [CreateRequestBuilder] from a [RawTableInfo]. /// -/// Note: **This method is only used for creating logical tables.** +/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. pub fn create_region_request_builder_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 48c677aa5b..bfa57693d0 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod executor; -pub(crate) mod template; +pub mod executor; +pub mod template; use std::collections::HashMap; diff --git a/src/common/meta/src/ddl/create_table/template.rs b/src/common/meta/src/ddl/create_table/template.rs index 19ee82c091..a94985a9b8 100644 --- a/src/common/meta/src/ddl/create_table/template.rs +++ b/src/common/meta/src/ddl/create_table/template.rs @@ -20,19 +20,17 @@ use api::v1::region::{CreateRequest, RegionColumnDef}; use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{RawTableInfo, TableId}; use crate::error::{self, Result}; -use crate::wal_options_allocator::prepare_wal_options; +use crate::wal_provider::prepare_wal_options; -/// Builds a [CreateRequest] from a [RawTableInfo]. +/// Constructs a [CreateRequest] based on the provided [RawTableInfo]. /// -/// Note: **This method is only used for creating logical tables.** -pub(crate) fn build_template_from_raw_table_info( - raw_table_info: &RawTableInfo, -) -> Result { +/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. +pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result { let primary_key_indices = &raw_table_info.meta.primary_key_indices; let column_defs = raw_table_info .meta @@ -57,7 +55,7 @@ pub(crate) fn build_template_from_raw_table_info( let options = HashMap::from(&raw_table_info.meta.options); let template = CreateRequest { region_id: 0, - engine: METRIC_ENGINE_NAME.to_string(), + engine: raw_table_info.meta.engine.clone(), column_defs, primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(), path: String::new(), @@ -138,7 +136,7 @@ pub struct CreateRequestBuilder { } impl CreateRequestBuilder { - pub(crate) fn new(template: CreateRequest, physical_table_id: Option) -> Self { + pub fn new(template: CreateRequest, physical_table_id: Option) -> Self { Self { template, physical_table_id, diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 42cc893ee6..2ba94edc99 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -17,47 +17,47 @@ use std::sync::Arc; use common_telemetry::{debug, info}; use snafu::ensure; -use store_api::storage::{RegionId, RegionNumber, TableId}; +use store_api::storage::{RegionNumber, TableId}; use crate::ddl::TableMetadata; +use crate::ddl::allocator::region_routes::RegionRoutesAllocatorRef; +use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef; +use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef; use crate::error::{Result, UnsupportedSnafu}; use crate::key::table_route::PhysicalTableRouteValue; use crate::peer::{NoopPeerAllocator, PeerAllocatorRef}; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{Region, RegionRoute}; -use crate::sequence::SequenceRef; -use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options}; pub type TableMetadataAllocatorRef = Arc; #[derive(Clone)] pub struct TableMetadataAllocator { - table_id_sequence: SequenceRef, + table_id_allocator: ResourceIdAllocatorRef, wal_options_allocator: WalOptionsAllocatorRef, - peer_allocator: PeerAllocatorRef, + region_routes_allocator: RegionRoutesAllocatorRef, } impl TableMetadataAllocator { pub fn new( - table_id_sequence: SequenceRef, + table_id_allocator: ResourceIdAllocatorRef, wal_options_allocator: WalOptionsAllocatorRef, ) -> Self { Self::with_peer_allocator( - table_id_sequence, + table_id_allocator, wal_options_allocator, Arc::new(NoopPeerAllocator), ) } pub fn with_peer_allocator( - table_id_sequence: SequenceRef, + table_id_allocator: ResourceIdAllocatorRef, wal_options_allocator: WalOptionsAllocatorRef, peer_allocator: PeerAllocatorRef, ) -> Self { Self { - table_id_sequence, + table_id_allocator, wal_options_allocator, - peer_allocator, + region_routes_allocator: Arc::new(peer_allocator) as _, } } @@ -70,7 +70,7 @@ impl TableMetadataAllocator { ensure!( !self - .table_id_sequence + .table_id_allocator .min_max() .await .contains(&(table_id as u64)), @@ -89,65 +89,35 @@ impl TableMetadataAllocator { table_id } else { - self.table_id_sequence.next().await? as TableId + self.table_id_allocator.next().await? as TableId }; Ok(table_id) } - fn create_wal_options( + async fn create_wal_options( &self, - table_route: &PhysicalTableRouteValue, + region_numbers: &[RegionNumber], skip_wal: bool, ) -> Result> { - let region_numbers = table_route - .region_routes - .iter() - .map(|route| route.region.id.region_number()) - .collect(); - allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal) + self.wal_options_allocator + .allocate(region_numbers, skip_wal) + .await } async fn create_table_route( &self, table_id: TableId, - task: &CreateTableTask, + partition_exprs: &[&str], ) -> Result { - let regions = task.partitions.len().max(1); - let peers = self.peer_allocator.alloc(regions).await?; - debug!("Allocated peers {:?} for table {}", peers, table_id); - - let mut region_routes = task - .partitions + let region_number_and_partition_exprs = partition_exprs .iter() .enumerate() - .map(|(i, partition)| { - let region = Region { - id: RegionId::new(table_id, i as u32), - partition_expr: partition.expression.clone(), - ..Default::default() - }; - - let peer = peers[i % peers.len()].clone(); - - RegionRoute { - region, - leader_peer: Some(peer), - ..Default::default() - } - }) + .map(|(i, partition)| (i as u32, *partition)) .collect::>(); - - // If the table has no partitions, we need to create a default region. - if region_routes.is_empty() { - region_routes.push(RegionRoute { - region: Region { - id: RegionId::new(table_id, 0), - ..Default::default() - }, - leader_peer: Some(peers[0].clone()), - ..Default::default() - }); - } + let region_routes = self + .region_routes_allocator + .allocate(table_id, ®ion_number_and_partition_exprs) + .await?; Ok(PhysicalTableRouteValue::new(region_routes)) } @@ -164,10 +134,20 @@ impl TableMetadataAllocator { pub async fn create(&self, task: &CreateTableTask) -> Result { let table_id = self.allocate_table_id(&task.create_table.table_id).await?; - let table_route = self.create_table_route(table_id, task).await?; - - let region_wal_options = - self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?; + let partition_exprs = task + .partitions + .iter() + .map(|p| p.expression.as_str()) + .collect::>(); + let table_route = self.create_table_route(table_id, &partition_exprs).await?; + let region_numbers = table_route + .region_routes + .iter() + .map(|route| route.region.id.region_number()) + .collect::>(); + let region_wal_options = self + .create_wal_options(®ion_numbers, task.table_info.meta.options.skip_wal) + .await?; debug!( "Allocated region wal options {:?} for table {}", @@ -181,7 +161,7 @@ impl TableMetadataAllocator { }) } - pub fn table_id_sequence(&self) -> SequenceRef { - self.table_id_sequence.clone() + pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef { + self.table_id_allocator.clone() } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 74493d4d47..1db189c783 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -968,7 +968,7 @@ mod tests { use crate::region_registry::LeaderRegionRegistry; use crate::sequence::SequenceBuilder; use crate::state_store::KvStateStore; - use crate::wal_options_allocator::WalOptionsAllocator; + use crate::wal_provider::WalProvider; /// A dummy implemented [NodeManager]. pub struct DummyDatanodeManager; @@ -993,7 +993,7 @@ mod tests { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), - Arc::new(WalOptionsAllocator::default()), + Arc::new(WalProvider::default()), )); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e223c91349..e3bc73aca0 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1474,6 +1474,7 @@ mod tests { use super::datanode_table::DatanodeTableKey; use super::test_utils; + use crate::ddl::allocator::wal_options::WalOptionsAllocator; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::utils::region_storage_path; use crate::error::Result; @@ -1490,7 +1491,7 @@ mod tests { use crate::peer::Peer; use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution}; use crate::rpc::store::RangeRequest; - use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options}; + use crate::wal_provider::WalProvider; #[test] fn test_deserialized_value_with_bytes() { @@ -1600,10 +1601,9 @@ mod tests { let region_route = new_test_region_route(); let region_routes = &vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info().into(); - let wal_allocator = WalOptionsAllocator::RaftEngine; - let regions = (0..16).collect(); - let region_wal_options = - allocate_region_wal_options(regions, &wal_allocator, false).unwrap(); + let wal_provider = WalProvider::RaftEngine; + let regions: Vec<_> = (0..16).collect(); + let region_wal_options = wal_provider.allocate(®ions, false).await.unwrap(); create_physical_table_metadata( &table_metadata_manager, table_info.clone(), diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index fe07ce4084..d77d615d0a 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::sync::Arc; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; @@ -62,12 +62,51 @@ pub enum TableRouteValue { Logical(LogicalTableRouteValue), } -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] +#[derive(Debug, PartialEq, Serialize, Clone, Default)] pub struct PhysicalTableRouteValue { + // The region routes of the table. pub region_routes: Vec, + // Tracks the highest region number ever allocated for the table. + // This value only increases: adding a region updates it if needed, + // and dropping regions does not decrease it. + pub max_region_number: RegionNumber, + // The version of the table route. version: u64, } +impl<'de> Deserialize<'de> for PhysicalTableRouteValue { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct Helper { + region_routes: Vec, + #[serde(default)] + max_region_number: Option, + version: u64, + } + + let mut helper = Helper::deserialize(deserializer)?; + // If the max region number is not provided, we will calculate it from the region routes. + if helper.max_region_number.is_none() { + let max_region = helper + .region_routes + .iter() + .map(|r| r.region.id.region_number()) + .max() + .unwrap_or_default(); + helper.max_region_number = Some(max_region); + } + + Ok(PhysicalTableRouteValue { + region_routes: helper.region_routes, + max_region_number: helper.max_region_number.unwrap_or_default(), + version: helper.version, + }) + } +} + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct LogicalTableRouteValue { physical_table_id: TableId, @@ -104,9 +143,19 @@ impl TableRouteValue { err_msg: format!("{self:?} is a non-physical TableRouteValue."), } ); - let version = self.as_physical_table_route_ref().version; + let physical_table_route = self.as_physical_table_route_ref(); + let original_max_region_number = physical_table_route.max_region_number; + let new_max_region_number = region_routes + .iter() + .map(|r| r.region.id.region_number()) + .max() + .unwrap_or_default(); + let version = physical_table_route.version; Ok(Self::Physical(PhysicalTableRouteValue { region_routes, + // If region routes are added, we will update the max region number. + // If region routes are removed, we will keep the original max region number. + max_region_number: original_max_region_number.max(new_max_region_number), version: version + 1, })) } @@ -159,6 +208,20 @@ impl TableRouteValue { Ok(&self.as_physical_table_route_ref().region_routes) } + /// Returns the max region number of this [TableRouteValue::Physical]. + /// + /// # Panic + /// If it is not the [`PhysicalTableRouteValue`]. + pub fn max_region_number(&self) -> Result { + ensure!( + self.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: format!("{self:?} is a non-physical TableRouteValue."), + } + ); + Ok(self.as_physical_table_route_ref().max_region_number) + } + /// Returns the reference of [`PhysicalTableRouteValue`]. /// /// # Panic @@ -227,8 +290,14 @@ impl MetadataValue for TableRouteValue { impl PhysicalTableRouteValue { pub fn new(region_routes: Vec) -> Self { + let max_region_number = region_routes + .iter() + .map(|r| r.region.id.region_number()) + .max() + .unwrap_or_default(); Self { region_routes, + max_region_number, version: 0, } } @@ -806,6 +875,57 @@ mod tests { use crate::rpc::router::Region; use crate::rpc::store::PutRequest; + #[test] + fn test_update_table_route_max_region_number() { + let table_route = PhysicalTableRouteValue::new(vec![ + RegionRoute { + region: Region { + id: RegionId::new(0, 1), + ..Default::default() + }, + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(0, 2), + ..Default::default() + }, + ..Default::default() + }, + ]); + assert_eq!(table_route.max_region_number, 2); + + // Shouldn't change the max region number. + let new_table_route = TableRouteValue::Physical(table_route) + .update(vec![RegionRoute { + region: Region { + id: RegionId::new(0, 1), + ..Default::default() + }, + ..Default::default() + }]) + .unwrap(); + assert_eq!( + new_table_route + .as_physical_table_route_ref() + .max_region_number, + 2 + ); + + // Should increase the max region number. + let new_table_route = new_table_route + .update(vec![RegionRoute { + region: Region { + id: RegionId::new(0, 3), + ..Default::default() + }, + ..Default::default() + }]) + .unwrap() + .into_physical_table_route(); + assert_eq!(new_table_route.max_region_number, 3); + } + #[test] fn test_table_route_compatibility() { let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#; @@ -846,6 +966,7 @@ mod tests { leader_down_since: None, }, ], + max_region_number: 1, version: 0, }); @@ -956,6 +1077,7 @@ mod tests { }], ..Default::default() }], + max_region_number: 0, version: 0, }); diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 17bca731b2..93cd229b16 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -48,7 +48,7 @@ pub mod stats; #[cfg(any(test, feature = "testing"))] pub mod test_util; pub mod util; -pub mod wal_options_allocator; +pub mod wal_provider; // The id of the datanode. pub type DatanodeId = u64; diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 0c11c49075..f849facbb4 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -81,6 +81,13 @@ pub trait PeerAllocator: Send + Sync { pub type PeerAllocatorRef = Arc; +#[async_trait::async_trait] +impl PeerAllocator for Arc { + async fn alloc(&self, num: usize) -> Result, Error> { + T::alloc(self, num).await + } +} + pub struct NoopPeerAllocator; #[async_trait::async_trait] diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index 115790a86e..17abfe70d1 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -144,6 +144,8 @@ impl ReconcileRegions { } /// Creates a region request builder from a raw table info. +/// +/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. fn create_region_request_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index 167d8bc001..d186446fda 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -19,6 +19,7 @@ use common_telemetry::{debug, warn}; use snafu::ensure; use tokio::sync::Mutex; +use crate::ddl::allocator::resource_id::ResourceIdAllocator; use crate::error::{self, Result}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::CompareAndPutRequest; @@ -82,6 +83,25 @@ pub struct Sequence { inner: Mutex, } +#[async_trait::async_trait] +impl ResourceIdAllocator for Sequence { + async fn next(&self) -> Result { + self.next().await + } + + async fn peek(&self) -> Result { + self.peek().await + } + + async fn jump_to(&self, next: u64) -> Result<()> { + self.jump_to(next).await + } + + async fn min_max(&self) -> Range { + self.min_max().await + } +} + impl Sequence { /// Returns the next value and increments the sequence. pub async fn next(&self) -> Result { diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 05f3b8c497..3396870ab6 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -40,8 +40,8 @@ use crate::peer::{Peer, PeerResolver}; use crate::region_keeper::MemoryRegionKeeper; use crate::region_registry::LeaderRegionRegistry; use crate::sequence::SequenceBuilder; -use crate::wal_options_allocator::topic_pool::KafkaTopicPool; -use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator}; +use crate::wal_provider::topic_pool::KafkaTopicPool; +use crate::wal_provider::{WalProvider, build_kafka_topic_creator}; use crate::{DatanodeId, FlownodeId}; #[async_trait::async_trait] @@ -187,7 +187,7 @@ pub fn new_ddl_context_with_kv_backend( .initial(1024) .build(), ), - Arc::new(WalOptionsAllocator::default()), + Arc::new(WalProvider::default()), )); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_metadata_allocator = diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_provider.rs similarity index 72% rename from src/common/meta/src/wal_options_allocator.rs rename to src/common/meta/src/wal_provider.rs index 464b14b39f..cd599b58ba 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_provider.rs @@ -26,28 +26,46 @@ use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions}; use snafu::{ResultExt, ensure}; use store_api::storage::{RegionId, RegionNumber}; +use crate::ddl::allocator::wal_options::WalOptionsAllocator; use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result}; use crate::key::TOPIC_NAME_PATTERN_REGEX; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; -pub use crate::wal_options_allocator::topic_creator::{ - build_kafka_client, build_kafka_topic_creator, -}; -use crate::wal_options_allocator::topic_pool::KafkaTopicPool; +pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator}; +use crate::wal_provider::topic_pool::KafkaTopicPool; -/// Allocates wal options in region granularity. +/// Provides wal options in region granularity. #[derive(Default, Debug)] -pub enum WalOptionsAllocator { +pub enum WalProvider { #[default] RaftEngine, Kafka(KafkaTopicPool), } -/// Arc wrapper of WalOptionsAllocator. -pub type WalOptionsAllocatorRef = Arc; +/// Arc wrapper of WalProvider. +pub type WalProviderRef = Arc; -impl WalOptionsAllocator { - /// Tries to start the allocator. +#[async_trait::async_trait] +impl WalOptionsAllocator for WalProvider { + async fn allocate( + &self, + region_numbers: &[RegionNumber], + skip_wal: bool, + ) -> Result> { + let wal_options = self + .alloc_batch(region_numbers.len(), skip_wal)? + .into_iter() + .map(|wal_options| { + serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options }) + }) + .collect::>>()?; + + Ok(region_numbers.iter().copied().zip(wal_options).collect()) + } +} + +impl WalProvider { + /// Tries to start the provider. pub async fn start(&self) -> Result<()> { match self { Self::RaftEngine => Ok(()), @@ -56,14 +74,14 @@ impl WalOptionsAllocator { } /// Allocates a batch of wal options where each wal options goes to a region. - /// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type. + /// If skip_wal is true, the wal options will be set to Noop regardless of the provider type. pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result> { if skip_wal { return Ok(vec![WalOptions::Noop; num_regions]); } match self { - WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]), - WalOptionsAllocator::Kafka(topic_manager) => { + WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]), + WalProvider::Kafka(topic_manager) => { let options_batch = topic_manager .select_batch(num_regions)? .into_iter() @@ -80,14 +98,14 @@ impl WalOptionsAllocator { /// Returns true if it's the remote WAL. pub fn is_remote_wal(&self) -> bool { - matches!(&self, WalOptionsAllocator::Kafka(_)) + matches!(&self, WalProvider::Kafka(_)) } } #[async_trait] -impl LeadershipChangeListener for WalOptionsAllocator { +impl LeadershipChangeListener for WalProvider { fn name(&self) -> &str { - "WalOptionsAllocator" + "WalProvider" } async fn on_leader_start(&self) -> Result<()> { @@ -99,13 +117,13 @@ impl LeadershipChangeListener for WalOptionsAllocator { } } -/// Builds a wal options allocator based on the given configuration. -pub async fn build_wal_options_allocator( +/// Builds a wal provider based on the given configuration. +pub async fn build_wal_provider( config: &MetasrvWalConfig, kv_backend: KvBackendRef, -) -> Result { +) -> Result { match config { - MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine), + MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine), MetasrvWalConfig::Kafka(kafka_config) => { let prefix = &kafka_config.kafka_topic.topic_name_prefix; ensure!( @@ -116,28 +134,11 @@ pub async fn build_wal_options_allocator( build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic) .await?; let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator); - Ok(WalOptionsAllocator::Kafka(topic_pool)) + Ok(WalProvider::Kafka(topic_pool)) } } } -/// Allocates a wal options for each region. The allocated wal options is encoded immediately. -pub fn allocate_region_wal_options( - regions: Vec, - wal_options_allocator: &WalOptionsAllocator, - skip_wal: bool, -) -> Result> { - let wal_options = wal_options_allocator - .alloc_batch(regions.len(), skip_wal)? - .into_iter() - .map(|wal_options| { - serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options }) - }) - .collect::>>()?; - - Ok(regions.into_iter().zip(wal_options).collect()) -} - /// Inserts wal options into options. pub fn prepare_wal_options( options: &mut HashMap, @@ -182,21 +183,19 @@ mod tests { use crate::error::Error; use crate::kv_backend::memory::MemoryKvBackend; use crate::test_util::test_kafka_topic_pool; - use crate::wal_options_allocator::selector::RoundRobinTopicSelector; + use crate::wal_provider::selector::RoundRobinTopicSelector; - // Tests that the wal options allocator could successfully allocate raft-engine wal options. + // Tests that the wal provider could successfully allocate raft-engine wal options. #[tokio::test] - async fn test_allocator_with_raft_engine() { + async fn test_provider_with_raft_engine() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; let wal_config = MetasrvWalConfig::RaftEngine; - let allocator = build_wal_options_allocator(&wal_config, kv_backend) - .await - .unwrap(); - allocator.start().await.unwrap(); + let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap(); + provider.start().await.unwrap(); let num_regions = 32; let regions = (0..num_regions).collect::>(); - let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap(); + let got = provider.allocate(®ions, false).await.unwrap(); let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap(); let expected = regions @@ -216,14 +215,14 @@ mod tests { }, ..Default::default() }); - let got = build_wal_options_allocator(&wal_config, kv_backend) + let got = build_wal_provider(&wal_config, kv_backend) .await .unwrap_err(); assert_matches!(got, Error::InvalidTopicNamePrefix { .. }); } #[tokio::test] - async fn test_allocator_with_kafka_allocate_wal_options() { + async fn test_provider_with_kafka_allocate_wal_options() { common_telemetry::init_default_ut_logging(); maybe_skip_kafka_integration_test!(); let num_topics = 5; @@ -240,13 +239,13 @@ mod tests { let topic_creator = topic_pool.topic_creator(); topic_creator.delete_topics(&topics).await.unwrap(); - // Creates an options allocator. - let allocator = WalOptionsAllocator::Kafka(topic_pool); - allocator.start().await.unwrap(); + // Creates an options provider. + let provider = WalProvider::Kafka(topic_pool); + provider.start().await.unwrap(); let num_regions = 3; let regions = (0..num_regions).collect::>(); - let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap(); + let got = provider.allocate(®ions, false).await.unwrap(); // Check the allocated wal options contain the expected topics. let expected = (0..num_regions) @@ -261,13 +260,13 @@ mod tests { } #[tokio::test] - async fn test_allocator_with_skip_wal() { - let allocator = WalOptionsAllocator::RaftEngine; - allocator.start().await.unwrap(); + async fn test_provider_with_skip_wal() { + let provider = WalProvider::RaftEngine; + provider.start().await.unwrap(); let num_regions = 32; let regions = (0..num_regions).collect::>(); - let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap(); + let got = provider.allocate(®ions, true).await.unwrap(); assert_eq!(got.len(), num_regions as usize); for wal_options in got.values() { assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}"); diff --git a/src/common/meta/src/wal_options_allocator/selector.rs b/src/common/meta/src/wal_provider/selector.rs similarity index 100% rename from src/common/meta/src/wal_options_allocator/selector.rs rename to src/common/meta/src/wal_provider/selector.rs diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_provider/topic_creator.rs similarity index 100% rename from src/common/meta/src/wal_options_allocator/topic_creator.rs rename to src/common/meta/src/wal_provider/topic_creator.rs diff --git a/src/common/meta/src/wal_options_allocator/topic_manager.rs b/src/common/meta/src/wal_provider/topic_manager.rs similarity index 100% rename from src/common/meta/src/wal_options_allocator/topic_manager.rs rename to src/common/meta/src/wal_provider/topic_manager.rs diff --git a/src/common/meta/src/wal_options_allocator/topic_pool.rs b/src/common/meta/src/wal_provider/topic_pool.rs similarity index 96% rename from src/common/meta/src/wal_options_allocator/topic_pool.rs rename to src/common/meta/src/wal_provider/topic_pool.rs index 6be1cfb778..919f0b2abe 100644 --- a/src/common/meta/src/wal_options_allocator/topic_pool.rs +++ b/src/common/meta/src/wal_provider/topic_pool.rs @@ -22,9 +22,9 @@ use snafu::ensure; use crate::error::{InvalidNumTopicsSnafu, Result}; use crate::kv_backend::KvBackendRef; -use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef}; -use crate::wal_options_allocator::topic_creator::KafkaTopicCreator; -use crate::wal_options_allocator::topic_manager::KafkaTopicManager; +use crate::wal_provider::selector::{RoundRobinTopicSelector, TopicSelectorRef}; +use crate::wal_provider::topic_creator::KafkaTopicCreator; +use crate::wal_provider::topic_manager::KafkaTopicManager; /// Topic pool for kafka remote wal. /// Responsible for: @@ -144,7 +144,7 @@ mod tests { use super::*; use crate::error::Error; use crate::test_util::test_kafka_topic_pool; - use crate::wal_options_allocator::selector::RoundRobinTopicSelector; + use crate::wal_provider::selector::RoundRobinTopicSelector; #[tokio::test] async fn test_pool_invalid_number_topics_err() { diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 76ca806a98..8b39d70d77 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; -use common_meta::wal_options_allocator::prepare_wal_options; +use common_meta::wal_provider::prepare_wal_options; use store_api::path_utils::table_dir; use store_api::region_request::{PathType, RegionOpenRequest}; use store_api::storage::RegionId; diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs index 64d6a40a9c..488ddacdf0 100644 --- a/src/datanode/src/utils.rs +++ b/src/datanode/src/utils.rs @@ -18,7 +18,7 @@ use common_meta::DatanodeId; use common_meta::key::datanode_table::DatanodeTableManager; use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue}; use common_meta::kv_backend::KvBackendRef; -use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options}; +use common_meta::wal_provider::{extract_topic_from_wal_options, prepare_wal_options}; use futures::TryStreamExt; use snafu::ResultExt; use store_api::path_utils::table_dir; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 838dbba64a..11a1c28cfb 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -936,8 +936,8 @@ pub enum Error { source: common_meta::error::Error, }, - #[snafu(display("Failed to build wal options allocator"))] - BuildWalOptionsAllocator { + #[snafu(display("Failed to build wal provider"))] + BuildWalProvider { #[snafu(implicit)] location: Location, source: common_meta::error::Error, @@ -1060,6 +1060,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to allocate regions for table: {}", table_id))] + AllocateRegions { + #[snafu(implicit)] + location: Location, + table_id: TableId, + #[snafu(source)] + source: common_meta::error::Error, + }, + #[snafu(display("Failed to deallocate regions for table: {}", table_id))] DeallocateRegions { #[snafu(implicit)] @@ -1068,6 +1077,33 @@ pub enum Error { #[snafu(source)] source: common_meta::error::Error, }, + + #[snafu(display("Failed to build create request for table: {}", table_id))] + BuildCreateRequest { + #[snafu(implicit)] + location: Location, + table_id: TableId, + #[snafu(source)] + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to allocate region routes for table: {}", table_id))] + AllocateRegionRoutes { + #[snafu(implicit)] + location: Location, + table_id: TableId, + #[snafu(source)] + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to allocate wal options for table: {}", table_id))] + AllocateWalOptions { + #[snafu(implicit)] + location: Location, + table_id: TableId, + #[snafu(source)] + source: common_meta::error::Error, + }, } impl Error { @@ -1113,7 +1149,7 @@ impl ErrorExt for Error { | Error::Join { .. } | Error::ChooseItems { .. } | Error::FlowStateHandler { .. } - | Error::BuildWalOptionsAllocator { .. } + | Error::BuildWalProvider { .. } | Error::BuildPartitionClient { .. } | Error::BuildKafkaClient { .. } => StatusCode::Internal, @@ -1215,7 +1251,11 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::RepartitionCreateSubtasks { source, .. } => source.status_code(), Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(), + Error::AllocateRegions { source, .. } => source.status_code(), Error::DeallocateRegions { source, .. } => source.status_code(), + Error::AllocateRegionRoutes { source, .. } => source.status_code(), + Error::AllocateWalOptions { source, .. } => source.status_code(), + Error::BuildCreateRequest { source, .. } => source.status_code(), Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted, #[cfg(feature = "pg_kvbackend")] diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 19bcb3ee4a..6b0968f224 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,6 +27,7 @@ use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef; use common_meta::ddl_manager::DdlManagerRef; use common_meta::distributed_time_constants::{ self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval, @@ -42,9 +43,8 @@ use common_meta::peer::{Peer, PeerDiscoveryRef}; use common_meta::reconciliation::manager::ReconciliationManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; -use common_meta::sequence::SequenceRef; use common_meta::stats::topic::TopicStatsRegistryRef; -use common_meta::wal_options_allocator::WalOptionsAllocatorRef; +use common_meta::wal_provider::WalProviderRef; use common_options::datanode::DatanodeClientOptions; use common_options::memory::MemoryOptions; use common_procedure::ProcedureManagerRef; @@ -624,7 +624,7 @@ pub struct Metasrv { procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, ddl_manager: DdlManagerRef, - wal_options_allocator: WalOptionsAllocatorRef, + wal_provider: WalProviderRef, table_metadata_manager: TableMetadataManagerRef, runtime_switch_manager: RuntimeSwitchManagerRef, memory_region_keeper: MemoryRegionKeeperRef, @@ -636,7 +636,7 @@ pub struct Metasrv { topic_stats_registry: TopicStatsRegistryRef, wal_prune_ticker: Option, region_flush_ticker: Option, - table_id_sequence: SequenceRef, + table_id_allocator: ResourceIdAllocatorRef, reconciliation_manager: ReconciliationManagerRef, resource_stat: ResourceStatRef, gc_ticker: Option, @@ -684,7 +684,7 @@ impl Metasrv { // Builds leadership change notifier. let mut leadership_change_notifier = LeadershipChangeNotifier::default(); - leadership_change_notifier.add_listener(self.wal_options_allocator.clone()); + leadership_change_notifier.add_listener(self.wal_provider.clone()); leadership_change_notifier .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager))); leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new( @@ -782,8 +782,8 @@ impl Metasrv { "Ensure only one instance of Metasrv is running, as there is no election service." ); - if let Err(e) = self.wal_options_allocator.start().await { - error!(e; "Failed to start wal options allocator"); + if let Err(e) = self.wal_provider.start().await { + error!(e; "Failed to start wal provider"); } // Always load kv into cached kv store. self.leader_cached_kv_backend @@ -931,8 +931,8 @@ impl Metasrv { self.plugins.get::() } - pub fn table_id_sequence(&self) -> &SequenceRef { - &self.table_id_sequence + pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef { + &self.table_id_allocator } pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index fc93886aed..4180a4b404 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -43,7 +43,7 @@ use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::stats::topic::TopicStatsRegistry; -use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator}; +use common_meta::wal_provider::{build_kafka_client, build_wal_provider}; use common_procedure::ProcedureManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; use common_stat::ResourceStatImpl; @@ -54,7 +54,7 @@ use store_api::storage::MAX_REGION_SEQ; use crate::bootstrap::build_default_meta_peer_client; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::MetaPeerClientRef; -use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result}; +use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result}; use crate::events::EventHandlerImpl; use crate::gc::GcScheduler; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; @@ -241,11 +241,11 @@ impl MetasrvBuilder { peer_discovery: meta_peer_client.clone(), }; - let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone()) + let wal_provider = build_wal_provider(&options.wal, kv_backend.clone()) .await - .context(BuildWalOptionsAllocatorSnafu)?; - let wal_options_allocator = Arc::new(wal_options_allocator); - let is_remote_wal = wal_options_allocator.is_remote_wal(); + .context(BuildWalProviderSnafu)?; + let wal_provider = Arc::new(wal_provider); + let is_remote_wal = wal_provider.is_remote_wal(); let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -259,11 +259,11 @@ impl MetasrvBuilder { ); Arc::new(TableMetadataAllocator::with_peer_allocator( sequence, - wal_options_allocator.clone(), + wal_provider.clone(), peer_allocator, )) }); - let table_id_sequence = table_metadata_allocator.table_id_sequence(); + let table_id_allocator = table_metadata_allocator.table_id_allocator(); let flow_selector = Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef; @@ -568,7 +568,7 @@ impl MetasrvBuilder { procedure_manager, mailbox, ddl_manager, - wal_options_allocator, + wal_provider, table_metadata_manager, runtime_switch_manager, greptimedb_telemetry_task: get_greptimedb_telemetry_task( @@ -585,7 +585,7 @@ impl MetasrvBuilder { leader_region_registry, wal_prune_ticker, region_flush_ticker, - table_id_sequence, + table_id_allocator, reconciliation_manager, topic_stats_registry, resource_stat: Arc::new(resource_stat), diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 9d0cb0d65a..0621f30361 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -23,7 +23,7 @@ use common_meta::instruction::{ }; use common_meta::key::topic_region::TopicRegionKey; use common_meta::lock_key::RemoteWalLock; -use common_meta::wal_options_allocator::extract_topic_from_wal_options; +use common_meta::wal_provider::extract_topic_from_wal_options; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{error, info}; use common_wal::options::WalOptions; diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index 3d58f473b7..a296c4dbb2 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -27,12 +27,15 @@ use std::fmt::Debug; use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef; +use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::RegionInfo; +use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::node_manager::NodeManagerRef; -use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; +use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; @@ -57,13 +60,8 @@ pub struct PersistentContext { pub plans: Vec, } -pub struct VolatileContext { - pub allocating_regions: Vec, -} - pub struct Context { pub persistent_ctx: PersistentContext, - pub volatile_ctx: VolatileContext, pub table_metadata_manager: TableMetadataManagerRef, pub memory_region_keeper: MemoryRegionKeeperRef, pub node_manager: NodeManagerRef, @@ -71,6 +69,8 @@ pub struct Context { pub mailbox: MailboxRef, pub server_addr: String, pub cache_invalidator: CacheInvalidatorRef, + pub region_routes_allocator: RegionRoutesAllocatorRef, + pub wal_options_allocator: WalOptionsAllocatorRef, } impl Context { @@ -100,6 +100,29 @@ impl Context { Ok(table_route_value) } + /// Retrieves the table info value for the given table id. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + /// + /// Abort: + /// - Table info not found. + pub async fn get_table_info_value(&self) -> Result { + let table_id = self.persistent_ctx.table_id; + let table_info_value = self + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get table info for table: {}", table_id), + })? + .context(error::TableInfoNotFoundSnafu { table_id })? + .into_inner(); + Ok(table_info_value) + } + /// Updates the table route. /// /// Retry: diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index bcdf17e10f..fbf565b946 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -13,13 +13,30 @@ // limitations under the License. use std::any::Any; +use std::collections::{HashMap, HashSet}; +use common_meta::ddl::create_table::executor::CreateTableExecutor; +use common_meta::ddl::create_table::template::{ + CreateRequestBuilder, build_template_from_raw_table_info, +}; +use common_meta::lock_key::TableLock; +use common_meta::node_manager::NodeManagerRef; +use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; +use common_meta::rpc::router::{RegionRoute, operating_leader_regions}; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionNumber, TableId}; +use table::metadata::RawTableInfo; +use table::table_reference::TableReference; -use crate::error::Result; +use crate::error::{self, Result}; use crate::procedure::repartition::dispatch::Dispatch; -use crate::procedure::repartition::plan::{AllocationPlanEntry, RepartitionPlanEntry}; +use crate::procedure::repartition::plan::{ + AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry, + convert_allocation_plan_to_repartition_plan, +}; use crate::procedure::repartition::{Context, State}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -27,41 +44,423 @@ pub struct AllocateRegion { plan_entries: Vec, } -impl AllocateRegion { - pub fn new(plan_entries: Vec) -> Self { - Self { plan_entries } - } -} - #[async_trait::async_trait] #[typetag::serde] impl State for AllocateRegion { async fn next( &mut self, ctx: &mut Context, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { - let region_to_allocate = self - .plan_entries - .iter() - .map(|p| p.regions_to_allocate) - .sum::(); + let table_id = ctx.persistent_ctx.table_id; + let table_route_value = ctx.get_table_route_value().await?; + // Safety: it is physical table route value. + let region_routes = table_route_value.region_routes().unwrap(); + let mut next_region_number = + Self::get_next_region_number(table_route_value.max_region_number().unwrap()); - if region_to_allocate == 0 { - let repartition_plan_entries = self - .plan_entries - .iter() - .map(RepartitionPlanEntry::from_allocation_plan_entry) - .collect::>(); + // Converts allocation plan to repartition plan. + let repartition_plan_entries = Self::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &self.plan_entries, + ); + + // If no region to allocate, directly dispatch the plan. + if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 { ctx.persistent_ctx.plans = repartition_plan_entries; return Ok((Box::new(Dispatch), Status::executing(true))); } - // TODO(weny): allocate regions. - todo!() + let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries); + let region_number_and_partition_exprs = + Self::prepare_region_allocation_data(&allocate_regions)?; + let table_info_value = ctx.get_table_info_value().await?; + let new_allocated_region_routes = ctx + .region_routes_allocator + .allocate( + table_id, + ®ion_number_and_partition_exprs + .iter() + .map(|(n, p)| (*n, p.as_str())) + .collect::>(), + ) + .await + .context(error::AllocateRegionRoutesSnafu { table_id })?; + let wal_options = ctx + .wal_options_allocator + .allocate( + &allocate_regions + .iter() + .map(|r| r.region_id.region_number()) + .collect::>(), + table_info_value.table_info.meta.options.skip_wal, + ) + .await + .context(error::AllocateWalOptionsSnafu { table_id })?; + + let _operating_guards = Self::register_operating_regions( + &ctx.memory_region_keeper, + &new_allocated_region_routes, + )?; + // Allocates the regions on datanodes. + Self::allocate_regions( + &ctx.node_manager, + &table_info_value.table_info, + &new_allocated_region_routes, + &wal_options, + ) + .await?; + + // TODO(weny): for metric engine, sync logical regions from the the central region. + + // Updates the table routes. + let table_lock = TableLock::Write(table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + let new_region_routes = + Self::generate_region_routes(region_routes, &new_allocated_region_routes); + ctx.update_table_route(&table_route_value, new_region_routes) + .await?; + ctx.invalidate_table_cache().await?; + + ctx.persistent_ctx.plans = repartition_plan_entries; + Ok((Box::new(Dispatch), Status::executing(true))) } fn as_any(&self) -> &dyn Any { self } } + +impl AllocateRegion { + pub fn new(plan_entries: Vec) -> Self { + Self { plan_entries } + } + + #[allow(dead_code)] + fn register_operating_regions( + memory_region_keeper: &MemoryRegionKeeperRef, + region_routes: &[RegionRoute], + ) -> Result> { + let mut operating_guards = Vec::with_capacity(region_routes.len()); + for (region_id, datanode_id) in operating_leader_regions(region_routes) { + let guard = memory_region_keeper + .register(datanode_id, region_id) + .context(error::RegionOperatingRaceSnafu { + peer_id: datanode_id, + region_id, + })?; + operating_guards.push(guard); + } + Ok(operating_guards) + } + + #[allow(dead_code)] + fn generate_region_routes( + region_routes: &[RegionRoute], + new_allocated_region_ids: &[RegionRoute], + ) -> Vec { + let region_ids = region_routes + .iter() + .map(|r| r.region.id) + .collect::>(); + let mut new_region_routes = region_routes.to_vec(); + for new_allocated_region_id in new_allocated_region_ids { + if !region_ids.contains(&new_allocated_region_id.region.id) { + new_region_routes.push(new_allocated_region_id.clone()); + } + } + new_region_routes + } + + /// Converts allocation plan entries to repartition plan entries. + /// + /// This method takes the allocation plan entries and converts them to repartition plan entries, + /// updating `next_region_number` for each newly allocated region. + #[allow(dead_code)] + fn convert_to_repartition_plans( + table_id: TableId, + next_region_number: &mut RegionNumber, + plan_entries: &[AllocationPlanEntry], + ) -> Vec { + plan_entries + .iter() + .map(|plan_entry| { + convert_allocation_plan_to_repartition_plan( + table_id, + next_region_number, + plan_entry, + ) + }) + .collect() + } + + /// Collects all regions that need to be allocated from the repartition plan entries. + #[allow(dead_code)] + fn collect_allocate_regions( + repartition_plan_entries: &[RepartitionPlanEntry], + ) -> Vec<&RegionDescriptor> { + repartition_plan_entries + .iter() + .flat_map(|p| p.allocate_regions()) + .collect() + } + + /// Prepares region allocation data: region numbers and their partition expressions. + #[allow(dead_code)] + fn prepare_region_allocation_data( + allocate_regions: &[&RegionDescriptor], + ) -> Result> { + allocate_regions + .iter() + .map(|r| { + Ok(( + r.region_id.region_number(), + r.partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?, + )) + }) + .collect() + } + + /// Calculates the total number of regions that need to be allocated. + #[allow(dead_code)] + fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize { + repartition_plan_entries + .iter() + .map(|p| p.allocated_region_ids.len()) + .sum() + } + + /// Gets the next region number from the physical table route. + #[allow(dead_code)] + fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber { + max_region_number + 1 + } + + #[allow(dead_code)] + async fn allocate_regions( + node_manager: &NodeManagerRef, + raw_table_info: &RawTableInfo, + region_routes: &[RegionRoute], + wal_options: &HashMap, + ) -> Result<()> { + let table_ref = TableReference::full( + &raw_table_info.catalog_name, + &raw_table_info.schema_name, + &raw_table_info.name, + ); + let table_id = raw_table_info.ident.table_id; + let request = build_template_from_raw_table_info(raw_table_info) + .context(error::BuildCreateRequestSnafu { table_id })?; + let builder = CreateRequestBuilder::new(request, None); + info!( + "Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}", + table_id, region_routes, wal_options + ); + let executor = CreateTableExecutor::new(table_ref.into(), false, builder); + executor + .on_create_regions(node_manager, table_id, region_routes, wal_options) + .await + .context(error::AllocateRegionsSnafu { table_id })?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use store_api::storage::RegionId; + use uuid::Uuid; + + use super::*; + use crate::procedure::repartition::test_util::range_expr; + + fn create_region_descriptor( + table_id: TableId, + region_number: u32, + col: &str, + start: i64, + end: i64, + ) -> RegionDescriptor { + RegionDescriptor { + region_id: RegionId::new(table_id, region_number), + partition_expr: range_expr(col, start, end), + } + } + + fn create_allocation_plan_entry( + table_id: TableId, + source_region_numbers: &[u32], + target_ranges: &[(i64, i64)], + ) -> AllocationPlanEntry { + let source_regions = source_region_numbers + .iter() + .enumerate() + .map(|(i, &n)| { + let start = i as i64 * 100; + let end = (i + 1) as i64 * 100; + create_region_descriptor(table_id, n, "x", start, end) + }) + .collect(); + + let target_partition_exprs = target_ranges + .iter() + .map(|&(start, end)| range_expr("x", start, end)) + .collect(); + + AllocationPlanEntry { + group_id: Uuid::new_v4(), + source_regions, + target_partition_exprs, + transition_map: vec![], + } + } + + #[test] + fn test_convert_to_repartition_plans_no_allocation() { + let table_id = 1024; + let mut next_region_number = 10; + + // 2 source -> 2 target (no allocation needed) + let plan_entries = vec![create_allocation_plan_entry( + table_id, + &[1, 2], + &[(0, 50), (50, 200)], + )]; + + let result = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &plan_entries, + ); + + assert_eq!(result.len(), 1); + assert_eq!(result[0].target_regions.len(), 2); + assert!(result[0].allocated_region_ids.is_empty()); + // next_region_number should not change + assert_eq!(next_region_number, 10); + } + + #[test] + fn test_convert_to_repartition_plans_with_allocation() { + let table_id = 1024; + let mut next_region_number = 10; + + // 2 source -> 4 target (need to allocate 2 regions) + let plan_entries = vec![create_allocation_plan_entry( + table_id, + &[1, 2], + &[(0, 50), (50, 100), (100, 150), (150, 200)], + )]; + + let result = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &plan_entries, + ); + + assert_eq!(result.len(), 1); + assert_eq!(result[0].target_regions.len(), 4); + assert_eq!(result[0].allocated_region_ids.len(), 2); + assert_eq!( + result[0].allocated_region_ids[0], + RegionId::new(table_id, 10) + ); + assert_eq!( + result[0].allocated_region_ids[1], + RegionId::new(table_id, 11) + ); + // next_region_number should be incremented by 2 + assert_eq!(next_region_number, 12); + } + + #[test] + fn test_convert_to_repartition_plans_multiple_entries() { + let table_id = 1024; + let mut next_region_number = 10; + + // Multiple plan entries with different allocation needs + let plan_entries = vec![ + create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // need 1 allocation + create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]), // no allocation + create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]), // need 2 allocations + ]; + + let result = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &plan_entries, + ); + + assert_eq!(result.len(), 3); + assert_eq!(result[0].allocated_region_ids.len(), 1); + assert_eq!(result[1].allocated_region_ids.len(), 0); + assert_eq!(result[2].allocated_region_ids.len(), 2); + // next_region_number should be incremented by 3 total + assert_eq!(next_region_number, 13); + } + + #[test] + fn test_count_regions_to_allocate() { + let table_id = 1024; + let mut next_region_number = 10; + + let plan_entries = vec![ + create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation + create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]), // 0 allocation (deallocate) + create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]), // 1 allocation + ]; + + let repartition_plans = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &plan_entries, + ); + + let count = AllocateRegion::count_regions_to_allocate(&repartition_plans); + assert_eq!(count, 2); + } + + #[test] + fn test_collect_allocate_regions() { + let table_id = 1024; + let mut next_region_number = 10; + + let plan_entries = vec![ + create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation + create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]), // 1 allocation + ]; + + let repartition_plans = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &plan_entries, + ); + + let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans); + assert_eq!(allocate_regions.len(), 2); + assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10)); + assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11)); + } + + #[test] + fn test_prepare_region_allocation_data() { + let table_id = 1024; + let regions = [ + create_region_descriptor(table_id, 10, "x", 0, 50), + create_region_descriptor(table_id, 11, "x", 50, 100), + ]; + let region_refs: Vec<&RegionDescriptor> = regions.iter().collect(); + + let result = AllocateRegion::prepare_region_allocation_data(®ion_refs).unwrap(); + + assert_eq!(result.len(), 2); + assert_eq!(result[0].0, 10); + assert_eq!(result[1].0, 11); + // Verify partition expressions are serialized + assert!(!result[0].1.is_empty()); + assert!(!result[1].1.is_empty()); + } +} diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs index 9d22531a1d..9eabec7101 100644 --- a/src/meta-srv/src/procedure/repartition/plan.rs +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; + use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::procedure::repartition::group::GroupId; @@ -37,10 +39,6 @@ pub struct AllocationPlanEntry { pub source_regions: Vec, /// The target partition expressions for the new or changed regions. pub target_partition_exprs: Vec, - /// The number of regions that need to be allocated (target count - source count, if positive). - pub regions_to_allocate: usize, - /// The number of regions that need to be deallocated (source count - target count, if positive). - pub regions_to_deallocate: usize, /// For each `source_regions[k]`, the corresponding vector contains global /// `target_partition_exprs` that overlap with it. pub transition_map: Vec>, @@ -66,37 +64,449 @@ pub struct RepartitionPlanEntry { } impl RepartitionPlanEntry { - /// Converts an allocation plan entry into a repartition plan entry. - /// - /// The target regions are derived from the source regions and the target partition expressions. - /// The allocated region ids and pending deallocate region ids are empty. - pub fn from_allocation_plan_entry( - AllocationPlanEntry { - group_id, - source_regions, - target_partition_exprs, - regions_to_allocate, - regions_to_deallocate, - transition_map, - }: &AllocationPlanEntry, - ) -> Self { - debug_assert!(*regions_to_allocate == 0 && *regions_to_deallocate == 0); - let target_regions = source_regions + /// Returns the target regions that are newly allocated. + pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> { + self.target_regions .iter() - .zip(target_partition_exprs.iter()) - .map(|(source_region, target_partition_expr)| RegionDescriptor { - region_id: source_region.region_id, - partition_expr: target_partition_expr.clone(), - }) - .collect::>(); + .filter(|r| self.allocated_region_ids.contains(&r.region_id)) + .collect() + } +} - Self { - group_id: *group_id, - source_regions: source_regions.clone(), - target_regions, - allocated_region_ids: vec![], - pending_deallocate_region_ids: vec![], - transition_map: transition_map.clone(), +/// Converts an allocation plan to a repartition plan. +/// +/// Converts an [`AllocationPlanEntry`] (which contains abstract region allocation intents) +/// into a [`RepartitionPlanEntry`] with concrete source and target region descriptors, +/// plus records information on which regions are newly allocated and/or pending deallocation. +/// +/// # Returns +/// +/// A [`Result`] containing the [`RepartitionPlanEntry`] describing exactly the source regions, +/// the target regions (including any that need to be newly allocated), and transition mappings. +/// +/// # Notes +/// - If new regions are needed, their region ids are constructed using `table_id` and incrementing +/// from `next_region_number`. +/// - For each target, associates the correct region descriptor; for new regions, the region id +/// is assigned sequentially. +pub fn convert_allocation_plan_to_repartition_plan( + table_id: TableId, + next_region_number: &mut RegionNumber, + AllocationPlanEntry { + group_id, + source_regions, + target_partition_exprs, + transition_map, + .. + }: &AllocationPlanEntry, +) -> RepartitionPlanEntry { + match source_regions.len().cmp(&target_partition_exprs.len()) { + Ordering::Less => { + // requires to allocate regions + let pending_allocate_target_partition_exprs = target_partition_exprs + .iter() + .skip(source_regions.len()) + .map(|target_partition_expr| { + let desc = RegionDescriptor { + region_id: RegionId::new(table_id, *next_region_number), + partition_expr: target_partition_expr.clone(), + }; + *next_region_number += 1; + desc + }) + .collect::>(); + + let allocated_region_ids = pending_allocate_target_partition_exprs + .iter() + .map(|rd| rd.region_id) + .collect::>(); + + let target_regions = source_regions + .iter() + .zip(target_partition_exprs.iter()) + .map(|(source_region, target_partition_expr)| RegionDescriptor { + region_id: source_region.region_id, + partition_expr: target_partition_expr.clone(), + }) + .chain(pending_allocate_target_partition_exprs) + .collect::>(); + + RepartitionPlanEntry { + group_id: *group_id, + source_regions: source_regions.clone(), + target_regions, + allocated_region_ids, + pending_deallocate_region_ids: vec![], + transition_map: transition_map.clone(), + } + } + Ordering::Equal => { + let target_regions = source_regions + .iter() + .zip(target_partition_exprs.iter()) + .map(|(source_region, target_partition_expr)| RegionDescriptor { + region_id: source_region.region_id, + partition_expr: target_partition_expr.clone(), + }) + .collect::>(); + + RepartitionPlanEntry { + group_id: *group_id, + source_regions: source_regions.clone(), + target_regions, + allocated_region_ids: vec![], + pending_deallocate_region_ids: vec![], + transition_map: transition_map.clone(), + } + } + Ordering::Greater => { + // requires to deallocate regions + let target_regions = source_regions + .iter() + .take(target_partition_exprs.len()) + .zip(target_partition_exprs.iter()) + .map(|(source_region, target_partition_expr)| RegionDescriptor { + region_id: source_region.region_id, + partition_expr: target_partition_expr.clone(), + }) + .collect::>(); + + let pending_deallocate_region_ids = source_regions + .iter() + .skip(target_partition_exprs.len()) + .map(|source_region| source_region.region_id) + .collect::>(); + + RepartitionPlanEntry { + group_id: *group_id, + source_regions: source_regions.clone(), + target_regions, + allocated_region_ids: vec![], + pending_deallocate_region_ids, + transition_map: transition_map.clone(), + } } } } + +#[cfg(test)] +mod tests { + use store_api::storage::RegionId; + use uuid::Uuid; + + use super::*; + use crate::procedure::repartition::test_util::range_expr; + + fn create_region_descriptor( + table_id: TableId, + region_number: u32, + col: &str, + start: i64, + end: i64, + ) -> RegionDescriptor { + RegionDescriptor { + region_id: RegionId::new(table_id, region_number), + partition_expr: range_expr(col, start, end), + } + } + + #[test] + fn test_convert_plan_equal_regions() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let mut next_region_number = 10; + let source_regions = vec![ + create_region_descriptor(table_id, 1, "x", 0, 100), + create_region_descriptor(table_id, 2, "x", 100, 200), + ]; + let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: Vec::new(), + }; + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + assert_eq!(result.group_id, group_id); + assert_eq!(result.source_regions, source_regions); + assert_eq!(result.target_regions.len(), 2); + assert!(result.allocated_region_ids.is_empty()); + assert!(result.pending_deallocate_region_ids.is_empty()); + // next_region_number should not change for equal regions + assert_eq!(next_region_number, 10); + // Verify target regions + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + assert_eq!( + result.target_regions[1].region_id, + RegionId::new(table_id, 2) + ); + assert_eq!( + result.target_regions[1].partition_expr, + target_partition_exprs[1] + ); + } + + #[test] + fn test_convert_plan_allocate_regions() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let mut next_region_number = 10; + + // 3 source regions -> 5 target partition expressions + let source_regions = vec![ + create_region_descriptor(table_id, 1, "x", 0, 100), + create_region_descriptor(table_id, 2, "x", 100, 200), + create_region_descriptor(table_id, 3, "x", 200, 300), + ]; + let target_partition_exprs = vec![ + range_expr("x", 0, 50), + range_expr("x", 50, 100), + range_expr("x", 100, 150), + range_expr("x", 150, 200), + range_expr("x", 200, 300), + ]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: vec![], + }; + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + assert_eq!(result.group_id, group_id); + assert_eq!(result.source_regions, source_regions); + assert_eq!(result.target_regions.len(), 5); + assert_eq!(result.allocated_region_ids.len(), 2); + assert!(result.pending_deallocate_region_ids.is_empty()); + assert_eq!(next_region_number, 12); + + // Verify first 3 target regions use source region ids with target partition exprs + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + assert_eq!( + result.target_regions[1].region_id, + RegionId::new(table_id, 2) + ); + assert_eq!( + result.target_regions[1].partition_expr, + target_partition_exprs[1] + ); + assert_eq!( + result.target_regions[2].region_id, + RegionId::new(table_id, 3) + ); + assert_eq!( + result.target_regions[2].partition_expr, + target_partition_exprs[2] + ); + + // Verify last 2 target regions are newly allocated + assert_eq!( + result.target_regions[3].region_id, + RegionId::new(table_id, 10) + ); + assert_eq!( + result.target_regions[3].partition_expr, + target_partition_exprs[3] + ); + assert_eq!( + result.target_regions[4].region_id, + RegionId::new(table_id, 11) + ); + assert_eq!( + result.target_regions[4].partition_expr, + target_partition_exprs[4] + ); + + // Verify allocated region ids + assert_eq!(result.allocated_region_ids[0], RegionId::new(table_id, 10)); + assert_eq!(result.allocated_region_ids[1], RegionId::new(table_id, 11)); + } + + #[test] + fn test_convert_plan_deallocate_regions() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + + // 5 source regions -> 3 target partition expressions + let source_regions = vec![ + create_region_descriptor(table_id, 1, "x", 0, 50), + create_region_descriptor(table_id, 2, "x", 50, 100), + create_region_descriptor(table_id, 3, "x", 100, 150), + create_region_descriptor(table_id, 4, "x", 150, 200), + create_region_descriptor(table_id, 5, "x", 200, 300), + ]; + let target_partition_exprs = vec![ + range_expr("x", 0, 100), + range_expr("x", 100, 200), + range_expr("x", 200, 300), + ]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: vec![], + }; + let mut next_region_number = 10; + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + assert_eq!(next_region_number, 10); + assert_eq!(result.group_id, group_id); + assert_eq!(result.source_regions, source_regions); + assert_eq!(result.target_regions.len(), 3); + assert!(result.allocated_region_ids.is_empty()); + assert_eq!(result.pending_deallocate_region_ids.len(), 2); + + // Verify first 3 source regions are kept as target regions with new partition exprs + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + assert_eq!( + result.target_regions[1].region_id, + RegionId::new(table_id, 2) + ); + assert_eq!( + result.target_regions[1].partition_expr, + target_partition_exprs[1] + ); + assert_eq!( + result.target_regions[2].region_id, + RegionId::new(table_id, 3) + ); + assert_eq!( + result.target_regions[2].partition_expr, + target_partition_exprs[2] + ); + + // Verify last 2 source regions are pending deallocation + assert_eq!( + result.pending_deallocate_region_ids[0], + RegionId::new(table_id, 4) + ); + assert_eq!( + result.pending_deallocate_region_ids[1], + RegionId::new(table_id, 5) + ); + } + + #[test] + fn test_convert_plan_allocate_single_region() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let mut next_region_number = 5; + // 1 source region -> 2 target partition expressions + let source_regions = vec![create_region_descriptor(table_id, 1, "x", 0, 100)]; + let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: vec![], + }; + + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + assert_eq!(result.target_regions.len(), 2); + assert_eq!(result.allocated_region_ids.len(), 1); + assert_eq!(result.pending_deallocate_region_ids.len(), 0); + // First target uses source region id + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + // Second target is newly allocated + assert_eq!( + result.target_regions[1].region_id, + RegionId::new(table_id, 5) + ); + assert_eq!( + result.target_regions[1].partition_expr, + target_partition_exprs[1] + ); + assert_eq!(next_region_number, 6); + } + + #[test] + fn test_convert_plan_deallocate_to_single_region() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + + // 3 source regions -> 1 target partition expression + let source_regions = vec![ + create_region_descriptor(table_id, 1, "x", 0, 100), + create_region_descriptor(table_id, 2, "x", 100, 200), + create_region_descriptor(table_id, 3, "x", 200, 300), + ]; + let target_partition_exprs = vec![range_expr("x", 0, 300)]; + let allocation_plan = AllocationPlanEntry { + group_id, + source_regions: source_regions.clone(), + target_partition_exprs: target_partition_exprs.clone(), + transition_map: vec![], + }; + let mut next_region_number = 10; + let result = convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plan, + ); + assert_eq!(result.target_regions.len(), 1); + assert_eq!(result.allocated_region_ids.len(), 0); + assert_eq!(result.pending_deallocate_region_ids.len(), 2); + + // Only first source region is kept + assert_eq!( + result.target_regions[0].region_id, + RegionId::new(table_id, 1) + ); + assert_eq!( + result.target_regions[0].partition_expr, + target_partition_exprs[0] + ); + + // Other regions are pending deallocation + assert_eq!( + result.pending_deallocate_region_ids[0], + RegionId::new(table_id, 2) + ); + assert_eq!( + result.pending_deallocate_region_ids[1], + RegionId::new(table_id, 3) + ); + } +} diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 3792222dc9..88e84546c8 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -127,18 +127,10 @@ impl RepartitionStart { .iter() .map(|&idx| target_exprs[idx].clone()) .collect::>(); - let regions_to_allocate = target_partition_exprs - .len() - .saturating_sub(source_regions.len()); - let regions_to_deallocate = source_regions - .len() - .saturating_sub(target_partition_exprs.len()); AllocationPlanEntry { group_id, source_regions, target_partition_exprs, - regions_to_allocate, - regions_to_deallocate, transition_map: subtask.transition_map, } }) diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index ae4c7c3e43..b120b39b79 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -127,7 +127,7 @@ pub mod test_data { use common_meta::region_registry::LeaderRegionRegistry; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::SequenceBuilder; - use common_meta::wal_options_allocator::WalOptionsAllocator; + use common_meta::wal_provider::WalProvider; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -211,7 +211,7 @@ pub mod test_data { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), - Arc::new(WalOptionsAllocator::default()), + Arc::new(WalProvider::default()), )); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( diff --git a/src/meta-srv/src/procedure/wal_prune/test_util.rs b/src/meta-srv/src/procedure/wal_prune/test_util.rs index e94aa184c3..96f72402ad 100644 --- a/src/meta-srv/src/procedure/wal_prune/test_util.rs +++ b/src/meta-srv/src/procedure/wal_prune/test_util.rs @@ -18,7 +18,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef}; use common_meta::state_store::KvStateStore; -use common_meta::wal_options_allocator::build_kafka_client; +use common_meta::wal_provider::build_kafka_client; use common_procedure::ProcedureManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::test_util::InMemoryPoisonStore; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index d146785e8c..1e977fcd8e 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -39,7 +39,7 @@ use crate::service::admin::maintenance::MaintenanceHandler; use crate::service::admin::node_lease::NodeLeaseHandler; use crate::service::admin::procedure::ProcedureManagerHandler; use crate::service::admin::recovery::RecoveryHandler; -use crate::service::admin::sequencer::TableIdSequenceHandler; +use crate::service::admin::sequencer::TableIdAllocatorHandler; /// Expose admin http service on rpc port(3002). /// @@ -249,8 +249,8 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { let recovery_handler = RecoveryHandler { manager: metasrv.runtime_switch_manager().clone(), }; - let table_id_sequence_handler = TableIdSequenceHandler { - table_id_sequence: metasrv.table_id_sequence().clone(), + let table_id_allocator_handler = TableIdAllocatorHandler { + table_id_allocator: metasrv.table_id_allocator().clone(), runtime_switch_manager: metasrv.runtime_switch_manager().clone(), }; @@ -303,7 +303,7 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { AxumRouter::new() .route("/next-id", routing::get(sequencer::get_next_table_id)) .route("/set-next-id", routing::post(sequencer::set_next_table_id)) - .with_state(table_id_sequence_handler.clone()), + .with_state(table_id_allocator_handler.clone()), ), ); diff --git a/src/meta-srv/src/service/admin/sequencer.rs b/src/meta-srv/src/service/admin/sequencer.rs index 0ee22492c2..ad1d948e91 100644 --- a/src/meta-srv/src/service/admin/sequencer.rs +++ b/src/meta-srv/src/service/admin/sequencer.rs @@ -16,8 +16,8 @@ use axum::Json; use axum::extract::{self, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; -use common_meta::sequence::SequenceRef; use serde::{Deserialize, Serialize}; use servers::http::result::error_result::ErrorResponse; use snafu::{ResultExt, ensure}; @@ -27,12 +27,12 @@ use crate::error::{ }; #[derive(Clone)] -pub(crate) struct TableIdSequenceHandler { - pub(crate) table_id_sequence: SequenceRef, +pub(crate) struct TableIdAllocatorHandler { + pub(crate) table_id_allocator: ResourceIdAllocatorRef, pub(crate) runtime_switch_manager: RuntimeSwitchManagerRef, } -impl TableIdSequenceHandler { +impl TableIdAllocatorHandler { async fn set_next_table_id(&self, next_table_id: u32) -> Result<()> { ensure!( self.runtime_switch_manager @@ -44,7 +44,7 @@ impl TableIdSequenceHandler { } ); - self.table_id_sequence + self.table_id_allocator .jump_to(next_table_id as u64) .await .context(SetNextSequenceSnafu) @@ -52,7 +52,7 @@ impl TableIdSequenceHandler { async fn peek_table_id(&self) -> Result { let next_table_id = self - .table_id_sequence + .table_id_allocator .peek() .await .context(PeekSequenceSnafu)?; @@ -73,7 +73,7 @@ pub(crate) struct ResetTableIdRequest { /// Set the next table id. #[axum_macros::debug_handler] pub(crate) async fn set_next_table_id( - State(handler): State, + State(handler): State, extract::Json(ResetTableIdRequest { next_table_id }): extract::Json, ) -> Response { match handler.set_next_table_id(next_table_id).await { @@ -84,7 +84,7 @@ pub(crate) async fn set_next_table_id( /// Get the next table id without incrementing the sequence. #[axum_macros::debug_handler] -pub(crate) async fn get_next_table_id(State(handler): State) -> Response { +pub(crate) async fn get_next_table_id(State(handler): State) -> Response { match handler.peek_table_id().await { Ok(next_table_id) => { (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b6e12903e9..0c42c2f3da 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -38,7 +38,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; -use common_meta::wal_options_allocator::build_wal_options_allocator; +use common_meta::wal_provider::build_wal_provider; use common_procedure::ProcedureManagerRef; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::SlowQueryOptions; @@ -190,7 +190,7 @@ impl GreptimeDbStandaloneBuilder { flow_server: flownode.flow_engine(), }); - let table_id_sequence = Arc::new( + let table_id_allocator = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) @@ -203,13 +203,13 @@ impl GreptimeDbStandaloneBuilder { .build(), ); let kafka_options = opts.wal.clone().try_into().unwrap(); - let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone()) + let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone()) .await .unwrap(); - let wal_options_allocator = Arc::new(wal_options_allocator); + let wal_provider = Arc::new(wal_provider); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( - table_id_sequence, - wal_options_allocator.clone(), + table_id_allocator, + wal_provider.clone(), )); let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( flow_id_sequence, @@ -278,7 +278,7 @@ impl GreptimeDbStandaloneBuilder { flow_streaming_engine.set_frontend_invoker(invoker).await; procedure_manager.start().await.unwrap(); - wal_options_allocator.start().await.unwrap(); + wal_provider.start().await.unwrap(); test_util::prepare_another_catalog_and_schema(&instance).await; diff --git a/tests-integration/src/tests/reconcile_table.rs b/tests-integration/src/tests/reconcile_table.rs index acbfcf6079..c2feecf30e 100644 --- a/tests-integration/src/tests/reconcile_table.rs +++ b/tests-integration/src/tests/reconcile_table.rs @@ -437,7 +437,7 @@ async fn test_set_table_id() { let metasrv = test_context.metasrv(); // Due to the table id 1024 already allocated, we need to jump to 1025. - metasrv.table_id_sequence().jump_to(1025).await.unwrap(); + metasrv.table_id_allocator().jump_to(1025).await.unwrap(); // We should able to create table now. let output = execute_sql(&test_context.frontend(), CREATE_MONITOR_TABLE_SQL).await;