diff --git a/Cargo.lock b/Cargo.lock
index c1613f97f3..64c95c3cc0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1760,6 +1760,7 @@ dependencies = [
"tikv-jemallocator",
"tokio",
"toml 0.8.14",
+ "tonic 0.11.0",
"tracing-appender",
]
@@ -3756,6 +3757,7 @@ dependencies = [
"async-trait",
"bytes",
"catalog",
+ "client",
"common-base",
"common-catalog",
"common-config",
diff --git a/config/config-docs-template.md b/config/config-docs-template.md
index b70c20184d..81020133f8 100644
--- a/config/config-docs-template.md
+++ b/config/config-docs-template.md
@@ -1,10 +1,12 @@
# Configurations
-- [Standalone Mode](#standalone-mode)
-- [Distributed Mode](#distributed-mode)
+- [Configurations](#configurations)
+ - [Standalone Mode](#standalone-mode)
+ - [Distributed Mode](#distributed-mode)
- [Frontend](#frontend)
- [Metasrv](#metasrv)
- [Datanode](#datanode)
+ - [Flownode](#flownode)
## Standalone Mode
@@ -23,3 +25,7 @@
### Datanode
{{ toml2docs "./datanode.example.toml" }}
+
+### Flownode
+
+{{ toml2docs "./flownode.example.toml"}}
\ No newline at end of file
diff --git a/config/config.md b/config/config.md
index 32f34304c6..132fc7aff8 100644
--- a/config/config.md
+++ b/config/config.md
@@ -1,10 +1,12 @@
# Configurations
-- [Standalone Mode](#standalone-mode)
-- [Distributed Mode](#distributed-mode)
+- [Configurations](#configurations)
+ - [Standalone Mode](#standalone-mode)
+ - [Distributed Mode](#distributed-mode)
- [Frontend](#frontend)
- [Metasrv](#metasrv)
- [Datanode](#datanode)
+ - [Flownode](#flownode)
## Standalone Mode
@@ -434,3 +436,41 @@
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
+
+
+### Flownode
+
+| Key | Type | Default | Descriptions |
+| --- | -----| ------- | ----------- |
+| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
+| `node_id` | Integer | `None` | The flownode identifier and should be unique in the cluster. |
+| `frontend_addr` | String | `http://127.0.0.1:4001` | Frontend grpc address. Used by flownode to write result back to frontend. |
+| `grpc` | -- | -- | The gRPC server options. |
+| `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
+| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,
and used for connections from outside the host |
+| `grpc.runtime_size` | Integer | `2` | The number of server worker threads. |
+| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
+| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
+| `meta_client` | -- | -- | The metasrv client options. |
+| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
+| `meta_client.timeout` | String | `3s` | Operation timeout. |
+| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
+| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
+| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
+| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
+| `meta_client.metadata_cache_max_capacity` | Integer | `100000` | The configuration about the cache of the metadata. |
+| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
+| `meta_client.metadata_cache_tti` | String | `5m` | -- |
+| `heartbeat` | -- | -- | The heartbeat options. |
+| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
+| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
+| `logging` | -- | -- | The logging options. |
+| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
+| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
+| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
+| `logging.otlp_endpoint` | String | `None` | The OTLP tracing endpoint. |
+| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
+| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
+| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
+| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
+| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
new file mode 100644
index 0000000000..69bcd94a22
--- /dev/null
+++ b/config/flownode.example.toml
@@ -0,0 +1,93 @@
+## The running mode of the flownode. It can be `standalone` or `distributed`.
+mode = "distributed"
+
+## The flownode identifier and should be unique in the cluster.
+## +toml2docs:none-default
+node_id = 14
+
+## Frontend grpc address. Used by flownode to write result back to frontend.
+frontend_addr = "http://127.0.0.1:4001"
+
+## The gRPC server options.
+[grpc]
+## The address to bind the gRPC server.
+addr = "127.0.0.1:6800"
+## The hostname advertised to the metasrv,
+## and used for connections from outside the host
+hostname = "127.0.0.1"
+## The number of server worker threads.
+runtime_size = 2
+## The maximum receive message size for gRPC server.
+max_recv_message_size = "512MB"
+## The maximum send message size for gRPC server.
+max_send_message_size = "512MB"
+
+
+## The metasrv client options.
+[meta_client]
+## The addresses of the metasrv.
+metasrv_addrs = ["127.0.0.1:3002"]
+
+## Operation timeout.
+timeout = "3s"
+
+## Heartbeat timeout.
+heartbeat_timeout = "500ms"
+
+## DDL timeout.
+ddl_timeout = "10s"
+
+## Connect server timeout.
+connect_timeout = "1s"
+
+## `TCP_NODELAY` option for accepted connections.
+tcp_nodelay = true
+
+## The configuration about the cache of the metadata.
+metadata_cache_max_capacity = 100000
+
+## TTL of the metadata cache.
+metadata_cache_ttl = "10m"
+
+# TTI of the metadata cache.
+metadata_cache_tti = "5m"
+
+## The heartbeat options.
+[heartbeat]
+## Interval for sending heartbeat messages to the metasrv.
+interval = "3s"
+
+## Interval for retrying to send heartbeat messages to the metasrv.
+retry_interval = "3s"
+
+## The logging options.
+[logging]
+## The directory to store the log files.
+dir = "/tmp/greptimedb/logs"
+
+## The log level. Can be `info`/`debug`/`warn`/`error`.
+## +toml2docs:none-default
+level = "info"
+
+## Enable OTLP tracing.
+enable_otlp_tracing = false
+
+## The OTLP tracing endpoint.
+## +toml2docs:none-default
+otlp_endpoint = ""
+
+## Whether to append logs to stdout.
+append_stdout = true
+
+## The percentage of tracing will be sampled and exported.
+## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
+## ratio > 1 are treated as 1. Fractions < 0 are treated as 0
+[logging.tracing_sample_ratio]
+default_ratio = 1.0
+
+## The tracing options. Only effect when compiled with `tokio-console` feature.
+[tracing]
+## The tokio console address.
+## +toml2docs:none-default
+tokio_console_addr = "127.0.0.1"
+
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 9bc3d77564..6a0ef4a2f2 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -74,6 +74,7 @@ substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
+tonic.workspace = true
tracing-appender = "0.2"
[target.'cfg(not(windows))'.dependencies]
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 1bc3d0b8eb..2e4e252cc4 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -346,6 +346,15 @@ pub enum Error {
location: Location,
source: meta_client::error::Error,
},
+
+ #[snafu(display("Tonic transport error: {error:?} with msg: {msg:?}"))]
+ TonicTransport {
+ #[snafu(implicit)]
+ location: Location,
+ #[snafu(source)]
+ error: tonic::transport::Error,
+ msg: Option,
+ },
}
pub type Result = std::result::Result;
@@ -405,6 +414,7 @@ impl ErrorExt for Error {
source.status_code()
}
Error::MetaClientInit { source, .. } => source.status_code(),
+ Error::TonicTransport { .. } => StatusCode::Internal,
}
}
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index c0de61564a..8ad90b2f77 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -31,11 +31,12 @@ use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHa
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
+use tonic::transport::Endpoint;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
- MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
+ MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, TonicTransportSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
@@ -119,16 +120,26 @@ impl SubCommand {
#[derive(Debug, Parser, Default)]
struct StartCommand {
+ /// Flownode's id
#[clap(long)]
node_id: Option,
+ /// Bind address for the gRPC server.
#[clap(long)]
rpc_addr: Option,
+ /// Hostname for the gRPC server.
#[clap(long)]
rpc_hostname: Option,
+ /// Metasrv address list;
#[clap(long, value_delimiter = ',', num_args = 1..)]
metasrv_addrs: Option>,
+ /// The gprc address of the frontend server used for writing results back to the database.
+ /// Need prefix i.e. "http://"
+ #[clap(long)]
+ frontend_addr: Option,
+ /// The configuration file for flownode
#[clap(short, long)]
config_file: Option,
+ /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
#[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
env_prefix: String,
}
@@ -175,6 +186,10 @@ impl StartCommand {
opts.grpc.hostname.clone_from(hostname);
}
+ if let Some(fe_addr) = &self.frontend_addr {
+ opts.frontend_addr = Some(fe_addr.clone());
+ }
+
if let Some(node_id) = self.node_id {
opts.node_id = Some(node_id);
}
@@ -213,10 +228,13 @@ impl StartCommand {
let opts = opts.component;
- let cluster_id = opts.cluster_id.context(MissingConfigSnafu {
- msg: "'cluster_id'",
+ let frontend_addr = opts.frontend_addr.clone().context(MissingConfigSnafu {
+ msg: "'frontend_addr'",
})?;
+ // TODO(discord9): make it not optionale after cluster id is required
+ let cluster_id = opts.cluster_id.unwrap_or(0);
+
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -298,6 +316,22 @@ impl StartCommand {
let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
+ // set up the lazy connection to the frontend server
+ // TODO(discord9): consider move this to start() or pre_start()?
+ let endpoint =
+ Endpoint::from_shared(frontend_addr.clone()).context(TonicTransportSnafu {
+ msg: Some(format!("Fail to create from addr={}", frontend_addr)),
+ })?;
+ let chnl = endpoint.connect().await.context(TonicTransportSnafu {
+ msg: Some("Fail to connect to frontend".to_string()),
+ })?;
+ info!("Connected to frontend server: {:?}", frontend_addr);
+ let client = flow::FrontendClient::new(chnl);
+ flownode
+ .flow_worker_manager()
+ .set_frontend_invoker(Box::new(client))
+ .await;
+
Ok(Instance::new(flownode, guard))
}
}
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 395f9cf075..b7aefb6204 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -14,6 +14,7 @@ async-recursion = "1.0"
async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
+client.workspace = true
common-base.workspace = true
common-config.workspace = true
common-decimal.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index c9d1ca570a..00b071f474 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -84,6 +84,7 @@ pub struct FlownodeOptions {
pub cluster_id: Option,
pub node_id: Option,
pub grpc: GrpcOptions,
+ pub frontend_addr: Option,
pub meta_client: Option,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
@@ -97,6 +98,7 @@ impl Default for FlownodeOptions {
cluster_id: None,
node_id: None,
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
+ frontend_addr: None,
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
@@ -133,10 +135,7 @@ pub struct FlowWorkerManager {
/// Building FlownodeManager
impl FlowWorkerManager {
/// set frontend invoker
- pub async fn set_frontend_invoker(
- self: &Arc,
- frontend: Box,
- ) {
+ pub async fn set_frontend_invoker(&self, frontend: Box) {
*self.frontend_invoker.write().await = Some(frontend);
}
diff --git a/src/flow/src/fe_client.rs b/src/flow/src/fe_client.rs
new file mode 100644
index 0000000000..2eac6853e8
--- /dev/null
+++ b/src/flow/src/fe_client.rs
@@ -0,0 +1,109 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Frontend Client for flownode, used for writing result back to database
+
+use api::v1::greptime_database_client::GreptimeDatabaseClient;
+use api::v1::greptime_request::Request;
+use api::v1::{
+ GreptimeRequest, GreptimeResponse, RequestHeader, RowDeleteRequests, RowInsertRequests,
+};
+use common_error::ext::BoxedError;
+use common_frontend::handler::FrontendInvoker;
+use common_query::Output;
+use common_telemetry::tracing_context::{TracingContext, W3cTrace};
+use session::context::{QueryContext, QueryContextRef};
+use snafu::IntoError;
+use tokio::sync::Mutex;
+
+use crate::{Error, Result};
+
+/// Frontend client for writing result back to database
+pub struct FrontendClient {
+ client: GreptimeDatabaseClient,
+}
+
+impl FrontendClient {
+ pub fn new(channel: tonic::transport::Channel) -> Self {
+ Self {
+ client: GreptimeDatabaseClient::new(channel),
+ }
+ }
+}
+
+fn to_rpc_request(request: Request, ctx: &QueryContext) -> GreptimeRequest {
+ let header = RequestHeader {
+ catalog: ctx.current_catalog().to_string(),
+ schema: ctx.current_schema().to_string(),
+ authorization: None,
+ // dbname is empty so that header use catalog+schema to determine the database
+ // see `create_query_context` in `greptime_handler.rs`
+ dbname: "".to_string(),
+ timezone: ctx.timezone().to_string(),
+ tracing_context: TracingContext::from_current_span().to_w3c(),
+ };
+ GreptimeRequest {
+ header: Some(header),
+ request: Some(request),
+ }
+}
+
+fn from_rpc_error(e: tonic::Status) -> common_frontend::error::Error {
+ common_frontend::error::ExternalSnafu {}
+ .into_error(BoxedError::new(client::error::Error::from(e)))
+}
+
+fn resp_to_output(resp: GreptimeResponse) -> Output {
+ let affect_rows = resp
+ .response
+ .map(|r| match r {
+ api::v1::greptime_response::Response::AffectedRows(r) => r.value,
+ })
+ .unwrap_or(0);
+
+ Output::new_with_affected_rows(affect_rows as usize)
+}
+
+#[async_trait::async_trait]
+impl FrontendInvoker for FrontendClient {
+ async fn row_inserts(
+ &self,
+ requests: RowInsertRequests,
+ ctx: QueryContextRef,
+ ) -> common_frontend::error::Result