mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-02 05:10:40 +00:00
feat: flow recreate on reboot (#4509)
* feat: flow reboot clean * refactor: per review * refactor: per review * test: sqlness flow reboot
This commit is contained in:
@@ -24,6 +24,7 @@ use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
@@ -296,11 +297,13 @@ impl StartCommand {
|
||||
Arc::new(executor),
|
||||
);
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
|
||||
let flownode_builder = FlownodeBuilder::new(
|
||||
opts,
|
||||
Plugins::new(),
|
||||
table_metadata_manager,
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager,
|
||||
)
|
||||
.with_heartbeat_task(heartbeat_task);
|
||||
|
||||
|
||||
@@ -476,11 +476,13 @@ impl StartCommand {
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
Default::default(),
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
);
|
||||
let flownode = Arc::new(
|
||||
flow_builder
|
||||
@@ -511,7 +513,6 @@ impl StartCommand {
|
||||
opts.wal.into(),
|
||||
kv_backend.clone(),
|
||||
));
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
|
||||
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
|
||||
table_id_sequence,
|
||||
wal_options_allocator.clone(),
|
||||
|
||||
@@ -83,6 +83,14 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list flows in flownode={id:?}"))]
|
||||
ListFlows {
|
||||
id: Option<common_meta::FlownodeId>,
|
||||
source: common_meta::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Flow already exist, id={id}"))]
|
||||
FlowAlreadyExist {
|
||||
id: FlowId,
|
||||
@@ -214,7 +222,8 @@ impl ErrorExt for Error {
|
||||
}
|
||||
Self::TableNotFound { .. }
|
||||
| Self::TableNotFoundMeta { .. }
|
||||
| Self::FlowNotFound { .. } => StatusCode::TableNotFound,
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::InvalidQueryProst { .. }
|
||||
| &Self::InvalidQuery { .. }
|
||||
| &Self::Plan { .. }
|
||||
|
||||
@@ -29,12 +29,13 @@ use common_meta::cache::{
|
||||
};
|
||||
use common_meta::ddl::{table_meta, ProcedureExecutorRef};
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::key::flow::FlowMetadataManagerRef;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::node_manager::{self, Flownode, NodeManagerRef};
|
||||
use common_query::Output;
|
||||
use common_telemetry::tracing::info;
|
||||
use futures::FutureExt;
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
|
||||
use itertools::Itertools;
|
||||
use meta_client::client::MetaClient;
|
||||
@@ -47,7 +48,7 @@ use serde::de::Unexpected;
|
||||
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::server::Server;
|
||||
use session::context::QueryContextRef;
|
||||
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{broadcast, oneshot, Mutex};
|
||||
@@ -57,7 +58,8 @@ use tonic::{Request, Response, Status};
|
||||
|
||||
use crate::adapter::FlowWorkerManagerRef;
|
||||
use crate::error::{
|
||||
CacheRequiredSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
|
||||
CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
|
||||
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::transform::register_function_to_query_engine;
|
||||
@@ -240,6 +242,7 @@ pub struct FlownodeBuilder {
|
||||
plugins: Plugins,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
}
|
||||
|
||||
@@ -250,12 +253,14 @@ impl FlownodeBuilder {
|
||||
plugins: Plugins,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
plugins,
|
||||
table_meta,
|
||||
catalog_manager,
|
||||
flow_metadata_manager,
|
||||
heartbeat_task: None,
|
||||
}
|
||||
}
|
||||
@@ -283,6 +288,11 @@ impl FlownodeBuilder {
|
||||
self.build_manager(query_engine_factory.query_engine())
|
||||
.await?,
|
||||
);
|
||||
|
||||
if let Err(err) = self.recover_flows(&manager).await {
|
||||
common_telemetry::error!(err; "Failed to recover flows");
|
||||
}
|
||||
|
||||
let server = FlownodeServer::new(FlowService::new(manager.clone()));
|
||||
|
||||
let heartbeat_task = self.heartbeat_task;
|
||||
@@ -296,6 +306,85 @@ impl FlownodeBuilder {
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
/// recover all flow tasks in this flownode in distributed mode(nodeid is Some(<num>))
|
||||
///
|
||||
/// or recover all existing flow tasks if in standalone mode(nodeid is None)
|
||||
///
|
||||
/// TODO(discord9): persisent flow tasks with internal state
|
||||
async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> {
|
||||
let nodeid = self.opts.node_id;
|
||||
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
|
||||
let to_be_recover = self
|
||||
.flow_metadata_manager
|
||||
.flownode_flow_manager()
|
||||
.flows(nodeid)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.context(ListFlowsSnafu { id: Some(nodeid) })?;
|
||||
to_be_recover.into_iter().map(|(id, _)| id).collect()
|
||||
} else {
|
||||
let all_catalogs = self
|
||||
.catalog_manager
|
||||
.catalog_names()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let mut all_flow_ids = vec![];
|
||||
for catalog in all_catalogs {
|
||||
let flows = self
|
||||
.flow_metadata_manager
|
||||
.flow_name_manager()
|
||||
.flow_names(&catalog)
|
||||
.await
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
|
||||
}
|
||||
all_flow_ids
|
||||
};
|
||||
let cnt = to_be_recovered.len();
|
||||
|
||||
// TODO(discord9): recover in parallel
|
||||
for flow_id in to_be_recovered {
|
||||
let info = self
|
||||
.flow_metadata_manager
|
||||
.flow_info_manager()
|
||||
.get(flow_id)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.context(FlowNotFoundSnafu { id: flow_id })?;
|
||||
|
||||
let sink_table_name = [
|
||||
info.sink_table_name().catalog_name.clone(),
|
||||
info.sink_table_name().schema_name.clone(),
|
||||
info.sink_table_name().table_name.clone(),
|
||||
];
|
||||
manager
|
||||
.create_flow(
|
||||
flow_id as _,
|
||||
sink_table_name,
|
||||
info.source_table_ids(),
|
||||
true,
|
||||
info.expire_after(),
|
||||
Some(info.comment().clone()),
|
||||
info.raw_sql().clone(),
|
||||
info.options().clone(),
|
||||
Some(
|
||||
QueryContextBuilder::default()
|
||||
.current_catalog(info.catalog_name().clone())
|
||||
.build(),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(cnt)
|
||||
}
|
||||
|
||||
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
|
||||
/// nor does it actually start running the worker.
|
||||
async fn build_manager(
|
||||
|
||||
@@ -156,6 +156,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
flow_metadata_manager.clone(),
|
||||
);
|
||||
let flownode = Arc::new(flow_builder.build().await.unwrap());
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ select flush_flow('test_numbers_basic')<=1;
|
||||
| true |
|
||||
+----------------------------------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
INSERT INTO numbers_input_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
|
||||
@@ -14,6 +14,7 @@ SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '202
|
||||
-- because flush_flow result is at most 1
|
||||
select flush_flow('test_numbers_basic')<=1;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
INSERT INTO numbers_input_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
|
||||
Reference in New Issue
Block a user