diff --git a/Cargo.lock b/Cargo.lock index 1cb5079934..d365e95aec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3763,6 +3763,7 @@ dependencies = [ "async-recursion", "async-trait", "bytes", + "cache", "catalog", "client", "common-base", @@ -3796,6 +3797,8 @@ dependencies = [ "minstant", "nom", "num-traits", + "operator", + "partition", "pretty_assertions", "prost 0.12.6", "query", diff --git a/config/config.md b/config/config.md index e9f139d9bc..8838059787 100644 --- a/config/config.md +++ b/config/config.md @@ -454,7 +454,6 @@ | --- | -----| ------- | ----------- | | `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. | | `node_id` | Integer | `None` | The flownode identifier and should be unique in the cluster. | -| `frontend_addr` | String | `http://127.0.0.1:4001` | Frontend grpc address. Used by flownode to write result back to frontend. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. | | `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,
and used for connections from outside the host | diff --git a/config/flownode.example.toml b/config/flownode.example.toml index 69bcd94a22..0f9ddea457 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -5,9 +5,6 @@ mode = "distributed" ## +toml2docs:none-default node_id = 14 -## Frontend grpc address. Used by flownode to write result back to frontend. -frontend_addr = "http://127.0.0.1:4001" - ## The gRPC server options. [grpc] ## The address to bind the gRPC server. diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 8ad90b2f77..554c5d46c0 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -17,8 +17,10 @@ use std::sync::Arc; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; +use client::client_manager::NodeClients; use common_base::Plugins; use common_config::Configurable; +use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; @@ -26,17 +28,16 @@ use common_meta::key::TableMetadataManager; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; -use flow::{FlownodeBuilder, FlownodeInstance}; +use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker}; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use meta_client::{MetaClientOptions, MetaClientType}; use servers::Mode; use snafu::{OptionExt, ResultExt}; -use tonic::transport::Endpoint; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, - MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, TonicTransportSnafu, + MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -132,10 +133,6 @@ struct StartCommand { /// Metasrv address list; #[clap(long, value_delimiter = ',', num_args = 1..)] metasrv_addrs: Option>, - /// The gprc address of the frontend server used for writing results back to the database. - /// Need prefix i.e. "http://" - #[clap(long)] - frontend_addr: Option, /// The configuration file for flownode #[clap(short, long)] config_file: Option, @@ -186,10 +183,6 @@ impl StartCommand { opts.grpc.hostname.clone_from(hostname); } - if let Some(fe_addr) = &self.frontend_addr { - opts.frontend_addr = Some(fe_addr.clone()); - } - if let Some(node_id) = self.node_id { opts.node_id = Some(node_id); } @@ -228,10 +221,6 @@ impl StartCommand { let opts = opts.component; - let frontend_addr = opts.frontend_addr.clone().context(MissingConfigSnafu { - msg: "'frontend_addr'", - })?; - // TODO(discord9): make it not optionale after cluster id is required let cluster_id = opts.cluster_id.unwrap_or(0); @@ -286,7 +275,8 @@ impl StartCommand { layered_cache_registry.clone(), ); - let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend)); + let table_metadata_manager = + Arc::new(TableMetadataManager::new(cached_meta_backend.clone())); table_metadata_manager .init() .await @@ -310,26 +300,33 @@ impl StartCommand { opts, Plugins::new(), table_metadata_manager, - catalog_manager, + catalog_manager.clone(), ) .with_heartbeat_task(heartbeat_task); let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; - // set up the lazy connection to the frontend server - // TODO(discord9): consider move this to start() or pre_start()? - let endpoint = - Endpoint::from_shared(frontend_addr.clone()).context(TonicTransportSnafu { - msg: Some(format!("Fail to create from addr={}", frontend_addr)), - })?; - let chnl = endpoint.connect().await.context(TonicTransportSnafu { - msg: Some("Fail to connect to frontend".to_string()), - })?; - info!("Connected to frontend server: {:?}", frontend_addr); - let client = flow::FrontendClient::new(chnl); + // flownode's frontend to datanode need not timeout. + // Some queries are expected to take long time. + let channel_config = ChannelConfig { + timeout: None, + ..Default::default() + }; + let client = Arc::new(NodeClients::new(channel_config)); + + let invoker = FrontendInvoker::build_from( + flownode.flow_worker_manager().clone(), + catalog_manager.clone(), + cached_meta_backend.clone(), + layered_cache_registry.clone(), + meta_client.clone(), + client, + ) + .await + .context(StartFlownodeSnafu)?; flownode .flow_worker_manager() - .set_frontend_invoker(Box::new(client)) + .set_frontend_invoker(invoker) .await; Ok(Instance::new(flownode, guard)) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f97643c914..bb531a79a3 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -44,7 +44,7 @@ use common_wal::config::StandaloneWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; -use flow::FlownodeBuilder; +use flow::{FlowWorkerManager, FlownodeBuilder, FrontendInvoker}; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; @@ -61,13 +61,15 @@ use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; +use tokio::sync::broadcast; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, + ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, + StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -214,6 +216,9 @@ impl StandaloneOptions { pub struct Instance { datanode: Datanode, frontend: FeInstance, + // TODO(discord9): wrapped it in flownode instance instead + flow_worker_manager: Arc, + flow_shutdown: broadcast::Sender<()>, procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, @@ -245,6 +250,9 @@ impl App for Instance { .context(StartFrontendSnafu)?; self.frontend.start().await.context(StartFrontendSnafu)?; + self.flow_worker_manager + .clone() + .run_background(Some(self.flow_shutdown.subscribe())); Ok(()) } @@ -263,6 +271,15 @@ impl App for Instance { .shutdown() .await .context(ShutdownDatanodeSnafu)?; + self.flow_shutdown + .send(()) + .map_err(|_e| { + flow::error::InternalSnafu { + reason: "Failed to send shutdown signal to flow worker manager, all receiver end already closed".to_string(), + } + .build() + }) + .context(ShutdownFlownodeSnafu)?; info!("Datanode instance stopped."); Ok(()) @@ -447,6 +464,12 @@ impl StartCommand { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; + let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) + .with_kv_backend(kv_backend.clone()) + .build() + .await + .context(StartDatanodeSnafu)?; + let flow_builder = FlownodeBuilder::new( Default::default(), fe_plugins.clone(), @@ -461,12 +484,6 @@ impl StartCommand { .context(OtherSnafu)?, ); - let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) - .with_kv_backend(kv_backend.clone()) - .build() - .await - .context(StartDatanodeSnafu)?; - let node_manager = Arc::new(StandaloneDatanodeManager { region_server: datanode.region_server(), flow_server: flownode.flow_worker_manager(), @@ -510,24 +527,32 @@ impl StartCommand { let mut frontend = FrontendBuilder::new( fe_opts.clone(), - kv_backend, - layered_cache_registry, - catalog_manager, - node_manager, - ddl_task_executor, + kv_backend.clone(), + layered_cache_registry.clone(), + catalog_manager.clone(), + node_manager.clone(), + ddl_task_executor.clone(), ) .with_plugin(fe_plugins.clone()) .try_build() .await .context(StartFrontendSnafu)?; - // flow server need to be able to use frontend to write insert requests back let flow_worker_manager = flownode.flow_worker_manager(); - flow_worker_manager - .set_frontend_invoker(Box::new(frontend.clone())) - .await; - // TODO(discord9): unify with adding `start` and `shutdown` method to flownode too. - let _handle = flow_worker_manager.run_background(); + // flow server need to be able to use frontend to write insert requests back + let invoker = FrontendInvoker::build_from( + flow_worker_manager.clone(), + catalog_manager.clone(), + kv_backend.clone(), + layered_cache_registry.clone(), + ddl_task_executor.clone(), + node_manager, + ) + .await + .context(StartFlownodeSnafu)?; + flow_worker_manager.set_frontend_invoker(invoker).await; + + let (tx, _rx) = broadcast::channel(1); let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins) .build() @@ -540,6 +565,8 @@ impl StartCommand { Ok(Instance { datanode, frontend, + flow_worker_manager, + flow_shutdown: tx, procedure_manager, wal_options_allocator, _guard: guard, diff --git a/src/common/frontend/src/handler.rs b/src/common/frontend/src/handler.rs deleted file mode 100644 index c2d8a8308e..0000000000 --- a/src/common/frontend/src/handler.rs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::{RowDeleteRequests, RowInsertRequests}; -use async_trait::async_trait; -use common_query::Output; -use session::context::QueryContextRef; - -use crate::error::Result; - -/// [FrontendInvoker] provides the ability to: -/// - Insert rows -/// - Delete rows -#[async_trait] -pub trait FrontendInvoker { - async fn row_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> Result; - - async fn row_deletes( - &self, - requests: RowDeleteRequests, - ctx: QueryContextRef, - ) -> Result; -} diff --git a/src/common/frontend/src/lib.rs b/src/common/frontend/src/lib.rs index 69bab1326b..3824f3b377 100644 --- a/src/common/frontend/src/lib.rs +++ b/src/common/frontend/src/lib.rs @@ -13,4 +13,3 @@ // limitations under the License. pub mod error; -pub mod handler; diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index b7aefb6204..4dfc70e6c4 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -13,6 +13,7 @@ arrow-schema.workspace = true async-recursion = "1.0" async-trait.workspace = true bytes.workspace = true +cache.workspace = true catalog.workspace = true client.workspace = true common-base.workspace = true @@ -47,6 +48,8 @@ meta-client.workspace = true minstant = "0.1.7" nom = "7.1.3" num-traits = "0.2" +operator.workspace = true +partition.workspace = true prost.workspace = true query.workspace = true serde.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 00b071f474..e22894c19d 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -23,7 +23,6 @@ use std::time::{Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use common_config::Configurable; use common_error::ext::BoxedError; -use common_frontend::handler::FrontendInvoker; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -42,7 +41,8 @@ use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::broadcast::error::TryRecvError; +use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; @@ -65,6 +65,7 @@ pub(crate) mod node_context; mod table_source; use crate::error::Error; +use crate::FrontendInvoker; // TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; @@ -84,7 +85,6 @@ pub struct FlownodeOptions { pub cluster_id: Option, pub node_id: Option, pub grpc: GrpcOptions, - pub frontend_addr: Option, pub meta_client: Option, pub logging: LoggingOptions, pub tracing: TracingOptions, @@ -98,7 +98,6 @@ impl Default for FlownodeOptions { cluster_id: None, node_id: None, grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"), - frontend_addr: None, meta_client: None, logging: LoggingOptions::default(), tracing: TracingOptions::default(), @@ -120,10 +119,10 @@ pub struct FlowWorkerManager { /// which is `!Send` so a handle is used pub worker_handles: Vec>, /// The query engine that will be used to parse the query and convert it to a dataflow plan - query_engine: Arc, + pub query_engine: Arc, /// Getting table name and table schema from table info manager table_info_source: TableSource, - frontend_invoker: RwLock>>, + frontend_invoker: RwLock>, /// contains mapping from table name to global id, and table schema node_context: RwLock, flow_err_collectors: RwLock>, @@ -135,7 +134,7 @@ pub struct FlowWorkerManager { /// Building FlownodeManager impl FlowWorkerManager { /// set frontend invoker - pub async fn set_frontend_invoker(&self, frontend: Box) { + pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) { *self.frontend_invoker.write().await = Some(frontend); } @@ -460,11 +459,14 @@ impl FlowWorkerManager { /// Flow Runtime related methods impl FlowWorkerManager { /// run in common_runtime background runtime - pub fn run_background(self: Arc) -> JoinHandle<()> { + pub fn run_background( + self: Arc, + shutdown: Option>, + ) -> JoinHandle<()> { info!("Starting flownode manager's background task"); // TODO(discord9): add heartbeat tasks here common_runtime::spawn_bg(async move { - self.run().await; + self.run(shutdown).await; }) } @@ -485,7 +487,7 @@ impl FlowWorkerManager { /// Trigger dataflow running, and then send writeback request to the source sender /// /// note that this method didn't handle input mirror request, as this should be handled by grpc server - pub async fn run(&self) { + pub async fn run(&self, mut shutdown: Option>) { debug!("Starting to run"); loop { // TODO(discord9): only run when new inputs arrive or scheduled to @@ -497,8 +499,28 @@ impl FlowWorkerManager { common_telemetry::error!(err;"Send writeback request errors"); }; self.log_all_errors().await; + match &shutdown.as_mut().map(|s| s.try_recv()) { + Some(Ok(())) => { + info!("Shutdown flow's main loop"); + break; + } + Some(Err(TryRecvError::Empty)) => (), + Some(Err(TryRecvError::Closed)) => { + common_telemetry::error!("Shutdown channel is closed"); + break; + } + Some(Err(TryRecvError::Lagged(num))) => { + common_telemetry::error!("Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued", num); + break; + } + None => (), + } tokio::time::sleep(std::time::Duration::from_secs(1)).await; } + // flow is now shutdown, drop frontend_invoker early so a ref cycle(in standalone mode) can be prevent: + // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter + // -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_worker_manager.frontend_invoker + self.frontend_invoker.write().await.take(); } /// Run all available subgraph in the flow node diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 6d84b6be05..7b6fd3633d 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -190,6 +190,13 @@ pub enum Error { #[snafu(source)] error: std::net::AddrParseError, }, + + #[snafu(display("Failed to get cache from cache registry: {}", name))] + CacheRequired { + #[snafu(implicit)] + location: Location, + name: String, + }, } /// Result type for flow module @@ -216,7 +223,7 @@ impl ErrorExt for Error { StatusCode::Unsupported } Self::External { source, .. } => source.status_code(), - Self::Internal { .. } => StatusCode::Internal, + Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { source.status_code() } diff --git a/src/flow/src/fe_client.rs b/src/flow/src/fe_client.rs deleted file mode 100644 index 2eac6853e8..0000000000 --- a/src/flow/src/fe_client.rs +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Frontend Client for flownode, used for writing result back to database - -use api::v1::greptime_database_client::GreptimeDatabaseClient; -use api::v1::greptime_request::Request; -use api::v1::{ - GreptimeRequest, GreptimeResponse, RequestHeader, RowDeleteRequests, RowInsertRequests, -}; -use common_error::ext::BoxedError; -use common_frontend::handler::FrontendInvoker; -use common_query::Output; -use common_telemetry::tracing_context::{TracingContext, W3cTrace}; -use session::context::{QueryContext, QueryContextRef}; -use snafu::IntoError; -use tokio::sync::Mutex; - -use crate::{Error, Result}; - -/// Frontend client for writing result back to database -pub struct FrontendClient { - client: GreptimeDatabaseClient, -} - -impl FrontendClient { - pub fn new(channel: tonic::transport::Channel) -> Self { - Self { - client: GreptimeDatabaseClient::new(channel), - } - } -} - -fn to_rpc_request(request: Request, ctx: &QueryContext) -> GreptimeRequest { - let header = RequestHeader { - catalog: ctx.current_catalog().to_string(), - schema: ctx.current_schema().to_string(), - authorization: None, - // dbname is empty so that header use catalog+schema to determine the database - // see `create_query_context` in `greptime_handler.rs` - dbname: "".to_string(), - timezone: ctx.timezone().to_string(), - tracing_context: TracingContext::from_current_span().to_w3c(), - }; - GreptimeRequest { - header: Some(header), - request: Some(request), - } -} - -fn from_rpc_error(e: tonic::Status) -> common_frontend::error::Error { - common_frontend::error::ExternalSnafu {} - .into_error(BoxedError::new(client::error::Error::from(e))) -} - -fn resp_to_output(resp: GreptimeResponse) -> Output { - let affect_rows = resp - .response - .map(|r| match r { - api::v1::greptime_response::Response::AffectedRows(r) => r.value, - }) - .unwrap_or(0); - - Output::new_with_affected_rows(affect_rows as usize) -} - -#[async_trait::async_trait] -impl FrontendInvoker for FrontendClient { - async fn row_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> common_frontend::error::Result { - let req = to_rpc_request(Request::RowInserts(requests), &ctx); - let resp = self - .client - .clone() - .handle(req) - .await - .map_err(from_rpc_error)?; - Ok(resp_to_output(resp.into_inner())) - } - - async fn row_deletes( - &self, - requests: RowDeleteRequests, - ctx: QueryContextRef, - ) -> common_frontend::error::Result { - let req = to_rpc_request(Request::RowDeletes(requests), &ctx); - let resp = self - .client - .clone() - .handle(req) - .await - .map_err(from_rpc_error)?; - Ok(resp_to_output(resp.into_inner())) - } -} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 23a401d393..d01e5ea283 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -25,9 +25,8 @@ // allow unused for now because it should be use later mod adapter; mod compute; -mod error; +pub mod error; mod expr; -mod fe_client; pub mod heartbeat; mod plan; mod repr; @@ -37,5 +36,4 @@ mod utils; pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; -pub use fe_client::FrontendClient; -pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer}; +pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index fb0f679c1f..cbe95143d6 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -17,37 +17,53 @@ use std::net::SocketAddr; use std::sync::Arc; +use api::v1::{RowDeleteRequests, RowInsertRequests}; +use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; +use client::client_manager::NodeClients; use common_base::Plugins; -use common_meta::ddl::table_meta; +use common_error::ext::BoxedError; +use common_grpc::channel_manager::ChannelConfig; +use common_meta::cache::{ + LayeredCacheRegistry, LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef, +}; +use common_meta::ddl::{table_meta, ProcedureExecutorRef}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; -use common_meta::node_manager::Flownode; +use common_meta::node_manager::{self, Flownode, NodeManagerRef}; +use common_query::Output; use common_telemetry::tracing::info; use futures::FutureExt; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use meta_client::client::MetaClient; -use query::QueryEngineFactory; +use operator::delete::Deleter; +use operator::insert::Inserter; +use operator::statement::StatementExecutor; +use partition::manager::PartitionRuleManager; +use query::{QueryEngine, QueryEngineFactory}; use serde::de::Unexpected; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::heartbeat_options::HeartbeatOptions; use servers::server::Server; -use snafu::{ensure, ResultExt}; +use session::context::QueryContextRef; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; -use tokio::sync::{oneshot, Mutex}; +use tokio::sync::{broadcast, oneshot, Mutex}; use tonic::codec::CompressionEncoding; use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; use crate::adapter::FlowWorkerManagerRef; -use crate::error::{ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu}; +use crate::error::{ + CacheRequiredSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, +}; use crate::heartbeat::HeartbeatTask; use crate::transform::register_function_to_query_engine; use crate::{Error, FlowWorkerManager, FlownodeOptions}; -pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; +pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; /// wrapping flow node manager to avoid orphan rule with Arc<...> #[derive(Clone)] pub struct FlowService { @@ -105,7 +121,7 @@ impl flow_server::Flow for FlowService { } pub struct FlownodeServer { - shutdown_tx: Mutex>>, + shutdown_tx: Mutex>>, flow_service: FlowService, } @@ -142,7 +158,8 @@ impl servers::server::Server for FlownodeServer { Ok(()) } async fn start(&self, addr: SocketAddr) -> Result { - let (tx, rx) = oneshot::channel::<()>(); + let (tx, rx) = broadcast::channel::<()>(1); + let mut rx_server = tx.subscribe(); let (incoming, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; ensure!( @@ -163,15 +180,16 @@ impl servers::server::Server for FlownodeServer { }; let builder = tonic::transport::Server::builder().add_service(self.create_flow_service()); + let _handle = common_runtime::spawn_bg(async move { let _result = builder - .serve_with_incoming_shutdown(incoming, rx.map(drop)) + .serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop)) .await .context(StartGrpcSnafu); }); let manager_ref = self.flow_service.manager.clone(); - let _handle = manager_ref.clone().run_background(); + let _handle = manager_ref.clone().run_background(Some(rx)); Ok(addr) } @@ -250,7 +268,20 @@ impl FlownodeBuilder { } pub async fn build(self) -> Result { - let manager = Arc::new(self.build_manager().await?); + // TODO(discord9): does this query engine need those? + let query_engine_factory = QueryEngineFactory::new_with_plugins( + // query engine in flownode is only used for translate plan with resolved table source. + self.catalog_manager.clone(), + None, + None, + None, + false, + Default::default(), + ); + let manager = Arc::new( + self.build_manager(query_engine_factory.query_engine()) + .await?, + ); let server = FlownodeServer::new(FlowService::new(manager.clone())); let heartbeat_task = self.heartbeat_task; @@ -266,21 +297,12 @@ impl FlownodeBuilder { /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`, /// nor does it actually start running the worker. - async fn build_manager(&self) -> Result { - let catalog_manager = self.catalog_manager.clone(); + async fn build_manager( + &self, + query_engine: Arc, + ) -> Result { let table_meta = self.table_meta.clone(); - let query_engine_factory = QueryEngineFactory::new_with_plugins( - // query engine in flownode is only used for translate plan with resolved table source. - catalog_manager, - None, - None, - None, - false, - self.plugins.clone(), - ); - let query_engine = query_engine_factory.query_engine(); - register_function_to_query_engine(&query_engine); let (tx, rx) = oneshot::channel(); @@ -303,3 +325,101 @@ impl FlownodeBuilder { Ok(man) } } + +pub struct FrontendInvoker { + inserter: Arc, + deleter: Arc, + statement_executor: Arc, +} + +impl FrontendInvoker { + pub fn new( + inserter: Arc, + deleter: Arc, + statement_executor: Arc, + ) -> Self { + Self { + inserter, + deleter, + statement_executor, + } + } + + pub async fn build_from( + flow_worker_manager: FlowWorkerManagerRef, + catalog_manager: CatalogManagerRef, + kv_backend: KvBackendRef, + layered_cache_registry: LayeredCacheRegistryRef, + procedure_executor: ProcedureExecutorRef, + node_manager: NodeManagerRef, + ) -> Result { + let table_route_cache: TableRouteCacheRef = + layered_cache_registry.get().context(CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; + + let partition_manager = Arc::new(PartitionRuleManager::new( + kv_backend.clone(), + table_route_cache.clone(), + )); + + let table_flownode_cache: TableFlownodeSetCacheRef = + layered_cache_registry.get().context(CacheRequiredSnafu { + name: TABLE_FLOWNODE_SET_CACHE_NAME, + })?; + + let inserter = Arc::new(Inserter::new( + catalog_manager.clone(), + partition_manager.clone(), + node_manager.clone(), + table_flownode_cache, + )); + + let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager.clone(), + node_manager.clone(), + )); + + let query_engine = flow_worker_manager.query_engine.clone(); + + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + procedure_executor.clone(), + kv_backend.clone(), + layered_cache_registry.clone(), + inserter.clone(), + table_route_cache, + )); + + let invoker = FrontendInvoker::new(inserter, deleter, statement_executor); + Ok(invoker) + } +} + +impl FrontendInvoker { + pub async fn row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> common_frontend::error::Result { + self.inserter + .handle_row_inserts(requests, ctx, &self.statement_executor) + .await + .map_err(BoxedError::new) + .context(common_frontend::error::ExternalSnafu) + } + + pub async fn row_deletes( + &self, + requests: RowDeleteRequests, + ctx: QueryContextRef, + ) -> common_frontend::error::Result { + self.deleter + .handle_row_deletes(requests, ctx) + .await + .map_err(BoxedError::new) + .context(common_frontend::error::ExternalSnafu) + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 6ff95daa7e..cf2d8b8e83 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -25,7 +25,6 @@ pub mod standalone; use std::sync::Arc; -use api::v1::{RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::CatalogManagerRef; @@ -33,7 +32,6 @@ use client::OutputData; use common_base::Plugins; use common_config::KvBackendConfig; use common_error::ext::{BoxedError, ErrorExt}; -use common_frontend::handler::FrontendInvoker; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; @@ -194,33 +192,6 @@ impl Instance { } } -#[async_trait] -impl FrontendInvoker for Instance { - async fn row_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> common_frontend::error::Result { - self.inserter - .handle_row_inserts(requests, ctx, &self.statement_executor) - .await - .map_err(BoxedError::new) - .context(common_frontend::error::ExternalSnafu) - } - - async fn row_deletes( - &self, - requests: RowDeleteRequests, - ctx: QueryContextRef, - ) -> common_frontend::error::Result { - self.deleter - .handle_row_deletes(requests, ctx) - .await - .map_err(BoxedError::new) - .context(common_frontend::error::ExternalSnafu) - } -} - #[async_trait] impl FrontendInstance for Instance { async fn start(&self) -> Result<()> { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 840fec70a8..7c2a6ed923 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -105,6 +105,7 @@ impl MetaClientBuilder { Self::new(cluster_id, member_id, Role::Flownode) .enable_store() .enable_heartbeat() + .enable_procedure() } pub fn enable_heartbeat(self) -> Self { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 8a05ff81be..9de4e59498 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::KvBackendCatalogManager; +use cmd::error::StartFlownodeSnafu; use cmd::standalone::StandaloneOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; @@ -40,6 +41,7 @@ use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use servers::Mode; +use snafu::ResultExt; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -207,21 +209,30 @@ impl GreptimeDbStandaloneBuilder { let instance = FrontendBuilder::new( opts.frontend_options(), kv_backend.clone(), - cache_registry, - catalog_manager, - node_manager, - ddl_task_executor, + cache_registry.clone(), + catalog_manager.clone(), + node_manager.clone(), + ddl_task_executor.clone(), ) .with_plugin(plugins) .try_build() .await .unwrap(); - let flow_manager = flownode.flow_worker_manager(); - flow_manager - .set_frontend_invoker(Box::new(instance.clone())) - .await; - let _node_handle = flow_manager.run_background(); + let flow_worker_manager = flownode.flow_worker_manager(); + let invoker = flow::FrontendInvoker::build_from( + flow_worker_manager.clone(), + catalog_manager.clone(), + kv_backend.clone(), + cache_registry.clone(), + ddl_task_executor.clone(), + node_manager.clone(), + ) + .await + .context(StartFlownodeSnafu) + .unwrap(); + + flow_worker_manager.set_frontend_invoker(invoker).await; procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap(); diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 97791c5923..b0d4c733f0 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -327,7 +327,6 @@ impl Env { args.push(format!("--rpc-addr=127.0.0.1:680{id}")); args.push(format!("--node-id={id}")); args.push("--metasrv-addrs=127.0.0.1:3002".to_string()); - args.push("--frontend-addr=http://127.0.0.1:4001".to_string()); (args, format!("127.0.0.1:680{id}")) }