feat(repartition): implement region allocation for repartition procedure (#7534)

* refactor: rename WalOptionsAllocator to WalProvider

The name "WalOptionsAllocator" was misleading because:
- For RaftEngine variant, it doesn't actually allocate anything
- The actual allocation logic lives in KafkaTopicPool

"WalProvider" better describes its role as providing WAL options
based on the configured WAL backend (RaftEngine or Kafka).

Changes:
- Rename `WalOptionsAllocator` to `WalProvider`
- Rename `WalOptionsAllocatorRef` to `WalProviderRef`
- Rename `build_wal_options_allocator` to `build_wal_provider`
- Rename module `wal_options_allocator` to `wal_provider`
- Rename error types: `BuildWalOptionsAllocator` -> `BuildWalProvider`,
  `StartWalOptionsAllocator` -> `StartWalProvider`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): extract allocator traits from TableMetadataAllocator

Refactor TableMetadataAllocator to use trait-based dependency injection
for better testability and separation of concerns.

Changes:
- Add `ResourceIdAllocator` trait to abstract ID allocation
- Add `WalOptionsAllocator` trait to abstract WAL options allocation
- Implement traits for `Sequence` and `WalProvider`
- Remove duplicate `allocate_region_wal_options` function
- Rename `table_id_sequence` to `table_id_allocator` for consistency
- Rename `TableIdSequenceHandler` to `TableIdAllocatorHandler`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta): add max_region_number tracking to PhysicalTableRouteValue

Add `max_region_number` field to track the highest region number ever
allocated for a table. This value only increases when regions are added
and never decreases when regions are dropped, ensuring unique region
numbers across the table's lifetime.

Changes:
- Add `max_region_number` field to `PhysicalTableRouteValue`
- Implement custom `Deserialize` for backward compatibility
- Update `update_region_routes` to maintain max_region_number
- Calculate max_region_number from region_routes in `new()`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: extract TableRouteAllocator trait from TableMetadataAllocator

- Add TableRouteAllocator trait for abstracting region route allocation
- Implement blanket impl for all PeerAllocator types
- Add PeerAllocator impl for Arc<T> to support trait object delegation
- Update TableMetadataAllocator to use TableRouteAllocatorRef

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename TableRouteAllocator to RegionRoutesAllocator

- Rename table_route.rs to region_routes.rs
- Rename TableRouteAllocator trait to RegionRoutesAllocator
- Rename wal_option.rs to wal_options.rs for consistency
- Update TableMetadataAllocator to use new naming

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): implement region allocation for repartition procedure

This commit implements the region allocation phase of the repartition procedure,
which handles allocating new regions when a table needs to be split into more partitions.

Key changes:
- Refactor `RegionRoutesAllocator::allocate` to accept `(region_number, partition_expr)` tuples
  for more flexible region number assignment
- Simplify `AllocationPlanEntry` by removing `regions_to_allocate` and `regions_to_deallocate`
  fields (now derived from source/target counts)
- Add `convert_allocation_plan_to_repartition_plan` function to handle allocation, equal,
  and deallocation cases
- Fix `RepartitionPlanEntry::allocate_regions()` to return target regions (was incorrectly
  returning source regions)
- Implement complete `AllocateRegion` state with:
  - Region route allocation via `RegionRoutesAllocator`
  - WAL options allocation via `WalOptionsAllocator`
  - Operating region registration for concurrency control
  - Region creation on datanodes via `CreateTableExecutor`
  - Table route metadata update
- Add `TableRouteValue::max_region_number()` helper method
- Add comprehensive unit tests for plan conversion and allocation logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-08 19:03:58 +08:00
committed by GitHub
parent 06f9a4c80c
commit f3e2d333e4
41 changed files with 1442 additions and 285 deletions

View File

@@ -68,8 +68,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -343,7 +343,7 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::StartWalProvider { source, .. } => source.status_code(),
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -64,8 +64,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -289,8 +289,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(display("Failed to build wal provider"))]
BuildWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -350,8 +350,9 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::BuildWalOptionsAllocator { source, .. }
| Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::BuildWalProvider { source, .. } | Error::StartWalProvider { source, .. } => {
source.status_code()
}
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -40,7 +40,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
@@ -120,7 +120,7 @@ pub struct Instance {
frontend: Frontend,
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
wal_provider: WalProviderRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -146,10 +146,10 @@ impl App for Instance {
.await
.context(error::StartProcedureManagerSnafu)?;
self.wal_options_allocator
self.wal_provider
.start()
.await
.context(error::StartWalOptionsAllocatorSnafu)?;
.context(error::StartWalProviderSnafu)?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
@@ -468,7 +468,7 @@ impl StartCommand {
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
let table_id_allocator = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
@@ -485,13 +485,13 @@ impl StartCommand {
.clone()
.try_into()
.context(error::InvalidWalProviderSnafu)?;
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
.context(error::BuildWalProviderSnafu)?;
let wal_provider = Arc::new(wal_provider);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_id_allocator,
wal_provider.clone(),
));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
@@ -585,7 +585,7 @@ impl StartCommand {
frontend,
flownode,
procedure_manager,
wal_options_allocator,
wal_provider,
_guard: guard,
})
}

View File

@@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
pub mod allocator;
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod region_routes;
pub mod resource_id;
pub mod wal_options;

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::debug;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::peer::PeerAllocator;
use crate::rpc::router::{Region, RegionRoute};
/// Shared handle (`Arc`) to a [`RegionRoutesAllocator`] trait object.
pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
/// Allocates the [`RegionRoute`]s of a table.
#[async_trait::async_trait]
pub trait RegionRoutesAllocator: Send + Sync {
/// Builds the region routes of `table_id` from `(region_number, partition_expr)` pairs.
///
/// The caller chooses the region numbers, so they do not have to start at 0 or be
/// contiguous.
///
/// NOTE(review): what happens for an empty `regions_and_partitions` is decided by the
/// implementation; the blanket implementation for `PeerAllocator` types in this file
/// falls back to a single default region with number 0.
async fn allocate(
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
) -> Result<Vec<RegionRoute>>;
}
/// Every [`PeerAllocator`] can allocate region routes: it picks the peers, and each
/// requested region gets one of them as its leader.
#[async_trait::async_trait]
impl<T: PeerAllocator> RegionRoutesAllocator for T {
    /// Allocates one route per `(region_number, partition_expr)` entry.
    ///
    /// When `regions_and_partitions` is empty, a single default region (number 0) is
    /// returned instead.
    async fn allocate(
        &self,
        table_id: TableId,
        regions_and_partitions: &[(RegionNumber, &str)],
    ) -> Result<Vec<RegionRoute>> {
        // At least one peer is always requested, because a table without partitions
        // still gets one region.
        let peer_count = regions_and_partitions.len().max(1);
        let peers = self.alloc(peer_count).await?;
        debug!("Allocated peers {:?} for table {}", peers, table_id,);

        // If the table has no partitions, we need to create a default region.
        if regions_and_partitions.is_empty() {
            let default_route = RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 0),
                    ..Default::default()
                },
                leader_peer: Some(peers[0].clone()),
                ..Default::default()
            };
            return Ok(vec![default_route]);
        }

        let mut region_routes = Vec::with_capacity(regions_and_partitions.len());
        for (idx, (region_number, partition)) in regions_and_partitions.iter().enumerate() {
            // Leaders are picked by index, wrapping around when there are fewer peers
            // than regions.
            let leader = peers[idx % peers.len()].clone();
            region_routes.push(RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, *region_number),
                    partition_expr: partition.to_string(),
                    ..Default::default()
                },
                leader_peer: Some(leader),
                ..Default::default()
            });
        }

        Ok(region_routes)
    }
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use crate::error::Result;
/// Shared handle (`Arc`) to a [`ResourceIdAllocator`] trait object.
pub type ResourceIdAllocatorRef = Arc<dyn ResourceIdAllocator>;
/// Abstraction over a source of numeric resource ids (for example table ids).
///
/// The method set mirrors the sequence-style API (`next` / `peek` / `jump_to` / `min_max`),
/// so that callers can depend on this trait instead of a concrete sequence type.
#[async_trait::async_trait]
pub trait ResourceIdAllocator: Send + Sync {
/// Returns the next value and increments the sequence.
async fn next(&self) -> Result<u64>;
/// Returns the current value stored in the remote storage without incrementing the sequence.
async fn peek(&self) -> Result<u64>;
/// Jumps to the given value.
async fn jump_to(&self, next: u64) -> Result<()>;
/// Returns the range of available sequences.
///
/// Unlike the other methods this one is infallible: it returns the range directly.
async fn min_max(&self) -> Range<u64>;
}

View File

@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use store_api::storage::RegionNumber;
use crate::error::Result;
/// Shared handle (`Arc`) to a [`WalOptionsAllocator`] trait object.
pub type WalOptionsAllocatorRef = Arc<dyn WalOptionsAllocator>;
/// Allocates WAL options for the regions of a table.
#[async_trait::async_trait]
pub trait WalOptionsAllocator: Send + Sync {
/// Allocates WAL options for `region_numbers`.
///
/// Returns a map from region number to the WAL options of that region, as a string.
///
/// NOTE(review): the string encoding and the meaning of `skip_wal` are defined by the
/// implementation; the `WalProvider` implementation JSON-encodes the options and
/// allocates no-op WAL options when `skip_wal` is true — confirm for other implementors.
async fn allocate(
&self,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>>;
}

View File

@@ -97,7 +97,7 @@ pub fn create_region_request_builder(
/// Builds a [CreateRequestBuilder] from a [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod executor;
pub(crate) mod template;
pub mod executor;
pub mod template;
use std::collections::HashMap;

View File

@@ -20,19 +20,17 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::wal_options_allocator::prepare_wal_options;
use crate::wal_provider::prepare_wal_options;
/// Builds a [CreateRequest] from a [RawTableInfo].
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
pub(crate) fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
) -> Result<CreateRequest> {
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
.meta
@@ -57,7 +55,7 @@ pub(crate) fn build_template_from_raw_table_info(
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: METRIC_ENGINE_NAME.to_string(),
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
path: String::new(),
@@ -138,7 +136,7 @@ pub struct CreateRequestBuilder {
}
impl CreateRequestBuilder {
pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
Self {
template,
physical_table_id,

View File

@@ -17,47 +17,47 @@ use std::sync::Arc;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};
use store_api::storage::{RegionNumber, TableId};
use crate::ddl::TableMetadata;
use crate::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options};
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
#[derive(Clone)]
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
region_routes_allocator: RegionRoutesAllocatorRef,
}
impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
table_id_allocator,
wal_options_allocator,
Arc::new(NoopPeerAllocator),
)
}
pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
table_id_allocator,
wal_options_allocator,
peer_allocator,
region_routes_allocator: Arc::new(peer_allocator) as _,
}
}
@@ -70,7 +70,7 @@ impl TableMetadataAllocator {
ensure!(
!self
.table_id_sequence
.table_id_allocator
.min_max()
.await
.contains(&(table_id as u64)),
@@ -89,65 +89,35 @@ impl TableMetadataAllocator {
table_id
} else {
self.table_id_sequence.next().await? as TableId
self.table_id_allocator.next().await? as TableId
};
Ok(table_id)
}
fn create_wal_options(
async fn create_wal_options(
&self,
table_route: &PhysicalTableRouteValue,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
self.wal_options_allocator
.allocate(region_numbers, skip_wal)
.await
}
async fn create_table_route(
&self,
table_id: TableId,
task: &CreateTableTask,
partition_exprs: &[&str],
) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len().max(1);
let peers = self.peer_allocator.alloc(regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id);
let mut region_routes = task
.partitions
let region_number_and_partition_exprs = partition_exprs
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition_expr: partition.expression.clone(),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.map(|(i, partition)| (i as u32, *partition))
.collect::<Vec<_>>();
// If the table has no partitions, we need to create a default region.
if region_routes.is_empty() {
region_routes.push(RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(peers[0].clone()),
..Default::default()
});
}
let region_routes = self
.region_routes_allocator
.allocate(table_id, &region_number_and_partition_exprs)
.await?;
Ok(PhysicalTableRouteValue::new(region_routes))
}
@@ -164,10 +134,20 @@ impl TableMetadataAllocator {
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(table_id, task).await?;
let region_wal_options =
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
let partition_exprs = task
.partitions
.iter()
.map(|p| p.expression.as_str())
.collect::<Vec<_>>();
let table_route = self.create_table_route(table_id, &partition_exprs).await?;
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect::<Vec<_>>();
let region_wal_options = self
.create_wal_options(&region_numbers, task.table_info.meta.options.skip_wal)
.await?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -181,7 +161,7 @@ impl TableMetadataAllocator {
})
}
pub fn table_id_sequence(&self) -> SequenceRef {
self.table_id_sequence.clone()
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
self.table_id_allocator.clone()
}
}

View File

@@ -968,7 +968,7 @@ mod tests {
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::wal_provider::WalProvider;
/// A dummy implemented [NodeManager].
pub struct DummyDatanodeManager;
@@ -993,7 +993,7 @@ mod tests {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(

View File

@@ -1474,6 +1474,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
@@ -1490,7 +1491,7 @@ mod tests {
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::RangeRequest;
use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
use crate::wal_provider::WalProvider;
#[test]
fn test_deserialized_value_with_bytes() {
@@ -1600,10 +1601,9 @@ mod tests {
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options =
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
let wal_provider = WalProvider::RaftEngine;
let regions: Vec<_> = (0..16).collect();
let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
@@ -62,12 +62,51 @@ pub enum TableRouteValue {
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
#[derive(Debug, PartialEq, Serialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
// The region routes of the table.
pub region_routes: Vec<RegionRoute>,
// Tracks the highest region number ever allocated for the table.
// This value only increases: adding a region updates it if needed,
// and dropping regions does not decrease it.
pub max_region_number: RegionNumber,
// The version of the table route.
version: u64,
}
/// Hand-written deserialization that also accepts values serialized without the
/// `max_region_number` field, for backward compatibility.
impl<'de> Deserialize<'de> for PhysicalTableRouteValue {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        /// Mirror of [`PhysicalTableRouteValue`] in which `max_region_number` is optional.
        #[derive(Deserialize)]
        struct Helper {
            region_routes: Vec<RegionRoute>,
            #[serde(default)]
            max_region_number: Option<RegionNumber>,
            version: u64,
        }

        let Helper {
            region_routes,
            max_region_number,
            version,
        } = Helper::deserialize(deserializer)?;

        // If the max region number is not provided, fall back to the highest region
        // number among the region routes (or the default when there are none).
        let max_region_number = max_region_number.unwrap_or_else(|| {
            region_routes
                .iter()
                .map(|route| route.region.id.region_number())
                .max()
                .unwrap_or_default()
        });

        Ok(PhysicalTableRouteValue {
            region_routes,
            max_region_number,
            version,
        })
    }
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
physical_table_id: TableId,
@@ -104,9 +143,19 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
let version = self.as_physical_table_route_ref().version;
let physical_table_route = self.as_physical_table_route_ref();
let original_max_region_number = physical_table_route.max_region_number;
let new_max_region_number = region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
let version = physical_table_route.version;
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
// If region routes are added, we will update the max region number.
// If region routes are removed, we will keep the original max region number.
max_region_number: original_max_region_number.max(new_max_region_number),
version: version + 1,
}))
}
@@ -159,6 +208,20 @@ impl TableRouteValue {
Ok(&self.as_physical_table_route_ref().region_routes)
}
/// Returns the max region number of this [TableRouteValue::Physical].
///
/// # Errors
/// Returns an `UnexpectedLogicalRouteTable` error if this value is not a
/// [`PhysicalTableRouteValue`]; the physical check runs first, so this method does not
/// panic on logical routes.
pub fn max_region_number(&self) -> Result<RegionNumber> {
    ensure!(
        self.is_physical(),
        UnexpectedLogicalRouteTableSnafu {
            err_msg: format!("{self:?} is a non-physical TableRouteValue."),
        }
    );
    Ok(self.as_physical_table_route_ref().max_region_number)
}
/// Returns the reference of [`PhysicalTableRouteValue`].
///
/// # Panic
@@ -227,8 +290,14 @@ impl MetadataValue for TableRouteValue {
impl PhysicalTableRouteValue {
/// Creates a route value at version 0.
///
/// `max_region_number` starts as the highest region number found in `region_routes`,
/// or the default value when `region_routes` is empty.
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
    Self {
        // Evaluated before `region_routes` is moved into the struct below.
        max_region_number: region_routes
            .iter()
            .map(|route| route.region.id.region_number())
            .max()
            .unwrap_or_default(),
        region_routes,
        version: 0,
    }
}
@@ -806,6 +875,57 @@ mod tests {
use crate::rpc::router::Region;
use crate::rpc::store::PutRequest;
#[test]
fn test_update_table_route_max_region_number() {
    // Builds a route for region `region_number` of table 0; everything else is defaulted.
    let route_for = |region_number| RegionRoute {
        region: Region {
            id: RegionId::new(0, region_number),
            ..Default::default()
        },
        ..Default::default()
    };

    let initial = PhysicalTableRouteValue::new(vec![route_for(1), route_for(2)]);
    assert_eq!(initial.max_region_number, 2);

    // Dropping region 2 shouldn't change the max region number.
    let shrunk = TableRouteValue::Physical(initial)
        .update(vec![route_for(1)])
        .unwrap();
    assert_eq!(shrunk.as_physical_table_route_ref().max_region_number, 2);

    // Adding region 3 should increase the max region number.
    let grown = shrunk
        .update(vec![route_for(3)])
        .unwrap()
        .into_physical_table_route();
    assert_eq!(grown.max_region_number, 3);
}
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
@@ -846,6 +966,7 @@ mod tests {
leader_down_since: None,
},
],
max_region_number: 1,
version: 0,
});
@@ -956,6 +1077,7 @@ mod tests {
}],
..Default::default()
}],
max_region_number: 0,
version: 0,
});

View File

@@ -48,7 +48,7 @@ pub mod stats;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod util;
pub mod wal_options_allocator;
pub mod wal_provider;
// The id of the datanode.
pub type DatanodeId = u64;

View File

@@ -81,6 +81,13 @@ pub trait PeerAllocator: Send + Sync {
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
/// Forwards [`PeerAllocator`] through an `Arc`, so that an `Arc<dyn PeerAllocator>`
/// (or any `Arc<T>`) can be used wherever a `PeerAllocator` is expected.
#[async_trait::async_trait]
impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
    async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
        // Dereference down to the inner `T` so the call goes to the wrapped allocator.
        (**self).alloc(num).await
    }
}
pub struct NoopPeerAllocator;
#[async_trait::async_trait]

View File

@@ -144,6 +144,8 @@ impl ReconcileRegions {
}
/// Creates a region request builder from a raw table info.
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -19,6 +19,7 @@ use common_telemetry::{debug, warn};
use snafu::ensure;
use tokio::sync::Mutex;
use crate::ddl::allocator::resource_id::ResourceIdAllocator;
use crate::error::{self, Result};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::CompareAndPutRequest;
@@ -82,6 +83,25 @@ pub struct Sequence {
inner: Mutex<Inner>,
}
#[async_trait::async_trait]
impl ResourceIdAllocator for Sequence {
async fn next(&self) -> Result<u64> {
self.next().await
}
async fn peek(&self) -> Result<u64> {
self.peek().await
}
async fn jump_to(&self, next: u64) -> Result<()> {
self.jump_to(next).await
}
async fn min_max(&self) -> Range<u64> {
self.min_max().await
}
}
impl Sequence {
/// Returns the next value and increments the sequence.
pub async fn next(&self) -> Result<u64> {

View File

@@ -40,8 +40,8 @@ use crate::peer::{Peer, PeerResolver};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator};
use crate::wal_provider::topic_pool::KafkaTopicPool;
use crate::wal_provider::{WalProvider, build_kafka_topic_creator};
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -187,7 +187,7 @@ pub fn new_ddl_context_with_kv_backend(
.initial(1024)
.build(),
),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator =

View File

@@ -26,28 +26,46 @@ use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use snafu::{ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::TOPIC_NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
pub use crate::wal_options_allocator::topic_creator::{
build_kafka_client, build_kafka_topic_creator,
};
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
use crate::wal_provider::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
/// Provides wal options in region granularity.
#[derive(Default, Debug)]
pub enum WalOptionsAllocator {
pub enum WalProvider {
#[default]
RaftEngine,
Kafka(KafkaTopicPool),
}
/// Arc wrapper of WalOptionsAllocator.
pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
/// Arc wrapper of WalProvider.
pub type WalProviderRef = Arc<WalProvider>;
impl WalOptionsAllocator {
/// Tries to start the allocator.
#[async_trait::async_trait]
impl WalOptionsAllocator for WalProvider {
    /// Allocates one wal options entry per region number and returns each entry
    /// JSON-encoded, keyed by its region number.
    ///
    /// The batch comes from [`WalProvider::alloc_batch`]; its entries are paired with
    /// `region_numbers` positionally. The first entry that fails to encode aborts the
    /// whole allocation with an `EncodeWalOptions` error.
    async fn allocate(
        &self,
        region_numbers: &[RegionNumber],
        skip_wal: bool,
    ) -> Result<HashMap<RegionNumber, String>> {
        let batch = self.alloc_batch(region_numbers.len(), skip_wal)?;

        let mut encoded = Vec::with_capacity(batch.len());
        for wal_options in batch {
            let json = serde_json::to_string(&wal_options)
                .context(EncodeWalOptionsSnafu { wal_options })?;
            encoded.push(json);
        }

        Ok(region_numbers.iter().copied().zip(encoded).collect())
    }
}
impl WalProvider {
/// Tries to start the provider.
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
@@ -56,14 +74,14 @@ impl WalOptionsAllocator {
}
/// Allocates a batch of wal options where each wal options goes to a region.
/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
/// If skip_wal is true, the wal options will be set to Noop regardless of the provider type.
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
if skip_wal {
return Ok(vec![WalOptions::Noop; num_regions]);
}
match self {
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalOptionsAllocator::Kafka(topic_manager) => {
WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalProvider::Kafka(topic_manager) => {
let options_batch = topic_manager
.select_batch(num_regions)?
.into_iter()
@@ -80,14 +98,14 @@ impl WalOptionsAllocator {
/// Returns true if it's the remote WAL.
pub fn is_remote_wal(&self) -> bool {
matches!(&self, WalOptionsAllocator::Kafka(_))
matches!(&self, WalProvider::Kafka(_))
}
}
#[async_trait]
impl LeadershipChangeListener for WalOptionsAllocator {
impl LeadershipChangeListener for WalProvider {
fn name(&self) -> &str {
"WalOptionsAllocator"
"WalProvider"
}
async fn on_leader_start(&self) -> Result<()> {
@@ -99,13 +117,13 @@ impl LeadershipChangeListener for WalOptionsAllocator {
}
}
/// Builds a wal options allocator based on the given configuration.
pub async fn build_wal_options_allocator(
/// Builds a wal provider based on the given configuration.
pub async fn build_wal_provider(
config: &MetasrvWalConfig,
kv_backend: KvBackendRef,
) -> Result<WalOptionsAllocator> {
) -> Result<WalProvider> {
match config {
MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
let prefix = &kafka_config.kafka_topic.topic_name_prefix;
ensure!(
@@ -116,28 +134,11 @@ pub async fn build_wal_options_allocator(
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
Ok(WalProvider::Kafka(topic_pool))
}
}
}
/// Allocates wal options for every region and encodes each of them as JSON right away.
///
/// The allocated batch is paired with `regions` positionally; the first entry that
/// fails to encode aborts the call with an `EncodeWalOptions` error.
pub fn allocate_region_wal_options(
    regions: Vec<RegionNumber>,
    wal_options_allocator: &WalOptionsAllocator,
    skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
    let batch = wal_options_allocator.alloc_batch(regions.len(), skip_wal)?;

    let mut encoded = Vec::with_capacity(batch.len());
    for wal_options in batch {
        let json =
            serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })?;
        encoded.push(json);
    }

    Ok(regions.into_iter().zip(encoded).collect())
}
/// Inserts wal options into options.
pub fn prepare_wal_options(
options: &mut HashMap<String, String>,
@@ -182,21 +183,19 @@ mod tests {
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
use crate::wal_provider::selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
// Tests that the wal provider could successfully allocate raft-engine wal options.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
async fn test_provider_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetasrvWalConfig::RaftEngine;
let allocator = build_wal_options_allocator(&wal_config, kv_backend)
.await
.unwrap();
allocator.start().await.unwrap();
let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap();
provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = provider.allocate(&regions, false).await.unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
@@ -216,14 +215,14 @@ mod tests {
},
..Default::default()
});
let got = build_wal_options_allocator(&wal_config, kv_backend)
let got = build_wal_provider(&wal_config, kv_backend)
.await
.unwrap_err();
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
#[tokio::test]
async fn test_allocator_with_kafka_allocate_wal_options() {
async fn test_provider_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
@@ -240,13 +239,13 @@ mod tests {
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
// Creates an options provider.
let provider = WalProvider::Kafka(topic_pool);
provider.start().await.unwrap();
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = provider.allocate(&regions, false).await.unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
@@ -261,13 +260,13 @@ mod tests {
}
#[tokio::test]
async fn test_allocator_with_skip_wal() {
let allocator = WalOptionsAllocator::RaftEngine;
allocator.start().await.unwrap();
async fn test_provider_with_skip_wal() {
let provider = WalProvider::RaftEngine;
provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
let got = provider.allocate(&regions, true).await.unwrap();
assert_eq!(got.len(), num_regions as usize);
for wal_options in got.values() {
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");

View File

@@ -22,9 +22,9 @@ use snafu::ensure;
use crate::error::{InvalidNumTopicsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal_options_allocator::topic_creator::KafkaTopicCreator;
use crate::wal_options_allocator::topic_manager::KafkaTopicManager;
use crate::wal_provider::selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal_provider::topic_creator::KafkaTopicCreator;
use crate::wal_provider::topic_manager::KafkaTopicManager;
/// Topic pool for kafka remote wal.
/// Responsible for:
@@ -144,7 +144,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
use crate::wal_provider::selector::RoundRobinTopicSelector;
#[tokio::test]
async fn test_pool_invalid_number_topics_err() {

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use common_meta::wal_provider::prepare_wal_options;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;

View File

@@ -18,7 +18,7 @@ use common_meta::DatanodeId;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
use common_meta::wal_provider::{extract_topic_from_wal_options, prepare_wal_options};
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::path_utils::table_dir;

View File

@@ -936,8 +936,8 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(display("Failed to build wal provider"))]
BuildWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -1060,6 +1060,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to allocate regions for table: {}", table_id))]
AllocateRegions {
#[snafu(implicit)]
location: Location,
table_id: TableId,
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display("Failed to deallocate regions for table: {}", table_id))]
DeallocateRegions {
#[snafu(implicit)]
@@ -1068,6 +1077,33 @@ pub enum Error {
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display("Failed to build create request for table: {}", table_id))]
BuildCreateRequest {
#[snafu(implicit)]
location: Location,
table_id: TableId,
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display("Failed to allocate region routes for table: {}", table_id))]
AllocateRegionRoutes {
#[snafu(implicit)]
location: Location,
table_id: TableId,
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display("Failed to allocate wal options for table: {}", table_id))]
AllocateWalOptions {
#[snafu(implicit)]
location: Location,
table_id: TableId,
#[snafu(source)]
source: common_meta::error::Error,
},
}
impl Error {
@@ -1113,7 +1149,7 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::ChooseItems { .. }
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildWalProvider { .. }
| Error::BuildPartitionClient { .. }
| Error::BuildKafkaClient { .. } => StatusCode::Internal,
@@ -1215,7 +1251,11 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
Error::AllocateRegions { source, .. } => source.status_code(),
Error::DeallocateRegions { source, .. } => source.status_code(),
Error::AllocateRegionRoutes { source, .. } => source.status_code(),
Error::AllocateWalOptions { source, .. } => source.status_code(),
Error::BuildCreateRequest { source, .. } => source.status_code(),
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
#[cfg(feature = "pg_kvbackend")]

View File

@@ -27,6 +27,7 @@ use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants::{
self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
@@ -42,9 +43,8 @@ use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_meta::wal_provider::WalProviderRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
@@ -624,7 +624,7 @@ pub struct Metasrv {
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
ddl_manager: DdlManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
wal_provider: WalProviderRef,
table_metadata_manager: TableMetadataManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
@@ -636,7 +636,7 @@ pub struct Metasrv {
topic_stats_registry: TopicStatsRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
region_flush_ticker: Option<RegionFlushTickerRef>,
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
reconciliation_manager: ReconciliationManagerRef,
resource_stat: ResourceStatRef,
gc_ticker: Option<GcTickerRef>,
@@ -684,7 +684,7 @@ impl Metasrv {
// Builds leadership change notifier.
let mut leadership_change_notifier = LeadershipChangeNotifier::default();
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier.add_listener(self.wal_provider.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
@@ -782,8 +782,8 @@ impl Metasrv {
"Ensure only one instance of Metasrv is running, as there is no election service."
);
if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
if let Err(e) = self.wal_provider.start().await {
error!(e; "Failed to start wal provider");
}
// Always load kv into cached kv store.
self.leader_cached_kv_backend
@@ -931,8 +931,8 @@ impl Metasrv {
self.plugins.get::<SubscriptionManagerRef>()
}
pub fn table_id_sequence(&self) -> &SequenceRef {
&self.table_id_sequence
pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
&self.table_id_allocator
}
pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {

View File

@@ -43,7 +43,7 @@ use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::stats::topic::TopicStatsRegistry;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_meta::wal_provider::{build_kafka_client, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_stat::ResourceStatImpl;
@@ -54,7 +54,7 @@ use store_api::storage::MAX_REGION_SEQ;
use crate::bootstrap::build_default_meta_peer_client;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result};
use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::gc::GcScheduler;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
@@ -241,11 +241,11 @@ impl MetasrvBuilder {
peer_discovery: meta_peer_client.clone(),
};
let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
let wal_provider = build_wal_provider(&options.wal, kv_backend.clone())
.await
.context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let is_remote_wal = wal_options_allocator.is_remote_wal();
.context(BuildWalProviderSnafu)?;
let wal_provider = Arc::new(wal_provider);
let is_remote_wal = wal_provider.is_remote_wal();
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -259,11 +259,11 @@ impl MetasrvBuilder {
);
Arc::new(TableMetadataAllocator::with_peer_allocator(
sequence,
wal_options_allocator.clone(),
wal_provider.clone(),
peer_allocator,
))
});
let table_id_sequence = table_metadata_allocator.table_id_sequence();
let table_id_allocator = table_metadata_allocator.table_id_allocator();
let flow_selector =
Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
@@ -568,7 +568,7 @@ impl MetasrvBuilder {
procedure_manager,
mailbox,
ddl_manager,
wal_options_allocator,
wal_provider,
table_metadata_manager,
runtime_switch_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
@@ -585,7 +585,7 @@ impl MetasrvBuilder {
leader_region_registry,
wal_prune_ticker,
region_flush_ticker,
table_id_sequence,
table_id_allocator,
reconciliation_manager,
topic_stats_registry,
resource_stat: Arc::new(resource_stat),

View File

@@ -23,7 +23,7 @@ use common_meta::instruction::{
};
use common_meta::key::topic_region::TopicRegionKey;
use common_meta::lock_key::RemoteWalLock;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_meta::wal_provider::extract_topic_from_wal_options;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info};
use common_wal::options::WalOptions;

View File

@@ -27,12 +27,15 @@ use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
@@ -57,13 +60,8 @@ pub struct PersistentContext {
pub plans: Vec<RepartitionPlanEntry>,
}
/// Context holding the [`OperatingRegionGuard`]s of the procedure.
// NOTE(review): presumably this state is kept in memory only (judging by the name) — confirm.
pub struct VolatileContext {
/// Guards of the regions being allocated (as suggested by the field name and type).
pub allocating_regions: Vec<OperatingRegionGuard>,
}
pub struct Context {
pub persistent_ctx: PersistentContext,
pub volatile_ctx: VolatileContext,
pub table_metadata_manager: TableMetadataManagerRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
pub node_manager: NodeManagerRef,
@@ -71,6 +69,8 @@ pub struct Context {
pub mailbox: MailboxRef,
pub server_addr: String,
pub cache_invalidator: CacheInvalidatorRef,
pub region_routes_allocator: RegionRoutesAllocatorRef,
pub wal_options_allocator: WalOptionsAllocatorRef,
}
impl Context {
@@ -100,6 +100,29 @@ impl Context {
Ok(table_route_value)
}
/// Retrieves the table info value for the given table id.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
///
/// Abort:
/// - Table info not found.
pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
let table_id = self.persistent_ctx.table_id;
let table_info_value = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get table info for table: {}", table_id),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner();
Ok(table_info_value)
}
/// Updates the table route.
///
/// Retry:

View File

@@ -13,13 +13,30 @@
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use common_meta::ddl::create_table::executor::CreateTableExecutor;
use common_meta::ddl::create_table::template::{
CreateRequestBuilder, build_template_from_raw_table_info,
};
use common_meta::lock_key::TableLock;
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;
use table::table_reference::TableReference;
use crate::error::Result;
use crate::error::{self, Result};
use crate::procedure::repartition::dispatch::Dispatch;
use crate::procedure::repartition::plan::{AllocationPlanEntry, RepartitionPlanEntry};
use crate::procedure::repartition::plan::{
AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
convert_allocation_plan_to_repartition_plan,
};
use crate::procedure::repartition::{Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -27,41 +44,423 @@ pub struct AllocateRegion {
plan_entries: Vec<AllocationPlanEntry>,
}
impl AllocateRegion {
pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
Self { plan_entries }
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for AllocateRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let region_to_allocate = self
.plan_entries
.iter()
.map(|p| p.regions_to_allocate)
.sum::<usize>();
let table_id = ctx.persistent_ctx.table_id;
let table_route_value = ctx.get_table_route_value().await?;
// Safety: it is physical table route value.
let region_routes = table_route_value.region_routes().unwrap();
let mut next_region_number =
Self::get_next_region_number(table_route_value.max_region_number().unwrap());
if region_to_allocate == 0 {
let repartition_plan_entries = self
.plan_entries
.iter()
.map(RepartitionPlanEntry::from_allocation_plan_entry)
.collect::<Vec<_>>();
// Converts allocation plan to repartition plan.
let repartition_plan_entries = Self::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&self.plan_entries,
);
// If no region to allocate, directly dispatch the plan.
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
ctx.persistent_ctx.plans = repartition_plan_entries;
return Ok((Box::new(Dispatch), Status::executing(true)));
}
// TODO(weny): allocate regions.
todo!()
let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries);
let region_number_and_partition_exprs =
Self::prepare_region_allocation_data(&allocate_regions)?;
let table_info_value = ctx.get_table_info_value().await?;
let new_allocated_region_routes = ctx
.region_routes_allocator
.allocate(
table_id,
&region_number_and_partition_exprs
.iter()
.map(|(n, p)| (*n, p.as_str()))
.collect::<Vec<_>>(),
)
.await
.context(error::AllocateRegionRoutesSnafu { table_id })?;
let wal_options = ctx
.wal_options_allocator
.allocate(
&allocate_regions
.iter()
.map(|r| r.region_id.region_number())
.collect::<Vec<_>>(),
table_info_value.table_info.meta.options.skip_wal,
)
.await
.context(error::AllocateWalOptionsSnafu { table_id })?;
let _operating_guards = Self::register_operating_regions(
&ctx.memory_region_keeper,
&new_allocated_region_routes,
)?;
// Allocates the regions on datanodes.
Self::allocate_regions(
&ctx.node_manager,
&table_info_value.table_info,
&new_allocated_region_routes,
&wal_options,
)
.await?;
// TODO(weny): for metric engine, sync logical regions from the central region.
// Updates the table routes.
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
let new_region_routes =
Self::generate_region_routes(region_routes, &new_allocated_region_routes);
ctx.update_table_route(&table_route_value, new_region_routes)
.await?;
ctx.invalidate_table_cache().await?;
ctx.persistent_ctx.plans = repartition_plan_entries;
Ok((Box::new(Dispatch), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl AllocateRegion {
    /// Creates the state from the allocation plan entries it should process.
    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
        Self { plan_entries }
    }

    /// Registers every `(region, datanode)` pair produced by `operating_leader_regions`
    /// in the memory region keeper and returns the resulting guards.
    ///
    /// Fails with a `RegionOperatingRace` error as soon as one pair cannot be registered.
    fn register_operating_regions(
        memory_region_keeper: &MemoryRegionKeeperRef,
        region_routes: &[RegionRoute],
    ) -> Result<Vec<OperatingRegionGuard>> {
        let mut operating_guards = Vec::with_capacity(region_routes.len());
        for (region_id, datanode_id) in operating_leader_regions(region_routes) {
            let guard = memory_region_keeper
                .register(datanode_id, region_id)
                .context(error::RegionOperatingRaceSnafu {
                    peer_id: datanode_id,
                    region_id,
                })?;
            operating_guards.push(guard);
        }
        Ok(operating_guards)
    }

    /// Returns the existing region routes followed by every newly allocated route whose
    /// region id is not part of the result yet.
    ///
    /// Region ids are tracked while appending, so a route is skipped both when its region
    /// already exists in `region_routes` and when the same region id shows up more than
    /// once in `new_allocated_region_routes`.
    fn generate_region_routes(
        region_routes: &[RegionRoute],
        new_allocated_region_routes: &[RegionRoute],
    ) -> Vec<RegionRoute> {
        let mut known_region_ids = region_routes
            .iter()
            .map(|r| r.region.id)
            .collect::<HashSet<_>>();
        let mut new_region_routes = region_routes.to_vec();
        new_region_routes.extend(
            new_allocated_region_routes
                .iter()
                // `insert` returns `true` only for ids that have not been seen before.
                .filter(|route| known_region_ids.insert(route.region.id))
                .cloned(),
        );
        new_region_routes
    }

    /// Converts allocation plan entries to repartition plan entries.
    ///
    /// Every entry is passed to `convert_allocation_plan_to_repartition_plan` together
    /// with the shared `next_region_number`, which is advanced for each newly allocated
    /// region.
    fn convert_to_repartition_plans(
        table_id: TableId,
        next_region_number: &mut RegionNumber,
        plan_entries: &[AllocationPlanEntry],
    ) -> Vec<RepartitionPlanEntry> {
        plan_entries
            .iter()
            .map(|plan_entry| {
                convert_allocation_plan_to_repartition_plan(
                    table_id,
                    next_region_number,
                    plan_entry,
                )
            })
            .collect()
    }

    /// Collects the regions reported by `allocate_regions()` of every repartition plan
    /// entry, in plan order.
    fn collect_allocate_regions(
        repartition_plan_entries: &[RepartitionPlanEntry],
    ) -> Vec<&RegionDescriptor> {
        repartition_plan_entries
            .iter()
            .flat_map(|p| p.allocate_regions())
            .collect()
    }

    /// Pairs the region number of every region with its partition expression serialized
    /// via `as_json_str`.
    ///
    /// Fails with a `SerializePartitionExpr` error on the first expression that cannot be
    /// serialized.
    fn prepare_region_allocation_data(
        allocate_regions: &[&RegionDescriptor],
    ) -> Result<Vec<(RegionNumber, String)>> {
        allocate_regions
            .iter()
            .map(|r| {
                let partition_expr = r
                    .partition_expr
                    .as_json_str()
                    .context(error::SerializePartitionExprSnafu)?;
                Ok((r.region_id.region_number(), partition_expr))
            })
            .collect()
    }

    /// Sums the number of `allocated_region_ids` over all repartition plan entries.
    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
        repartition_plan_entries
            .iter()
            .map(|p| p.allocated_region_ids.len())
            .sum()
    }

    /// Returns the region number following `max_region_number`.
    // NOTE(review): plain `+ 1`; this overflows if `max_region_number` is already the
    // largest value `RegionNumber` can hold — confirm that this cannot happen in practice.
    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
        max_region_number + 1
    }

    /// Creates the regions described by `region_routes` through
    /// `CreateTableExecutor::on_create_regions`, using a create-request template built
    /// from `raw_table_info` and the given per-region `wal_options`.
    ///
    /// Errors:
    /// - `BuildCreateRequest` when the template cannot be built from the table info.
    /// - `AllocateRegions` when creating the regions fails.
    async fn allocate_regions(
        node_manager: &NodeManagerRef,
        raw_table_info: &RawTableInfo,
        region_routes: &[RegionRoute],
        wal_options: &HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let table_ref = TableReference::full(
            &raw_table_info.catalog_name,
            &raw_table_info.schema_name,
            &raw_table_info.name,
        );
        let table_id = raw_table_info.ident.table_id;
        let request = build_template_from_raw_table_info(raw_table_info)
            .context(error::BuildCreateRequestSnafu { table_id })?;
        // NOTE(review): the meaning of the `None` argument is not visible here — confirm.
        let builder = CreateRequestBuilder::new(request, None);
        info!(
            "Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}",
            table_id, region_routes, wal_options
        );
        // NOTE(review): the meaning of the `false` argument is not visible here — confirm.
        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
        executor
            .on_create_regions(node_manager, table_id, region_routes, wal_options)
            .await
            .context(error::AllocateRegionsSnafu { table_id })?;

        Ok(())
    }
}
#[cfg(test)]
mod tests {
use store_api::storage::RegionId;
use uuid::Uuid;
use super::*;
use crate::procedure::repartition::test_util::range_expr;
fn create_region_descriptor(
table_id: TableId,
region_number: u32,
col: &str,
start: i64,
end: i64,
) -> RegionDescriptor {
RegionDescriptor {
region_id: RegionId::new(table_id, region_number),
partition_expr: range_expr(col, start, end),
}
}
fn create_allocation_plan_entry(
table_id: TableId,
source_region_numbers: &[u32],
target_ranges: &[(i64, i64)],
) -> AllocationPlanEntry {
let source_regions = source_region_numbers
.iter()
.enumerate()
.map(|(i, &n)| {
let start = i as i64 * 100;
let end = (i + 1) as i64 * 100;
create_region_descriptor(table_id, n, "x", start, end)
})
.collect();
let target_partition_exprs = target_ranges
.iter()
.map(|&(start, end)| range_expr("x", start, end))
.collect();
AllocationPlanEntry {
group_id: Uuid::new_v4(),
source_regions,
target_partition_exprs,
transition_map: vec![],
}
}
#[test]
fn test_convert_to_repartition_plans_no_allocation() {
let table_id = 1024;
let mut next_region_number = 10;
// 2 source -> 2 target (no allocation needed)
let plan_entries = vec![create_allocation_plan_entry(
table_id,
&[1, 2],
&[(0, 50), (50, 200)],
)];
let result = AllocateRegion::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&plan_entries,
);
assert_eq!(result.len(), 1);
assert_eq!(result[0].target_regions.len(), 2);
assert!(result[0].allocated_region_ids.is_empty());
// next_region_number should not change
assert_eq!(next_region_number, 10);
}
#[test]
fn test_convert_to_repartition_plans_with_allocation() {
let table_id = 1024;
let mut next_region_number = 10;
// 2 source -> 4 target (need to allocate 2 regions)
let plan_entries = vec![create_allocation_plan_entry(
table_id,
&[1, 2],
&[(0, 50), (50, 100), (100, 150), (150, 200)],
)];
let result = AllocateRegion::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&plan_entries,
);
assert_eq!(result.len(), 1);
assert_eq!(result[0].target_regions.len(), 4);
assert_eq!(result[0].allocated_region_ids.len(), 2);
assert_eq!(
result[0].allocated_region_ids[0],
RegionId::new(table_id, 10)
);
assert_eq!(
result[0].allocated_region_ids[1],
RegionId::new(table_id, 11)
);
// next_region_number should be incremented by 2
assert_eq!(next_region_number, 12);
}
#[test]
fn test_convert_to_repartition_plans_multiple_entries() {
let table_id = 1024;
let mut next_region_number = 10;
// Multiple plan entries with different allocation needs
let plan_entries = vec![
create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // need 1 allocation
create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]), // no allocation
create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]), // need 2 allocations
];
let result = AllocateRegion::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&plan_entries,
);
assert_eq!(result.len(), 3);
assert_eq!(result[0].allocated_region_ids.len(), 1);
assert_eq!(result[1].allocated_region_ids.len(), 0);
assert_eq!(result[2].allocated_region_ids.len(), 2);
// next_region_number should be incremented by 3 total
assert_eq!(next_region_number, 13);
}
#[test]
fn test_count_regions_to_allocate() {
let table_id = 1024;
let mut next_region_number = 10;
let plan_entries = vec![
create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), // 1 allocation
create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]), // 0 allocation (deallocate)
create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]), // 1 allocation
];
let repartition_plans = AllocateRegion::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&plan_entries,
);
let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
assert_eq!(count, 2);
}
#[test]
fn test_collect_allocate_regions() {
    let table_id = 1024;
    // Two entries, each splitting one source region into two targets.
    let entries = vec![
        create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
        create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]),
    ];
    let mut region_number_cursor = 10;
    let plans = AllocateRegion::convert_to_repartition_plans(
        table_id,
        &mut region_number_cursor,
        &entries,
    );

    let new_regions = AllocateRegion::collect_allocate_regions(&plans);

    // The new regions come back in plan order, numbered from the starting cursor.
    assert_eq!(new_regions.len(), 2);
    let first_new_region_id = RegionId::new(table_id, 10);
    let second_new_region_id = RegionId::new(table_id, 11);
    assert_eq!(new_regions[0].region_id, first_new_region_id);
    assert_eq!(new_regions[1].region_id, second_new_region_id);
}
#[test]
fn test_prepare_region_allocation_data() {
    let table_id = 1024;
    let descriptors = [
        create_region_descriptor(table_id, 10, "x", 0, 50),
        create_region_descriptor(table_id, 11, "x", 50, 100),
    ];
    let borrowed = descriptors.iter().collect::<Vec<&RegionDescriptor>>();

    let prepared = AllocateRegion::prepare_region_allocation_data(&borrowed).unwrap();

    // One output item per input descriptor, in input order.
    assert_eq!(prepared.len(), 2);
    let (first, second) = (&prepared[0], &prepared[1]);
    assert_eq!(first.0, 10);
    assert_eq!(second.0, 11);
    // The second component must be non-empty for both regions.
    assert!(!first.1.is_empty());
    assert!(!second.1.is_empty());
}
}

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::procedure::repartition::group::GroupId;
@@ -37,10 +39,6 @@ pub struct AllocationPlanEntry {
pub source_regions: Vec<RegionDescriptor>,
/// The target partition expressions for the new or changed regions.
pub target_partition_exprs: Vec<PartitionExpr>,
/// The number of regions that need to be allocated (target count - source count, if positive).
pub regions_to_allocate: usize,
/// The number of regions that need to be deallocated (source count - target count, if positive).
pub regions_to_deallocate: usize,
/// For each `source_regions[k]`, the corresponding vector contains global
/// `target_partition_exprs` that overlap with it.
pub transition_map: Vec<Vec<usize>>,
@@ -66,37 +64,449 @@ pub struct RepartitionPlanEntry {
}
impl RepartitionPlanEntry {
/// Converts an allocation plan entry into a repartition plan entry.
///
/// The target regions are derived from the source regions and the target partition expressions.
/// The allocated region ids and pending deallocate region ids are empty.
pub fn from_allocation_plan_entry(
AllocationPlanEntry {
group_id,
source_regions,
target_partition_exprs,
regions_to_allocate,
regions_to_deallocate,
transition_map,
}: &AllocationPlanEntry,
) -> Self {
debug_assert!(*regions_to_allocate == 0 && *regions_to_deallocate == 0);
let target_regions = source_regions
/// Returns the target regions that are newly allocated.
pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
self.target_regions
.iter()
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.collect::<Vec<_>>();
.filter(|r| self.allocated_region_ids.contains(&r.region_id))
.collect()
}
}
Self {
group_id: *group_id,
source_regions: source_regions.clone(),
target_regions,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
/// Converts an allocation plan to a repartition plan.
///
/// Converts an [`AllocationPlanEntry`] (which contains abstract region allocation intents)
/// into a [`RepartitionPlanEntry`] with concrete source and target region descriptors,
/// plus records information on which regions are newly allocated and/or pending deallocation.
///
/// # Returns
///
/// A [`Result`] containing the [`RepartitionPlanEntry`] describing exactly the source regions,
/// the target regions (including any that need to be newly allocated), and transition mappings.
///
/// # Notes
/// - If new regions are needed, their region ids are constructed using `table_id` and incrementing
/// from `next_region_number`.
/// - For each target, associates the correct region descriptor; for new regions, the region id
/// is assigned sequentially.
pub fn convert_allocation_plan_to_repartition_plan(
table_id: TableId,
next_region_number: &mut RegionNumber,
AllocationPlanEntry {
group_id,
source_regions,
target_partition_exprs,
transition_map,
..
}: &AllocationPlanEntry,
) -> RepartitionPlanEntry {
match source_regions.len().cmp(&target_partition_exprs.len()) {
Ordering::Less => {
// requires to allocate regions
let pending_allocate_target_partition_exprs = target_partition_exprs
.iter()
.skip(source_regions.len())
.map(|target_partition_expr| {
let desc = RegionDescriptor {
region_id: RegionId::new(table_id, *next_region_number),
partition_expr: target_partition_expr.clone(),
};
*next_region_number += 1;
desc
})
.collect::<Vec<_>>();
let allocated_region_ids = pending_allocate_target_partition_exprs
.iter()
.map(|rd| rd.region_id)
.collect::<Vec<_>>();
let target_regions = source_regions
.iter()
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.chain(pending_allocate_target_partition_exprs)
.collect::<Vec<_>>();
RepartitionPlanEntry {
group_id: *group_id,
source_regions: source_regions.clone(),
target_regions,
allocated_region_ids,
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
}
}
Ordering::Equal => {
let target_regions = source_regions
.iter()
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.collect::<Vec<_>>();
RepartitionPlanEntry {
group_id: *group_id,
source_regions: source_regions.clone(),
target_regions,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
}
}
Ordering::Greater => {
// requires to deallocate regions
let target_regions = source_regions
.iter()
.take(target_partition_exprs.len())
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.collect::<Vec<_>>();
let pending_deallocate_region_ids = source_regions
.iter()
.skip(target_partition_exprs.len())
.map(|source_region| source_region.region_id)
.collect::<Vec<_>>();
RepartitionPlanEntry {
group_id: *group_id,
source_regions: source_regions.clone(),
target_regions,
allocated_region_ids: vec![],
pending_deallocate_region_ids,
transition_map: transition_map.clone(),
}
}
}
}
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a descriptor for region `region_number` of `table_id` whose partition expression
    /// is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        RegionDescriptor {
            region_id,
            partition_expr,
        }
    }

    /// Wraps the inputs in an [`AllocationPlanEntry`] with an empty transition map and runs
    /// the conversion under test.
    fn run_conversion(
        table_id: TableId,
        next_region_number: &mut RegionNumber,
        group_id: Uuid,
        source_regions: &[RegionDescriptor],
        target_partition_exprs: &[PartitionExpr],
    ) -> RepartitionPlanEntry {
        let entry = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.to_vec(),
            target_partition_exprs: target_partition_exprs.to_vec(),
            transition_map: Vec::new(),
        };
        convert_allocation_plan_to_repartition_plan(table_id, next_region_number, &entry)
    }

    /// Asserts that the leading target regions of `plan` carry `region_numbers[i]` as their
    /// region number and `exprs[i]` as their partition expression.
    fn assert_target_regions(
        plan: &RepartitionPlanEntry,
        table_id: TableId,
        region_numbers: &[RegionNumber],
        exprs: &[PartitionExpr],
    ) {
        for (idx, &region_number) in region_numbers.iter().enumerate() {
            let target = &plan.target_regions[idx];
            assert_eq!(target.region_id, RegionId::new(table_id, region_number));
            assert_eq!(target.partition_expr, exprs[idx]);
        }
    }

    /// Maps region numbers to the [`RegionId`]s of `table_id`.
    fn region_ids(table_id: TableId, region_numbers: &[RegionNumber]) -> Vec<RegionId> {
        region_numbers
            .iter()
            .map(|&region_number| RegionId::new(table_id, region_number))
            .collect()
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let sources = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
        ];
        let targets = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
        let mut next_region_number = 10;

        let plan = run_conversion(
            table_id,
            &mut next_region_number,
            group_id,
            &sources,
            &targets,
        );

        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, sources);
        assert_eq!(plan.target_regions.len(), 2);
        assert!(plan.allocated_region_ids.is_empty());
        assert!(plan.pending_deallocate_region_ids.is_empty());
        // An equal-sized rewrite must not consume any region number.
        assert_eq!(next_region_number, 10);
        // Each target keeps its source id and takes the new expression.
        assert_target_regions(&plan, table_id, &[1, 2], &targets);
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        // 3 source regions are split into 5 target partition expressions.
        let sources = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let targets = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];
        let mut next_region_number = 10;

        let plan = run_conversion(
            table_id,
            &mut next_region_number,
            group_id,
            &sources,
            &targets,
        );

        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, sources);
        assert_eq!(plan.target_regions.len(), 5);
        assert_eq!(plan.allocated_region_ids.len(), 2);
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 12);
        // The first 3 targets reuse the source ids; the last 2 are numbered 10 and 11.
        assert_target_regions(&plan, table_id, &[1, 2, 3, 10, 11], &targets);
        assert_eq!(plan.allocated_region_ids, region_ids(table_id, &[10, 11]));
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        // 5 source regions are merged into 3 target partition expressions.
        let sources = vec![
            create_region_descriptor(table_id, 1, "x", 0, 50),
            create_region_descriptor(table_id, 2, "x", 50, 100),
            create_region_descriptor(table_id, 3, "x", 100, 150),
            create_region_descriptor(table_id, 4, "x", 150, 200),
            create_region_descriptor(table_id, 5, "x", 200, 300),
        ];
        let targets = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];
        let mut next_region_number = 10;

        let plan = run_conversion(
            table_id,
            &mut next_region_number,
            group_id,
            &sources,
            &targets,
        );

        assert_eq!(next_region_number, 10);
        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, sources);
        assert_eq!(plan.target_regions.len(), 3);
        assert!(plan.allocated_region_ids.is_empty());
        assert_eq!(plan.pending_deallocate_region_ids.len(), 2);
        // The first 3 source regions survive with the new expressions.
        assert_target_regions(&plan, table_id, &[1, 2, 3], &targets);
        // The trailing 2 source regions are queued for deallocation.
        assert_eq!(
            plan.pending_deallocate_region_ids,
            region_ids(table_id, &[4, 5])
        );
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        // 1 source region is split into 2 target partition expressions.
        let sources = vec![create_region_descriptor(table_id, 1, "x", 0, 100)];
        let targets = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let mut next_region_number = 5;

        let plan = run_conversion(
            table_id,
            &mut next_region_number,
            group_id,
            &sources,
            &targets,
        );

        assert_eq!(plan.target_regions.len(), 2);
        assert_eq!(plan.allocated_region_ids.len(), 1);
        assert_eq!(plan.pending_deallocate_region_ids.len(), 0);
        // The first target reuses the source id; the second takes region number 5.
        assert_target_regions(&plan, table_id, &[1, 5], &targets);
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        // 3 source regions are merged into 1 target partition expression.
        let sources = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let targets = vec![range_expr("x", 0, 300)];
        let mut next_region_number = 10;

        let plan = run_conversion(
            table_id,
            &mut next_region_number,
            group_id,
            &sources,
            &targets,
        );

        assert_eq!(plan.target_regions.len(), 1);
        assert_eq!(plan.allocated_region_ids.len(), 0);
        assert_eq!(plan.pending_deallocate_region_ids.len(), 2);
        // Only the first source region is kept.
        assert_target_regions(&plan, table_id, &[1], &targets);
        // The remaining source regions are queued for deallocation.
        assert_eq!(
            plan.pending_deallocate_region_ids,
            region_ids(table_id, &[2, 3])
        );
    }
}

View File

@@ -127,18 +127,10 @@ impl RepartitionStart {
.iter()
.map(|&idx| target_exprs[idx].clone())
.collect::<Vec<_>>();
let regions_to_allocate = target_partition_exprs
.len()
.saturating_sub(source_regions.len());
let regions_to_deallocate = source_regions
.len()
.saturating_sub(target_partition_exprs.len());
AllocationPlanEntry {
group_id,
source_regions,
target_partition_exprs,
regions_to_allocate,
regions_to_deallocate,
transition_map: subtask.transition_map,
}
})

View File

@@ -127,7 +127,7 @@ pub mod test_data {
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_meta::wal_provider::WalProvider;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
@@ -211,7 +211,7 @@ pub mod test_data {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(

View File

@@ -18,7 +18,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef};
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::build_kafka_client;
use common_meta::wal_provider::build_kafka_client;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::test_util::InMemoryPoisonStore;

View File

@@ -39,7 +39,7 @@ use crate::service::admin::maintenance::MaintenanceHandler;
use crate::service::admin::node_lease::NodeLeaseHandler;
use crate::service::admin::procedure::ProcedureManagerHandler;
use crate::service::admin::recovery::RecoveryHandler;
use crate::service::admin::sequencer::TableIdSequenceHandler;
use crate::service::admin::sequencer::TableIdAllocatorHandler;
/// Expose admin http service on rpc port(3002).
///
@@ -249,8 +249,8 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
let recovery_handler = RecoveryHandler {
manager: metasrv.runtime_switch_manager().clone(),
};
let table_id_sequence_handler = TableIdSequenceHandler {
table_id_sequence: metasrv.table_id_sequence().clone(),
let table_id_allocator_handler = TableIdAllocatorHandler {
table_id_allocator: metasrv.table_id_allocator().clone(),
runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
};
@@ -303,7 +303,7 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
AxumRouter::new()
.route("/next-id", routing::get(sequencer::get_next_table_id))
.route("/set-next-id", routing::post(sequencer::set_next_table_id))
.with_state(table_id_sequence_handler.clone()),
.with_state(table_id_allocator_handler.clone()),
),
);

View File

@@ -16,8 +16,8 @@ use axum::Json;
use axum::extract::{self, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::sequence::SequenceRef;
use serde::{Deserialize, Serialize};
use servers::http::result::error_result::ErrorResponse;
use snafu::{ResultExt, ensure};
@@ -27,12 +27,12 @@ use crate::error::{
};
#[derive(Clone)]
pub(crate) struct TableIdSequenceHandler {
pub(crate) table_id_sequence: SequenceRef,
pub(crate) struct TableIdAllocatorHandler {
pub(crate) table_id_allocator: ResourceIdAllocatorRef,
pub(crate) runtime_switch_manager: RuntimeSwitchManagerRef,
}
impl TableIdSequenceHandler {
impl TableIdAllocatorHandler {
async fn set_next_table_id(&self, next_table_id: u32) -> Result<()> {
ensure!(
self.runtime_switch_manager
@@ -44,7 +44,7 @@ impl TableIdSequenceHandler {
}
);
self.table_id_sequence
self.table_id_allocator
.jump_to(next_table_id as u64)
.await
.context(SetNextSequenceSnafu)
@@ -52,7 +52,7 @@ impl TableIdSequenceHandler {
async fn peek_table_id(&self) -> Result<u32> {
let next_table_id = self
.table_id_sequence
.table_id_allocator
.peek()
.await
.context(PeekSequenceSnafu)?;
@@ -73,7 +73,7 @@ pub(crate) struct ResetTableIdRequest {
/// Set the next table id.
#[axum_macros::debug_handler]
pub(crate) async fn set_next_table_id(
State(handler): State<TableIdSequenceHandler>,
State(handler): State<TableIdAllocatorHandler>,
extract::Json(ResetTableIdRequest { next_table_id }): extract::Json<ResetTableIdRequest>,
) -> Response {
match handler.set_next_table_id(next_table_id).await {
@@ -84,7 +84,7 @@ pub(crate) async fn set_next_table_id(
/// Get the next table id without incrementing the sequence.
#[axum_macros::debug_handler]
pub(crate) async fn get_next_table_id(State(handler): State<TableIdSequenceHandler>) -> Response {
pub(crate) async fn get_next_table_id(State(handler): State<TableIdAllocatorHandler>) -> Response {
match handler.peek_table_id().await {
Ok(next_table_id) => {
(StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()

View File

@@ -38,7 +38,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_meta::wal_provider::build_wal_provider;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::SlowQueryOptions;
@@ -190,7 +190,7 @@ impl GreptimeDbStandaloneBuilder {
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
let table_id_allocator = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
@@ -203,13 +203,13 @@ impl GreptimeDbStandaloneBuilder {
.build(),
);
let kafka_options = opts.wal.clone().try_into().unwrap();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
.await
.unwrap();
let wal_options_allocator = Arc::new(wal_options_allocator);
let wal_provider = Arc::new(wal_provider);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_id_allocator,
wal_provider.clone(),
));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
@@ -278,7 +278,7 @@ impl GreptimeDbStandaloneBuilder {
flow_streaming_engine.set_frontend_invoker(invoker).await;
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();
wal_provider.start().await.unwrap();
test_util::prepare_another_catalog_and_schema(&instance).await;

View File

@@ -437,7 +437,7 @@ async fn test_set_table_id() {
let metasrv = test_context.metasrv();
// Due to the table id 1024 already allocated, we need to jump to 1025.
metasrv.table_id_sequence().jump_to(1025).await.unwrap();
metasrv.table_id_allocator().jump_to(1025).await.unwrap();
// We should able to create table now.
let output = execute_sql(&test_context.frontend(), CREATE_MONITOR_TABLE_SQL).await;