mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-04 04:50:37 +00:00
Allow flush to recreate missing flows
This commit is contained in:
committed by
GitHub
parent
f1574d940d
commit
48c04c81ee
@@ -249,6 +249,7 @@ impl FlowDualEngine {
|
||||
async fn try_sync_with_check_task(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
allow_create: bool,
|
||||
allow_drop: bool,
|
||||
) -> Result<(), Error> {
|
||||
// this function rarely get called so adding some log is helpful
|
||||
@@ -258,7 +259,7 @@ impl FlowDualEngine {
|
||||
// keep trying to trigger consistent check
|
||||
while retry < max_retry {
|
||||
if let Some(task) = self.check_task.lock().await.as_ref() {
|
||||
task.trigger(false, allow_drop).await?;
|
||||
task.trigger(allow_create, allow_drop).await?;
|
||||
break;
|
||||
}
|
||||
retry += 1;
|
||||
@@ -412,17 +413,10 @@ impl FlowDualEngine {
|
||||
}
|
||||
if errors.is_empty() {
|
||||
info!("Recover flows successfully, flows: {:?}", to_be_created);
|
||||
} else {
|
||||
let failed_flow_ids = errors.iter().map(|(flow_id, _)| *flow_id).join(", ");
|
||||
for (flow_id, err) in errors {
|
||||
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
|
||||
}
|
||||
return InternalSnafu {
|
||||
reason: format!(
|
||||
"Failed to recreate flows during consistency check: [{failed_flow_ids}]"
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
for (flow_id, err) in errors {
|
||||
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
@@ -711,7 +705,7 @@ impl FlowEngine for FlowDualEngine {
|
||||
"Flow {} is not exist in the underlying engine, but exist in metadata",
|
||||
flow_id
|
||||
);
|
||||
self.try_sync_with_check_task(flow_id, true).await?;
|
||||
self.try_sync_with_check_task(flow_id, false, true).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -723,7 +717,7 @@ impl FlowEngine for FlowDualEngine {
|
||||
|
||||
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
// sync with check task
|
||||
self.try_sync_with_check_task(flow_id, false).await?;
|
||||
self.try_sync_with_check_task(flow_id, true, false).await?;
|
||||
let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
|
||||
match flow_type {
|
||||
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
|
||||
|
||||
@@ -453,11 +453,7 @@ Affected Rows: 3
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_wildcard_basic');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
|
||||
+-----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
Error: 1005(Cancelled), Timeout expired
|
||||
|
||||
-- this is expected to be the same as above("2") since the new `input_basic` table
|
||||
-- have different table id, so is a different table
|
||||
|
||||
Reference in New Issue
Block a user