feat: flownode use Inserter to write to database (#4323)

* feat: use `Inserter` as Frontend

* fix: enable procedure in flownode

* docs: remove `frontend_addr` opts

* chore: rm fe addr in test runner

* refactor: int test also use inserter invoker

* feat: flow shutdown&refactor: remove `Frontendinvoker`

* refactor: rename `RemoteFrontendInvoker` to `FrontendInvoker`

* refactor: per review

* refactor: remove a layer of  box

* fix: standalone use `node_manager`

* fix: remove a `Arc` cycle
This commit is contained in:
discord9
2024-07-09 18:44:22 +08:00
committed by GitHub
parent 185953e586
commit 1ddf19d886
17 changed files with 287 additions and 280 deletions

3
Cargo.lock generated
View File

@@ -3763,6 +3763,7 @@ dependencies = [
"async-recursion",
"async-trait",
"bytes",
"cache",
"catalog",
"client",
"common-base",
@@ -3796,6 +3797,8 @@ dependencies = [
"minstant",
"nom",
"num-traits",
"operator",
"partition",
"pretty_assertions",
"prost 0.12.6",
"query",

View File

@@ -454,7 +454,6 @@
| --- | -----| ------- | ----------- |
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | `None` | The flownode identifier and should be unique in the cluster. |
| `frontend_addr` | String | `http://127.0.0.1:4001` | Frontend grpc address. Used by flownode to write result back to frontend. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |

View File

@@ -5,9 +5,6 @@ mode = "distributed"
## +toml2docs:none-default
node_id = 14
## Frontend grpc address. Used by flownode to write result back to frontend.
frontend_addr = "http://127.0.0.1:4001"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.

View File

@@ -17,8 +17,10 @@ use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -26,17 +28,16 @@ use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tonic::transport::Endpoint;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, TonicTransportSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
@@ -132,10 +133,6 @@ struct StartCommand {
/// Metasrv address list;
#[clap(long, value_delimiter = ',', num_args = 1..)]
metasrv_addrs: Option<Vec<String>>,
/// The gRPC address of the frontend server used for writing results back to the database.
/// Must include the scheme prefix, e.g. "http://".
#[clap(long)]
frontend_addr: Option<String>,
/// The configuration file for flownode
#[clap(short, long)]
config_file: Option<String>,
@@ -186,10 +183,6 @@ impl StartCommand {
opts.grpc.hostname.clone_from(hostname);
}
if let Some(fe_addr) = &self.frontend_addr {
opts.frontend_addr = Some(fe_addr.clone());
}
if let Some(node_id) = self.node_id {
opts.node_id = Some(node_id);
}
@@ -228,10 +221,6 @@ impl StartCommand {
let opts = opts.component;
let frontend_addr = opts.frontend_addr.clone().context(MissingConfigSnafu {
msg: "'frontend_addr'",
})?;
// TODO(discord9): make it non-optional once cluster id is required
let cluster_id = opts.cluster_id.unwrap_or(0);
@@ -286,7 +275,8 @@ impl StartCommand {
layered_cache_registry.clone(),
);
let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend));
let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
table_metadata_manager
.init()
.await
@@ -310,26 +300,33 @@ impl StartCommand {
opts,
Plugins::new(),
table_metadata_manager,
catalog_manager,
catalog_manager.clone(),
)
.with_heartbeat_task(heartbeat_task);
let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
// set up the lazy connection to the frontend server
// TODO(discord9): consider move this to start() or pre_start()?
let endpoint =
Endpoint::from_shared(frontend_addr.clone()).context(TonicTransportSnafu {
msg: Some(format!("Fail to create from addr={}", frontend_addr)),
})?;
let chnl = endpoint.connect().await.context(TonicTransportSnafu {
msg: Some("Fail to connect to frontend".to_string()),
})?;
info!("Connected to frontend server: {:?}", frontend_addr);
let client = flow::FrontendClient::new(chnl);
// The flownode's frontend-to-datanode channel does not need a timeout:
// some queries are expected to take a long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));
let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(),
catalog_manager.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
meta_client.clone(),
client,
)
.await
.context(StartFlownodeSnafu)?;
flownode
.flow_worker_manager()
.set_frontend_invoker(Box::new(client))
.set_frontend_invoker(invoker)
.await;
Ok(Instance::new(flownode, guard))

View File

@@ -44,7 +44,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use flow::{FlowWorkerManager, FlownodeBuilder, FrontendInvoker};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -61,13 +61,15 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::broadcast;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
@@ -214,6 +216,9 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
// TODO(discord9): wrap it in a flownode instance instead
flow_worker_manager: Arc<FlowWorkerManager>,
flow_shutdown: broadcast::Sender<()>,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
@@ -245,6 +250,9 @@ impl App for Instance {
.context(StartFrontendSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)?;
self.flow_worker_manager
.clone()
.run_background(Some(self.flow_shutdown.subscribe()));
Ok(())
}
@@ -263,6 +271,15 @@ impl App for Instance {
.shutdown()
.await
.context(ShutdownDatanodeSnafu)?;
self.flow_shutdown
.send(())
.map_err(|_e| {
flow::error::InternalSnafu {
reason: "Failed to send shutdown signal to flow worker manager, all receiver end already closed".to_string(),
}
.build()
})
.context(ShutdownFlownodeSnafu)?;
info!("Datanode instance stopped.");
Ok(())
@@ -447,6 +464,12 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let flow_builder = FlownodeBuilder::new(
Default::default(),
fe_plugins.clone(),
@@ -461,12 +484,6 @@ impl StartCommand {
.context(OtherSnafu)?,
);
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
@@ -510,24 +527,32 @@ impl StartCommand {
let mut frontend = FrontendBuilder::new(
fe_opts.clone(),
kv_backend,
layered_cache_registry,
catalog_manager,
node_manager,
ddl_task_executor,
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
// flow server need to be able to use frontend to write insert requests back
let flow_worker_manager = flownode.flow_worker_manager();
flow_worker_manager
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
// TODO(discord9): unify with adding `start` and `shutdown` method to flownode too.
let _handle = flow_worker_manager.run_background();
// flow server need to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(),
catalog_manager.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
ddl_task_executor.clone(),
node_manager,
)
.await
.context(StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;
let (tx, _rx) = broadcast::channel(1);
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
.build()
@@ -540,6 +565,8 @@ impl StartCommand {
Ok(Instance {
datanode,
frontend,
flow_worker_manager,
flow_shutdown: tx,
procedure_manager,
wal_options_allocator,
_guard: guard,

View File

@@ -1,38 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
use crate::error::Result;
/// [FrontendInvoker] provides the ability to:
/// - Insert rows
/// - Delete rows
///
/// Both operations are asynchronous, take a [`QueryContextRef`] alongside the
/// request batch, and report their result as an [`Output`].
#[async_trait]
pub trait FrontendInvoker {
/// Inserts the rows described by `requests`, using `ctx` as the query context.
///
/// Returns the resulting [`Output`], or this crate's error type on failure.
async fn row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output>;
/// Deletes the rows described by `requests`, using `ctx` as the query context.
///
/// Returns the resulting [`Output`], or this crate's error type on failure.
async fn row_deletes(
&self,
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> Result<Output>;
}

View File

@@ -13,4 +13,3 @@
// limitations under the License.
pub mod error;
pub mod handler;

View File

@@ -13,6 +13,7 @@ arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
client.workspace = true
common-base.workspace = true
@@ -47,6 +48,8 @@ meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
operator.workspace = true
partition.workspace = true
prost.workspace = true
query.workspace = true
serde.workspace = true

View File

@@ -23,7 +23,6 @@ use std::time::{Instant, SystemTime};
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -42,7 +41,8 @@ use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{watch, Mutex, RwLock};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
@@ -65,6 +65,7 @@ pub(crate) mod node_context;
mod table_source;
use crate::error::Error;
use crate::FrontendInvoker;
// TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
@@ -84,7 +85,6 @@ pub struct FlownodeOptions {
pub cluster_id: Option<u64>,
pub node_id: Option<u64>,
pub grpc: GrpcOptions,
pub frontend_addr: Option<String>,
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
@@ -98,7 +98,6 @@ impl Default for FlownodeOptions {
cluster_id: None,
node_id: None,
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
frontend_addr: None,
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
@@ -120,10 +119,10 @@ pub struct FlowWorkerManager {
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
query_engine: Arc<dyn QueryEngine>,
pub query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableSource,
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
frontend_invoker: RwLock<Option<FrontendInvoker>>,
/// contains mapping from table name to global id, and table schema
node_context: RwLock<FlownodeContext>,
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
@@ -135,7 +134,7 @@ pub struct FlowWorkerManager {
/// Building FlownodeManager
impl FlowWorkerManager {
/// set frontend invoker
pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) {
*self.frontend_invoker.write().await = Some(frontend);
}
@@ -460,11 +459,14 @@ impl FlowWorkerManager {
/// Flow Runtime related methods
impl FlowWorkerManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
pub fn run_background(
self: Arc<Self>,
shutdown: Option<broadcast::Receiver<()>>,
) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
// TODO(discord9): add heartbeat tasks here
common_runtime::spawn_bg(async move {
self.run().await;
self.run(shutdown).await;
})
}
@@ -485,7 +487,7 @@ impl FlowWorkerManager {
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// Note that this method doesn't handle input mirror requests, as those should be handled by the gRPC server
pub async fn run(&self) {
pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
debug!("Starting to run");
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
@@ -497,8 +499,28 @@ impl FlowWorkerManager {
common_telemetry::error!(err;"Send writeback request errors");
};
self.log_all_errors().await;
match &shutdown.as_mut().map(|s| s.try_recv()) {
Some(Ok(())) => {
info!("Shutdown flow's main loop");
break;
}
Some(Err(TryRecvError::Empty)) => (),
Some(Err(TryRecvError::Closed)) => {
common_telemetry::error!("Shutdown channel is closed");
break;
}
Some(Err(TryRecvError::Lagged(num))) => {
common_telemetry::error!("Shutdown channel is lagged by {}, meaning multiple shutdown cmd have been issued", num);
break;
}
None => (),
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
// The flow is now shut down; drop `frontend_invoker` early to prevent a reference cycle (in standalone mode):
// FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
// -> Inserter.node_manager -> NodeManager.flownode -> Flownode.flow_worker_manager.frontend_invoker
self.frontend_invoker.write().await.take();
}
/// Run all available subgraph in the flow node

View File

@@ -190,6 +190,13 @@ pub enum Error {
#[snafu(source)]
error: std::net::AddrParseError,
},
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
}
/// Result type for flow module
@@ -216,7 +223,7 @@ impl ErrorExt for Error {
StatusCode::Unsupported
}
Self::External { source, .. } => source.status_code(),
Self::Internal { .. } => StatusCode::Internal,
Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
source.status_code()
}

View File

@@ -1,109 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend Client for flownode, used for writing result back to database
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::{
GreptimeRequest, GreptimeResponse, RequestHeader, RowDeleteRequests, RowInsertRequests,
};
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_query::Output;
use common_telemetry::tracing_context::{TracingContext, W3cTrace};
use session::context::{QueryContext, QueryContextRef};
use snafu::IntoError;
use tokio::sync::Mutex;
use crate::{Error, Result};
/// Frontend client for writing results back to the database.
///
/// Wraps a [`GreptimeDatabaseClient`] built on a `tonic` transport channel.
pub struct FrontendClient {
// Underlying gRPC client; the `FrontendInvoker` impl clones it for every request it sends.
client: GreptimeDatabaseClient<tonic::transport::Channel>,
}
impl FrontendClient {
    /// Creates a client that issues its requests over the given `channel`.
    pub fn new(channel: tonic::transport::Channel) -> Self {
        let client = GreptimeDatabaseClient::new(channel);
        Self { client }
    }
}
/// Wraps `request` in a [`GreptimeRequest`] whose header is filled in from `ctx`.
///
/// The catalog, schema and timezone are copied from `ctx`; the tracing context is
/// taken from the current span. No authorization is attached.
fn to_rpc_request(request: Request, ctx: &QueryContext) -> GreptimeRequest {
    let catalog = ctx.current_catalog().to_string();
    let schema = ctx.current_schema().to_string();
    let timezone = ctx.timezone().to_string();
    let tracing_context = TracingContext::from_current_span().to_w3c();

    GreptimeRequest {
        header: Some(RequestHeader {
            catalog,
            schema,
            authorization: None,
            // dbname is left empty so that the header uses catalog+schema to determine
            // the database; see `create_query_context` in `greptime_handler.rs`
            dbname: String::new(),
            timezone,
            tracing_context,
        }),
        request: Some(request),
    }
}
fn from_rpc_error(e: tonic::Status) -> common_frontend::error::Error {
common_frontend::error::ExternalSnafu {}
.into_error(BoxedError::new(client::error::Error::from(e)))
}
/// Turns a [`GreptimeResponse`] into an [`Output`] carrying the affected-row count.
///
/// A response without a payload counts as zero affected rows.
fn resp_to_output(resp: GreptimeResponse) -> Output {
    let affected_rows = match resp.response {
        Some(api::v1::greptime_response::Response::AffectedRows(rows)) => rows.value,
        None => 0,
    };
    Output::new_with_affected_rows(affected_rows as usize)
}
#[async_trait::async_trait]
impl FrontendInvoker for FrontendClient {
async fn row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let req = to_rpc_request(Request::RowInserts(requests), &ctx);
let resp = self
.client
.clone()
.handle(req)
.await
.map_err(from_rpc_error)?;
Ok(resp_to_output(resp.into_inner()))
}
async fn row_deletes(
&self,
requests: RowDeleteRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
let req = to_rpc_request(Request::RowDeletes(requests), &ctx);
let resp = self
.client
.clone()
.handle(req)
.await
.map_err(from_rpc_error)?;
Ok(resp_to_output(resp.into_inner()))
}
}

View File

@@ -25,9 +25,8 @@
// allow unused for now because it should be use later
mod adapter;
mod compute;
mod error;
pub mod error;
mod expr;
mod fe_client;
pub mod heartbeat;
mod plan;
mod repr;
@@ -37,5 +36,4 @@ mod utils;
pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use fe_client::FrontendClient;
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer};
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};

View File

@@ -17,37 +17,53 @@
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_meta::ddl::table_meta;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{
LayeredCacheRegistry, LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef,
};
use common_meta::ddl::{table_meta, ProcedureExecutorRef};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::Flownode;
use common_meta::node_manager::{self, Flownode, NodeManagerRef};
use common_query::Output;
use common_telemetry::tracing::info;
use futures::FutureExt;
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use meta_client::client::MetaClient;
use query::QueryEngineFactory;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use serde::de::Unexpected;
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::Server;
use snafu::{ensure, ResultExt};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{oneshot, Mutex};
use tokio::sync::{broadcast, oneshot, Mutex};
use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlowWorkerManagerRef;
use crate::error::{ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu};
use crate::error::{
CacheRequiredSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::transform::register_function_to_query_engine;
use crate::{Error, FlowWorkerManager, FlownodeOptions};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)]
pub struct FlowService {
@@ -105,7 +121,7 @@ impl flow_server::Flow for FlowService {
}
pub struct FlownodeServer {
shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
shutdown_tx: Mutex<Option<broadcast::Sender<()>>>,
flow_service: FlowService,
}
@@ -142,7 +158,8 @@ impl servers::server::Server for FlownodeServer {
Ok(())
}
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
let (tx, rx) = oneshot::channel::<()>();
let (tx, rx) = broadcast::channel::<()>(1);
let mut rx_server = tx.subscribe();
let (incoming, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
@@ -163,15 +180,16 @@ impl servers::server::Server for FlownodeServer {
};
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
let _handle = common_runtime::spawn_bg(async move {
let _result = builder
.serve_with_incoming_shutdown(incoming, rx.map(drop))
.serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop))
.await
.context(StartGrpcSnafu);
});
let manager_ref = self.flow_service.manager.clone();
let _handle = manager_ref.clone().run_background();
let _handle = manager_ref.clone().run_background(Some(rx));
Ok(addr)
}
@@ -250,7 +268,20 @@ impl FlownodeBuilder {
}
pub async fn build(self) -> Result<FlownodeInstance, Error> {
let manager = Arc::new(self.build_manager().await?);
// TODO(discord9): does this query engine need those?
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode is only used for translate plan with resolved table source.
self.catalog_manager.clone(),
None,
None,
None,
false,
Default::default(),
);
let manager = Arc::new(
self.build_manager(query_engine_factory.query_engine())
.await?,
);
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let heartbeat_task = self.heartbeat_task;
@@ -266,21 +297,12 @@ impl FlownodeBuilder {
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
/// nor does it actually start running the worker.
async fn build_manager(&self) -> Result<FlowWorkerManager, Error> {
let catalog_manager = self.catalog_manager.clone();
async fn build_manager(
&self,
query_engine: Arc<dyn QueryEngine>,
) -> Result<FlowWorkerManager, Error> {
let table_meta = self.table_meta.clone();
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode is only used for translate plan with resolved table source.
catalog_manager,
None,
None,
None,
false,
self.plugins.clone(),
);
let query_engine = query_engine_factory.query_engine();
register_function_to_query_engine(&query_engine);
let (tx, rx) = oneshot::channel();
@@ -303,3 +325,101 @@ impl FlownodeBuilder {
Ok(man)
}
}
/// Bundles the operator components that `row_inserts` and `row_deletes` delegate to:
/// an [`Inserter`], a [`Deleter`] and a [`StatementExecutor`].
pub struct FrontendInvoker {
/// Handles row insert requests.
inserter: Arc<Inserter>,
/// Handles row delete requests.
deleter: Arc<Deleter>,
/// Passed to the inserter when handling row inserts.
statement_executor: Arc<StatementExecutor>,
}
impl FrontendInvoker {
    /// Creates an invoker from already-built operator components.
    pub fn new(
        inserter: Arc<Inserter>,
        deleter: Arc<Deleter>,
        statement_executor: Arc<StatementExecutor>,
    ) -> Self {
        Self {
            inserter,
            deleter,
            statement_executor,
        }
    }

    /// Builds the [`Inserter`], [`Deleter`] and [`StatementExecutor`] from the given
    /// managers and caches, and wraps them in a [`FrontendInvoker`].
    ///
    /// The statement executor uses the query engine of `flow_worker_manager`.
    ///
    /// # Errors
    ///
    /// Returns a `CacheRequired` error when the table route cache or the table
    /// flownode set cache cannot be obtained from `layered_cache_registry`.
    pub async fn build_from(
        flow_worker_manager: FlowWorkerManagerRef,
        catalog_manager: CatalogManagerRef,
        kv_backend: KvBackendRef,
        layered_cache_registry: LayeredCacheRegistryRef,
        procedure_executor: ProcedureExecutorRef,
        node_manager: NodeManagerRef,
    ) -> Result<FrontendInvoker, Error> {
        let table_route_cache: TableRouteCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_ROUTE_CACHE_NAME,
            })?;
        let partition_manager = Arc::new(PartitionRuleManager::new(
            kv_backend.clone(),
            table_route_cache.clone(),
        ));
        let table_flownode_cache: TableFlownodeSetCacheRef =
            layered_cache_registry.get().context(CacheRequiredSnafu {
                name: TABLE_FLOWNODE_SET_CACHE_NAME,
            })?;

        // The inserter and deleter share the catalog, partition and node managers.
        let inserter = Arc::new(Inserter::new(
            catalog_manager.clone(),
            partition_manager.clone(),
            node_manager.clone(),
            table_flownode_cache,
        ));
        // Last use of `partition_manager` and `node_manager`: move them instead of cloning.
        let deleter = Arc::new(Deleter::new(
            catalog_manager.clone(),
            partition_manager,
            node_manager,
        ));

        let query_engine = flow_worker_manager.query_engine.clone();
        // Every remaining owned handle is consumed here, so none of them is cloned;
        // only `inserter` is, because the invoker keeps its own handle to it.
        let statement_executor = Arc::new(StatementExecutor::new(
            catalog_manager,
            query_engine,
            procedure_executor,
            kv_backend,
            layered_cache_registry,
            inserter.clone(),
            table_route_cache,
        ));

        Ok(FrontendInvoker::new(inserter, deleter, statement_executor))
    }
}
impl FrontendInvoker {
    /// Hands `requests` to the inserter together with the statement executor.
    ///
    /// Any failure is boxed and reported as the frontend's `External` error.
    pub async fn row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let outcome = self
            .inserter
            .handle_row_inserts(requests, ctx, &self.statement_executor)
            .await;
        outcome
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

    /// Hands `requests` to the deleter.
    ///
    /// Any failure is boxed and reported as the frontend's `External` error.
    pub async fn row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let outcome = self.deleter.handle_row_deletes(requests, ctx).await;
        outcome
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }
}

View File

@@ -25,7 +25,6 @@ pub mod standalone;
use std::sync::Arc;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
@@ -33,7 +32,6 @@ use client::OutputData;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_frontend::handler::FrontendInvoker;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
@@ -194,33 +192,6 @@ impl Instance {
}
}
#[async_trait]
impl FrontendInvoker for Instance {
    /// Hands `requests` to the instance's inserter together with its statement executor.
    ///
    /// Any failure is boxed and reported as the frontend's `External` error.
    async fn row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let outcome = self
            .inserter
            .handle_row_inserts(requests, ctx, &self.statement_executor)
            .await;
        outcome
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }

    /// Hands `requests` to the instance's deleter.
    ///
    /// Any failure is boxed and reported as the frontend's `External` error.
    async fn row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let outcome = self.deleter.handle_row_deletes(requests, ctx).await;
        outcome
            .map_err(BoxedError::new)
            .context(common_frontend::error::ExternalSnafu)
    }
}
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&self) -> Result<()> {

View File

@@ -105,6 +105,7 @@ impl MetaClientBuilder {
Self::new(cluster_id, member_id, Role::Flownode)
.enable_store()
.enable_heartbeat()
.enable_procedure()
}
pub fn enable_heartbeat(self) -> Self {

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::KvBackendCatalogManager;
use cmd::error::StartFlownodeSnafu;
use cmd::standalone::StandaloneOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
@@ -40,6 +41,7 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::Mode;
use snafu::ResultExt;
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -207,21 +209,30 @@ impl GreptimeDbStandaloneBuilder {
let instance = FrontendBuilder::new(
opts.frontend_options(),
kv_backend.clone(),
cache_registry,
catalog_manager,
node_manager,
ddl_task_executor,
cache_registry.clone(),
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
)
.with_plugin(plugins)
.try_build()
.await
.unwrap();
let flow_manager = flownode.flow_worker_manager();
flow_manager
.set_frontend_invoker(Box::new(instance.clone()))
.await;
let _node_handle = flow_manager.run_background();
let flow_worker_manager = flownode.flow_worker_manager();
let invoker = flow::FrontendInvoker::build_from(
flow_worker_manager.clone(),
catalog_manager.clone(),
kv_backend.clone(),
cache_registry.clone(),
ddl_task_executor.clone(),
node_manager.clone(),
)
.await
.context(StartFlownodeSnafu)
.unwrap();
flow_worker_manager.set_frontend_invoker(invoker).await;
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();

View File

@@ -327,7 +327,6 @@ impl Env {
args.push(format!("--rpc-addr=127.0.0.1:680{id}"));
args.push(format!("--node-id={id}"));
args.push("--metasrv-addrs=127.0.0.1:3002".to_string());
args.push("--frontend-addr=http://127.0.0.1:4001".to_string());
(args, format!("127.0.0.1:680{id}"))
}