mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
chore: per review
This commit is contained in:
@@ -23,7 +23,7 @@ use api::v1::region::InsertRequests;
|
||||
use catalog::CatalogManager;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::ddl::create_flow::FlowType;
|
||||
use common_meta::error::{Result as MetaResult, UnexpectedSnafu};
|
||||
use common_meta::error::Result as MetaResult;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::{error, info, trace, warn};
|
||||
@@ -40,7 +40,7 @@ use crate::batching_mode::engine::BatchingEngine;
|
||||
use crate::engine::FlowEngine;
|
||||
use crate::error::{
|
||||
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu,
|
||||
ListFlowsSnafu, SyncCheckTaskSnafu,
|
||||
ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
use crate::repr::{self, DiffRow};
|
||||
@@ -278,7 +278,7 @@ impl FlowDualEngine {
|
||||
let mut check_task = self.check_task.lock().await;
|
||||
ensure!(
|
||||
check_task.is_none(),
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: "Flow consistent check task already exists",
|
||||
}
|
||||
);
|
||||
@@ -293,7 +293,7 @@ impl FlowDualEngine {
|
||||
|
||||
ensure!(
|
||||
check_task.is_some(),
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: "Flow consistent check task does not exist",
|
||||
}
|
||||
);
|
||||
@@ -363,13 +363,13 @@ impl ConsistentCheckTask {
|
||||
.send((allow_create, allow_drop, tx))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: "Failed to send trigger signal",
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
rx.await.map_err(|_| {
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: "Failed to receive trigger signal",
|
||||
}
|
||||
.build()
|
||||
@@ -379,7 +379,7 @@ impl ConsistentCheckTask {
|
||||
|
||||
async fn stop(self) -> Result<(), Error> {
|
||||
self.shutdown_tx.send(()).await.map_err(|_| {
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: "Failed to send shutdown signal",
|
||||
}
|
||||
.build()
|
||||
@@ -585,7 +585,7 @@ impl FlowEngine for FlowDualEngine {
|
||||
})
|
||||
.await?;
|
||||
stream_handler.await.map_err(|e| {
|
||||
crate::error::UnexpectedSnafu {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"),
|
||||
}
|
||||
.build()
|
||||
@@ -670,12 +670,8 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
None => UnexpectedSnafu {
|
||||
err_msg: "Missing request body",
|
||||
}
|
||||
.fail(),
|
||||
_ => UnexpectedSnafu {
|
||||
err_msg: "Invalid request body.",
|
||||
other => common_meta::error::UnexpectedSnafu {
|
||||
err_msg: format!("Invalid request body: {other:?}"),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
@@ -776,11 +772,11 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
None => UnexpectedSnafu {
|
||||
None => common_meta::error::UnexpectedSnafu {
|
||||
err_msg: "Missing request body",
|
||||
}
|
||||
.fail(),
|
||||
_ => UnexpectedSnafu {
|
||||
_ => common_meta::error::UnexpectedSnafu {
|
||||
err_msg: "Invalid request body.",
|
||||
}
|
||||
.fail(),
|
||||
@@ -925,7 +921,7 @@ impl FlowWorkerManager {
|
||||
.copied()
|
||||
.map(FetchFromRow::Idx)
|
||||
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
|
||||
.with_context(|| crate::error::UnexpectedSnafu {
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Column not found: {}, default_value: {:?}",
|
||||
col_name, col_default_val
|
||||
|
||||
Reference in New Issue
Block a user