chore: avoid sending create table requests for already existing tables (#5347)

* chore: avoid sending create table requests for already existing tables

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-01-15 10:50:25 +08:00
committed by GitHub
parent bd37e086c2
commit 57f8afcb70
4 changed files with 23 additions and 7 deletions

View File

@@ -21,7 +21,7 @@ use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -143,7 +143,12 @@ impl CreateLogicalTablesProcedure {
for peer in leaders {
let requester = self.context.node_manager.datanode(&peer).await;
let request = self.make_request(&peer, region_routes)?;
let Some(request) = self.make_request(&peer, region_routes)? else {
debug!("no region request to send to datanode {}", peer);
// We can skip the rest of the datanodes,
// the rest of the datanodes should have the same result.
break;
};
create_region_tasks.push(async move {
requester

View File

@@ -25,7 +25,7 @@ impl CreateLogicalTablesProcedure {
Ok(())
}
pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> {
pub async fn check_tables_already_exist(&mut self) -> Result<()> {
let table_name_keys = self
.data
.all_create_table_exprs()

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
@@ -31,11 +32,15 @@ impl CreateLogicalTablesProcedure {
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
) -> Result<Option<RegionRequest>> {
let tasks = &self.data.tasks;
let table_ids_already_exists = &self.data.table_ids_already_exists;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for task in tasks {
for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
if table_id_already_exists.is_some() {
continue;
}
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
@@ -51,13 +56,18 @@ impl CreateLogicalTablesProcedure {
}
}
Ok(RegionRequest {
if requests.is_empty() {
debug!("no region request to send to datanodes");
return Ok(None);
}
Ok(Some(RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Creates(CreateRequests { requests })),
})
}))
}
fn create_region_request_builder(

View File

@@ -291,6 +291,7 @@ async fn test_on_datanode_create_logical_regions() {
}
});
procedure.check_tables_already_exist().await.unwrap();
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(