From f1983f91319afcc7830312d428f919e225e3008d Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 20 Jun 2023 11:47:21 +0800 Subject: [PATCH] chore: add retry in create table --- Cargo.lock | 1 + src/frontend/Cargo.toml | 1 + src/frontend/src/instance/distributed.rs | 31 ++++++++++++++++++++---- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1377a3b617..9bf924c252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3204,6 +3204,7 @@ version = "0.4.0" dependencies = [ "api", "async-compat", + "async-recursion", "async-stream", "async-trait", "catalog", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 4487b10e37..670aaa2c71 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -12,6 +12,7 @@ testing = [] [dependencies] api = { path = "../api" } async-compat = "0.2" +async-recursion = "1.0" async-stream.workspace = true async-trait = "0.1" catalog = { path = "../catalog" } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index f20d0596b0..d921779bb6 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -24,6 +24,7 @@ use api::v1::{ column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, FlushTableExpr, InsertRequests, TableId, }; +use async_recursion::async_recursion; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue}; use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; @@ -40,7 +41,7 @@ use common_meta::rpc::router::{ use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, error, info, warn}; use datanode::instance::sql::table_idents_to_full_name; use datanode::sql::SqlHandler; use datatypes::prelude::ConcreteDataType; @@ -65,6 +66,7 @@ use table::table::AlterContext; use table::TableRef; use crate::catalog::FrontendCatalogManager; +use crate::error::Error::RequestDatanode; use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, @@ -213,10 +215,7 @@ impl DistInstance { ); let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE); - client - .create(create_expr_for_region) - .await - .context(RequestDatanodeSnafu)?; + create_table_to_datanode(client, create_expr_for_region, 0).await?; } // Since the table information created on meta does not go through KvBackend, so we @@ -629,6 +628,28 @@ impl DistInstance { } } +#[async_recursion] +async fn create_table_to_datanode( + client: Database, + req_expr: CreateTableExpr, + level: u8, +) -> Result<()> { + match client.create(req_expr.clone()).await { + Ok(_) => Ok(()), + Err(e) => { + error!( + "fail to create table in database, current level: {:?}, err: {:?}", + level, e + ); + if level < 2 { + create_table_to_datanode(client, req_expr, level + 1).await + } else { + Err(RequestDatanode { source: e }) + } + } + } +} + #[async_trait] impl SqlStatementExecutor for DistInstance { async fn execute_sql(