chore: add retry in create table

This commit is contained in:
shuiyisong
2023-06-20 11:47:21 +08:00
parent 8bc28313d3
commit f1983f9131
3 changed files with 28 additions and 5 deletions

1
Cargo.lock generated
View File

@@ -3204,6 +3204,7 @@ version = "0.4.0"
dependencies = [
"api",
"async-compat",
"async-recursion",
"async-stream",
"async-trait",
"catalog",

View File

@@ -12,6 +12,7 @@ testing = []
[dependencies]
api = { path = "../api" }
async-compat = "0.2"
async-recursion = "1.0"
async-stream.workspace = true
async-trait = "0.1"
catalog = { path = "../catalog" }

View File

@@ -24,6 +24,7 @@ use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequests, TableId,
};
use async_recursion::async_recursion;
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
@@ -40,7 +41,7 @@ use common_meta::rpc::router::{
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::{debug, info, warn};
use common_telemetry::{debug, error, info, warn};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::sql::SqlHandler;
use datatypes::prelude::ConcreteDataType;
@@ -65,6 +66,7 @@ use table::table::AlterContext;
use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::Error::RequestDatanode;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
@@ -213,10 +215,7 @@ impl DistInstance {
);
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE);
client
.create(create_expr_for_region)
.await
.context(RequestDatanodeSnafu)?;
create_table_to_datanode(client, create_expr_for_region, 0).await?;
}
// Since the table information created on meta does not go through KvBackend, so we
@@ -629,6 +628,28 @@ impl DistInstance {
}
}
#[async_recursion]
async fn create_table_to_datanode(
client: Database,
req_expr: CreateTableExpr,
level: u8,
) -> Result<()> {
match client.create(req_expr.clone()).await {
Ok(_) => Ok(()),
Err(e) => {
error!(
"fail to create table in database, current level: {:?}, err: {:?}",
level, e
);
if level < 2 {
create_table_to_datanode(client, req_expr, level + 1).await
} else {
Err(RequestDatanode { source: e })
}
}
}
}
#[async_trait]
impl SqlStatementExecutor for DistInstance {
async fn execute_sql(