mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
refactor: allocate table ids in the procedure (#3293)
* refactor: refactor the create logical tables * test(create_logical_tables): add tests for on_prepare * test(create_logical_tables): add tests for on_create_metadata * refactor: rename to create_logical_tables_metadata * chore: fmt toml * chore: apply suggestions from CR
This commit is contained in:
@@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
|
||||
Ok(Self { context, creator })
|
||||
}
|
||||
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
/// On the prepares step, it performs:
|
||||
/// - Checks whether physical table exists.
|
||||
/// - Checks whether logical tables exist.
|
||||
/// - Allocates the table ids.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - The physical table does not exist.
|
||||
/// - Failed to check whether tables exist.
|
||||
/// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
// Sets physical region numbers
|
||||
@@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
|
||||
.data
|
||||
.set_physical_region_numbers(physical_region_numbers);
|
||||
|
||||
// Checks if the tables exists
|
||||
// Checks if the tables exist
|
||||
let table_name_keys = self
|
||||
.creator
|
||||
.data
|
||||
@@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
|
||||
.map(|x| x.map(|x| x.table_id()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sets table ids already exists
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
|
||||
// If all tables do not exists, we can create them directly.
|
||||
if self.creator.data.is_all_tables_not_exists() {
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
return Ok(Status::executing(true));
|
||||
}
|
||||
|
||||
// Filter out the tables that already exist.
|
||||
let tasks = &self.creator.data.tasks;
|
||||
let mut filtered_tasks = Vec::with_capacity(tasks.len());
|
||||
for (task, table_id) in tasks
|
||||
.iter()
|
||||
.zip(self.creator.data.table_ids_already_exists().iter())
|
||||
{
|
||||
// Validates the tasks
|
||||
let tasks = &mut self.creator.data.tasks;
|
||||
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
|
||||
if table_id.is_some() {
|
||||
// If a table already exists, we just ignore it.
|
||||
ensure!(
|
||||
@@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
filtered_tasks.push(task.clone());
|
||||
}
|
||||
|
||||
// Resets tasks
|
||||
self.creator.data.tasks = filtered_tasks;
|
||||
if self.creator.data.tasks.is_empty() {
|
||||
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
|
||||
self.creator.data.state = CreateTablesState::CreateMetadata;
|
||||
return Ok(Status::executing(true));
|
||||
// If all tables already exist, returns the table_ids.
|
||||
if already_exists_tables_ids.iter().all(Option::is_some) {
|
||||
return Ok(Status::done_with_output(
|
||||
already_exists_tables_ids
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Vec<_>>(),
|
||||
));
|
||||
}
|
||||
|
||||
// Allocates table ids
|
||||
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
|
||||
let table_id = if let Some(table_id) = table_id {
|
||||
*table_id
|
||||
} else {
|
||||
self.context
|
||||
.table_metadata_allocator
|
||||
.allocate_table_id(task)
|
||||
.await?
|
||||
};
|
||||
task.set_table_id(table_id);
|
||||
}
|
||||
|
||||
self.creator
|
||||
.data
|
||||
.set_table_ids_already_exists(already_exists_tables_ids);
|
||||
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
|
||||
self.create_regions(region_routes).await
|
||||
}
|
||||
|
||||
/// Creates table metadata
|
||||
///
|
||||
/// Abort(not-retry):
|
||||
/// - Failed to create table metadata.
|
||||
pub async fn on_create_metadata(&self) -> Result<Status> {
|
||||
let manager = &self.context.table_metadata_manager;
|
||||
|
||||
let physical_table_id = self.creator.data.physical_table_id();
|
||||
let tables_data = self.creator.data.all_tables_data();
|
||||
let num_tables = tables_data.len();
|
||||
let remaining_tasks = self.creator.data.remaining_tasks();
|
||||
let num_tables = remaining_tasks.len();
|
||||
|
||||
if num_tables > 0 {
|
||||
let chunk_size = manager.max_logical_tables_per_batch();
|
||||
if num_tables > chunk_size {
|
||||
let chunks = tables_data
|
||||
let chunks = remaining_tasks
|
||||
.into_iter()
|
||||
.chunks(chunk_size)
|
||||
.into_iter()
|
||||
@@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
|
||||
manager.create_logical_tables_metadata(chunk).await?;
|
||||
}
|
||||
} else {
|
||||
manager.create_logical_tables_metadata(tables_data).await?;
|
||||
manager
|
||||
.create_logical_tables_metadata(remaining_tasks)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let table_ids = self.creator.data.real_table_ids();
|
||||
// The `table_id` MUST be collected after the [Prepare::Prepare],
|
||||
// ensures the all `table_id`s have been allocated.
|
||||
let table_ids = self
|
||||
.creator
|
||||
.data
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
|
||||
|
||||
@@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
|
||||
body: Some(PbRegionRequest::Creates(creates)),
|
||||
};
|
||||
create_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -310,17 +334,13 @@ impl TablesCreator {
|
||||
tasks: Vec<CreateTableTask>,
|
||||
physical_table_id: TableId,
|
||||
) -> Self {
|
||||
let table_ids_from_tasks = tasks
|
||||
.iter()
|
||||
.map(|task| task.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
let len = table_ids_from_tasks.len();
|
||||
let len = tasks.len();
|
||||
|
||||
Self {
|
||||
data: CreateTablesData {
|
||||
cluster_id,
|
||||
state: CreateTablesState::Prepare,
|
||||
tasks,
|
||||
table_ids_from_tasks,
|
||||
table_ids_already_exists: vec![None; len],
|
||||
physical_table_id,
|
||||
physical_region_numbers: vec![],
|
||||
@@ -334,10 +354,6 @@ pub struct CreateTablesData {
|
||||
cluster_id: ClusterId,
|
||||
state: CreateTablesState,
|
||||
tasks: Vec<CreateTableTask>,
|
||||
table_ids_from_tasks: Vec<TableId>,
|
||||
// Because the table_id is allocated before entering the distributed lock,
|
||||
// it needs to recheck if the table exists when creating a table.
|
||||
// If it does exist, then the table_id needs to be replaced with the existing one.
|
||||
table_ids_already_exists: Vec<Option<TableId>>,
|
||||
physical_table_id: TableId,
|
||||
physical_region_numbers: Vec<RegionNumber>,
|
||||
@@ -360,24 +376,6 @@ impl CreateTablesData {
|
||||
self.table_ids_already_exists = table_ids_already_exists;
|
||||
}
|
||||
|
||||
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
|
||||
&self.table_ids_already_exists
|
||||
}
|
||||
|
||||
fn is_all_tables_not_exists(&self) -> bool {
|
||||
self.table_ids_already_exists.iter().all(Option::is_none)
|
||||
}
|
||||
|
||||
pub fn real_table_ids(&self) -> Vec<TableId> {
|
||||
self.table_ids_from_tasks
|
||||
.iter()
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.map(|(table_id_from_task, table_id_already_exists)| {
|
||||
table_id_already_exists.unwrap_or(*table_id_from_task)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
|
||||
self.tasks
|
||||
.iter()
|
||||
@@ -385,18 +383,27 @@ impl CreateTablesData {
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
/// Returns the remaining tasks.
|
||||
/// The length of tasks must be greater than 0.
|
||||
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
|
||||
self.tasks
|
||||
.iter()
|
||||
.map(|task| {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
(table_info, table_route)
|
||||
.zip(self.table_ids_already_exists.iter())
|
||||
.flat_map(|(task, table_id)| {
|
||||
if table_id.is_none() {
|
||||
let table_info = task.table_info.clone();
|
||||
let region_ids = self
|
||||
.physical_region_numbers
|
||||
.iter()
|
||||
.map(|region_number| {
|
||||
RegionId::new(table_info.ident.table_id, *region_number)
|
||||
})
|
||||
.collect();
|
||||
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
|
||||
Some((table_info, table_route))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ impl TableMetadataAllocator {
|
||||
}
|
||||
}
|
||||
|
||||
async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
|
||||
let table_id = if let Some(table_id) = &task.create_table.table_id {
|
||||
let table_id = table_id.id;
|
||||
|
||||
|
||||
@@ -12,4 +12,5 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod create_logical_tables;
|
||||
mod create_table;
|
||||
|
||||
518
src/common/meta/src/ddl/tests/create_logical_tables.rs
Normal file
518
src/common/meta/src/ddl/tests/create_logical_tables.rs
Normal file
@@ -0,0 +1,518 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::{QueryRequest, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::debug;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::RawTableInfo;
|
||||
|
||||
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
|
||||
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
|
||||
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
|
||||
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::ddl::CreateTableTask;
|
||||
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
|
||||
|
||||
// Note: this code may be duplicated with others.
|
||||
// However, it's by design, ensures the tests are easy to be modified or added.
|
||||
fn test_create_logical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("host")
|
||||
.data_type(ColumnDataType::String)
|
||||
.semantic_type(SemanticType::Tag)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["host".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
// Note: this code may be duplicated with others.
|
||||
// However, it's by design, ensures the tests are easy to be modified or added.
|
||||
fn test_create_physical_table_task(name: &str) -> CreateTableTask {
|
||||
let create_table = TestCreateTableExprBuilder::default()
|
||||
.column_defs([
|
||||
TestColumnDefBuilder::default()
|
||||
.name("ts")
|
||||
.data_type(ColumnDataType::TimestampMillisecond)
|
||||
.semantic_type(SemanticType::Timestamp)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
TestColumnDefBuilder::default()
|
||||
.name("value")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
])
|
||||
.time_index("ts")
|
||||
.primary_keys(["value".into()])
|
||||
.table_name(name)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into();
|
||||
let table_info = build_raw_table_info_from_expr(&create_table);
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_physical_table_not_found() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = 1024u32;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableRouteNotFound { .. });
|
||||
}
|
||||
|
||||
async fn create_physical_table_metadata(
|
||||
ddl_context: &DdlContext,
|
||||
table_info: RawTableInfo,
|
||||
table_route: TableRouteValue,
|
||||
) {
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(table_info, table_route, HashMap::default())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let tasks = vec![test_create_logical_table_task("foo")];
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_logical_table_exists_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(1025);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, Error::TableAlreadyExists { .. });
|
||||
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_with_create_if_table_exists() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("foo");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure =
|
||||
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
let output = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*output, vec![8192]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_prepare_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NaiveDatanodeHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for NaiveDatanodeHandler {
|
||||
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
|
||||
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_request: QueryRequest,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![1025, 1026]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_part_logical_tables_exist() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// Creates the logical table metadata.
|
||||
let mut task = test_create_logical_table_task("exists");
|
||||
task.set_table_id(8192);
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Sets `create_if_not_exists`
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let non_exist_task = test_create_logical_table_task("non_exists");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task, non_exist_task],
|
||||
physical_table_id,
|
||||
ddl_context,
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let status = procedure.execute(&ctx).await.unwrap();
|
||||
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
|
||||
assert_eq!(*table_ids, vec![8192, 1025]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_create_metadata_err() {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
let cluster_id = 1;
|
||||
// Prepares physical table metadata.
|
||||
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
|
||||
let TableMetadata {
|
||||
table_id,
|
||||
table_route,
|
||||
..
|
||||
} = ddl_context
|
||||
.table_metadata_allocator
|
||||
.create(
|
||||
&TableMetadataAllocatorContext { cluster_id },
|
||||
&create_physical_table_task,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
create_physical_table_task.set_table_id(table_id);
|
||||
create_physical_table_metadata(
|
||||
&ddl_context,
|
||||
create_physical_table_task.table_info.clone(),
|
||||
table_route,
|
||||
)
|
||||
.await;
|
||||
// The create logical table procedure.
|
||||
let physical_table_id = table_id;
|
||||
// Creates the logical table metadata.
|
||||
let task = test_create_logical_table_task("foo");
|
||||
let yet_another_task = test_create_logical_table_task("bar");
|
||||
let mut procedure = CreateLogicalTablesProcedure::new(
|
||||
cluster_id,
|
||||
vec![task.clone(), yet_another_task],
|
||||
physical_table_id,
|
||||
ddl_context.clone(),
|
||||
);
|
||||
let status = procedure.on_prepare().await.unwrap();
|
||||
assert_matches!(status, Status::Executing { persist: true });
|
||||
let ctx = ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
};
|
||||
procedure.execute(&ctx).await.unwrap();
|
||||
// Creates logical table metadata(different with the task)
|
||||
let mut task = task.clone();
|
||||
task.table_info.ident.table_id = 1025;
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_logical_tables_metadata(vec![(
|
||||
task.table_info,
|
||||
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
// Triggers procedure to create table metadata
|
||||
let error = procedure.execute(&ctx).await.unwrap_err();
|
||||
assert!(!error.is_retry_later());
|
||||
}
|
||||
@@ -169,10 +169,10 @@ impl TruncateTableProcedure {
|
||||
let requester = requester.clone();
|
||||
|
||||
truncate_region_tasks.push(async move {
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(add_peer_context_if_needed(datanode)(err));
|
||||
}
|
||||
Ok(())
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(datanode))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -325,6 +325,11 @@ impl TableRouteManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [TableRouteValue::Physical] of table.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - the physical table(`logical_or_physical_table_id`) does not exists
|
||||
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exists.
|
||||
pub async fn get_physical_table_route(
|
||||
&self,
|
||||
logical_or_physical_table_id: TableId,
|
||||
|
||||
@@ -363,6 +363,11 @@ impl CreateTableTask {
|
||||
table: &table.table_name,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the `table_info`'s table_id.
|
||||
pub fn set_table_id(&mut self, table_id: TableId) {
|
||||
self.table_info.ident.table_id = table_id;
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for CreateTableTask {
|
||||
|
||||
Reference in New Issue
Block a user