chore: only retry when retry-able in flow (#5987)

* chore: only retry when retry-able

* chore: revert dbg change

* refactor: per review

* fix: check for available frontend first

* docs: more explain&longer timeout&feat: more retry at every level&try send select 1

* fix: use `sql` method for "SELECT 1"

* fix: also put recover flows in spawned task and a dead loop

* test: update transient error in flow rebuild test

* chore: sleep after sqlness sleep

* chore: add a warning

* chore: wait even more time after reboot
This commit is contained in:
discord9
2025-04-28 17:49:49 +08:00
committed by GitHub
parent 13ac4d5048
commit bdd44fd7ec
17 changed files with 327 additions and 57 deletions

View File

@@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::error;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
@@ -192,6 +192,36 @@ impl Database {
from_grpc_response(response)
}
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
/// is `max_retries * GRPC_CONN_TIMEOUT`
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
let request = self.to_rpc_request(request);
loop {
let raw_response = client.handle(request.clone()).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {
// determine if the error is retryable
if is_grpc_retryable(&err) {
// retry
retries += 1;
warn!("Retrying {} times with error = {:?}", retries, err);
continue;
}
}
(Err(err), false) => {
error!(
"Failed to send request to grpc handle after {} retries, error = {:?}",
retries, err
);
return Err(err.into());
}
}
}
}
#[inline]
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
@@ -368,6 +398,11 @@ impl Database {
}
}
/// by grpc standard, only `Unavailable` is retryable, see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc
pub fn is_grpc_retryable(err: &tonic::Status) -> bool {
matches!(err.code(), tonic::Code::Unavailable)
}
#[derive(Default, Debug, Clone)]
struct FlightContext {
auth_header: Option<AuthHeader>,

View File

@@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
UnexpectedSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -81,6 +82,11 @@ impl FlowDualEngine {
}
}
/// Determine if the engine is in distributed mode
pub fn is_distributed(&self) -> bool {
self.streaming_engine.node_id.is_some()
}
pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
self.streaming_engine.clone()
}
@@ -89,6 +95,39 @@ impl FlowDualEngine {
self.batching_engine.clone()
}
/// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
/// in standalone mode, return immediately
/// notice here if any frontend appear in cluster info this function will return immediately
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
if !self.is_distributed() {
return Ok(());
}
let frontend_client = self.batching_engine().frontend_client.clone();
let sleep_duration = std::time::Duration::from_millis(1_000);
let now = std::time::Instant::now();
loop {
let frontend_list = frontend_client.scan_for_frontend().await?;
if !frontend_list.is_empty() {
let fe_list = frontend_list
.iter()
.map(|(_, info)| &info.peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());
}
let elapsed = now.elapsed();
tokio::time::sleep(sleep_duration).await;
info!("Waiting for available frontend, elapsed={:?}", elapsed);
if elapsed >= timeout {
return NoAvailableFrontendSnafu {
timeout,
context: "No available frontend found in cluster info",
}
.fail();
}
}
}
/// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
///
/// the need to sync is to make sure flush flow actually get called
@@ -338,18 +377,36 @@ struct ConsistentCheckTask {
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
// first do recover flows
engine.check_flow_consistent(true, false).await?;
let inner = engine.clone();
let engine = engine.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first check if available frontend is found
if let Err(err) = engine
.wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
.await
{
warn!("No frontend is available yet:\n {err:?}");
}
// then do recover flows, if failed, always retry
let mut recover_retry = 0;
while let Err(err) = engine.check_flow_consistent(true, false).await {
recover_retry += 1;
error!(
"Failed to recover flows:\n {err:?}, retry {} in {}s",
recover_retry,
MIN_REFRESH_DURATION.as_secs()
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
// then do check flows, with configurable allow_create and allow_drop
let (mut allow_create, mut allow_drop) = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
loop {
if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await {
if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
error!(err; "Failed to check flow consistent");
}
if let Some(done) = ret_signal.take() {
@@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine {
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => Ok(0),
None => {
warn!(
"Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
);
Ok(0)
}
}
}

View File

@@ -31,10 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two queries execution by batching mode task
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
/// Grpc connection timeout
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
/// Grpc max retry number
const GRPC_MAX_RETRIES: u32 = 3;
/// Flow wait for available frontend timeout,
/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error
/// which should prevent flownode from starting
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

View File

@@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
pub struct BatchingEngine {
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
/// frontend client for insert request
pub(crate) frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,

View File

@@ -15,6 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,15 +27,17 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES,
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::Error;
/// Just like [`GrpcQueryHandler`] but use BoxedError
@@ -127,10 +130,24 @@ impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
/// Try sending a "SELECT 1" to the database
async fn try_select_one(&self) -> Result<(), Error> {
// notice here use `sql` for `SELECT 1` return 1 row
let _ = self
.database
.sql("SELECT 1")
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
})?;
Ok(())
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
/// scan for available frontend from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
@@ -160,8 +177,8 @@ impl FrontendClient {
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(
/// Get the database with maximum `last_activity_ts`& is able to process query
async fn get_latest_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -177,22 +194,50 @@ impl FrontendClient {
.fail();
};
let frontends = self.scan_for_frontend().await?;
let mut peer = None;
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
peer = Some(val.peer.clone());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
> now_in_ms
})
{
let addr = &node_info.peer.addr;
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
match db.try_select_one().await {
Ok(_) => return Ok(db),
Err(e) => {
warn!(
"Failed to connect to frontend {} on retry={}: \n{e:?}",
addr, retry
);
}
}
}
// no available frontend
// sleep and retry
interval.tick().await;
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(catalog, schema, client);
Ok(DatabaseWithPeer::new(database, peer))
NoAvailableFrontendSnafu {
timeout: GRPC_CONN_TIMEOUT,
context: "No available frontend found that is able to process query",
}
.fail()
}
pub async fn create(
@@ -222,38 +267,18 @@ impl FrontendClient {
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_last_active_frontend(catalog, schema).await?;
let db = self.get_latest_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let mut retry = 0;
loop {
let ret = db.database.handle(req.clone()).await.with_context(|_| {
InvalidRequestSnafu {
context: format!("Failed to handle request: {:?}", req),
}
});
if let Err(err) = ret {
if retry < GRPC_MAX_RETRIES {
retry += 1;
warn!(
"Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}",
db.peer, retry, err
);
continue;
} else {
common_telemetry::error!(
"Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}",
db.peer, retry, err
);
return Err(err);
}
}
return ret;
}
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
})
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()

View File

@@ -61,6 +61,16 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"No available frontend found after timeout: {timeout:?}, context: {context}"
))]
NoAvailableFrontend {
timeout: std::time::Duration,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -296,7 +306,8 @@ impl ErrorExt for Error {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
| Self::InsertIntoFlow { .. }
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }

View File

@@ -172,6 +172,8 @@ impl FlownodeServer {
}
/// Start the background task for streaming computation.
///
/// Should be called only after heartbeat is establish, hence can get cluster info
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let handle = manager_ref

View File

@@ -31,6 +31,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -23,6 +23,9 @@ FROM
distinct_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -101,6 +110,16 @@ GROUP BY
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
+--------------------+---------------------------------------------------------------------------------------+

View File

@@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic;
@@ -44,10 +47,16 @@ FROM
numbers_input_basic
GROUP BY
ts;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;

View File

@@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -91,6 +94,9 @@ FROM
SHOW CREATE TABLE out_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic;
ADMIN FLUSH_FLOW('test_distinct_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -730,10 +730,21 @@ SELECT key FROM api_stats;
+-----+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- wait more time so flownode have time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow');
SELECT key FROM api_stats;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
-- wait more time so flownode have time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- makesure after recover should be the same
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+------------------------------------+

View File

@@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- makesure after recover should be the same
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';