feat: make flow distributed work&tests (#4256)

feat: flownode frontend client&test

feat: Frontend Client

feat: set frontend invoker for flownode

feat: set frontend invoker for flownode

chore: test script

WIP: test flow distributed

feat: hard coded demo

docs: flownode example toml

feat: add flownode support in runner

docs: comments for node

chore: after rebase

docs: add a todo

tests: move flow tests to common

fix: flownode sqlness dist test

chore: per review

docs: make

fix: make doc
discord9 authored on 2024-07-05 22:46:44 +08:00, committed by GitHub
parent 30b65ca99e, commit 09fff24ac4
27 changed files with 373 additions and 24 deletions

Cargo.lock (generated, +2 lines)
View File

@@ -1760,6 +1760,7 @@ dependencies = [
  "tikv-jemallocator",
  "tokio",
  "toml 0.8.14",
+ "tonic 0.11.0",
  "tracing-appender",
 ]

@@ -3756,6 +3757,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "catalog",
+ "client",
  "common-base",
  "common-catalog",
  "common-config",

View File

@@ -1,10 +1,12 @@
 # Configurations

-- [Standalone Mode](#standalone-mode)
-- [Distributed Mode](#distributed-mode)
+- [Configurations](#configurations)
+  - [Standalone Mode](#standalone-mode)
+  - [Distributed Mode](#distributed-mode)
 - [Frontend](#frontend)
 - [Metasrv](#metasrv)
 - [Datanode](#datanode)
+- [Flownode](#flownode)

 ## Standalone Mode

@@ -23,3 +25,7 @@
 ### Datanode

 {{ toml2docs "./datanode.example.toml" }}
+
+### Flownode
+
+{{ toml2docs "./flownode.example.toml"}}

View File

@@ -1,10 +1,12 @@
 # Configurations

-- [Standalone Mode](#standalone-mode)
-- [Distributed Mode](#distributed-mode)
+- [Configurations](#configurations)
+  - [Standalone Mode](#standalone-mode)
+  - [Distributed Mode](#distributed-mode)
 - [Frontend](#frontend)
 - [Metasrv](#metasrv)
 - [Datanode](#datanode)
+- [Flownode](#flownode)

 ## Standalone Mode

@@ -434,3 +436,41 @@
 | `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
 | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
 | `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
+
+### Flownode
+
+| Key | Type | Default | Descriptions |
+| --- | ---- | ------- | ------------ |
+| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
+| `node_id` | Integer | `None` | The flownode identifier, which should be unique in the cluster. |
+| `frontend_addr` | String | `http://127.0.0.1:4001` | Frontend gRPC address, used by the flownode to write results back to the frontend. |
+| `grpc` | -- | -- | The gRPC server options. |
+| `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
+| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host. |
+| `grpc.runtime_size` | Integer | `2` | The number of server worker threads. |
+| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for the gRPC server. |
+| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for the gRPC server. |
+| `meta_client` | -- | -- | The metasrv client options. |
+| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
+| `meta_client.timeout` | String | `3s` | Operation timeout. |
+| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
+| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
+| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
+| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
+| `meta_client.metadata_cache_max_capacity` | Integer | `100000` | The maximum capacity of the metadata cache. |
+| `meta_client.metadata_cache_ttl` | String | `10m` | TTL of the metadata cache. |
+| `meta_client.metadata_cache_tti` | String | `5m` | TTI of the metadata cache. |
+| `heartbeat` | -- | -- | The heartbeat options. |
+| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
+| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
+| `logging` | -- | -- | The logging options. |
+| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
+| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
+| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
+| `logging.otlp_endpoint` | String | `None` | The OTLP tracing endpoint. |
+| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
+| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing that will be sampled and exported.<br/>Valid range `[0, 1]`; 1 means all traces are sampled, 0 means none are, and the default is 1.<br/>Ratios > 1 are treated as 1 and ratios < 0 are treated as 0. |
+| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
+| `tracing` | -- | -- | The tracing options. Only effective when compiled with the `tokio-console` feature. |
+| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |

View File

@@ -0,0 +1,93 @@
## The running mode of the flownode. It can be `standalone` or `distributed`.
mode = "distributed"

## The flownode identifier, which should be unique in the cluster.
## +toml2docs:none-default
node_id = 14

## Frontend gRPC address, used by the flownode to write results back to the frontend.
frontend_addr = "http://127.0.0.1:4001"

## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:6800"
## The hostname advertised to the metasrv,
## and used for connections from outside the host.
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 2
## The maximum receive message size for the gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for the gRPC server.
max_send_message_size = "512MB"

## The metasrv client options.
[meta_client]
## The addresses of the metasrv.
metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"
## Connect server timeout.
connect_timeout = "1s"
## `TCP_NODELAY` option for accepted connections.
tcp_nodelay = true
## The maximum capacity of the metadata cache.
metadata_cache_max_capacity = 100000
## TTL of the metadata cache.
metadata_cache_ttl = "10m"
## TTI of the metadata cache.
metadata_cache_tti = "5m"

## The heartbeat options.
[heartbeat]
## Interval for sending heartbeat messages to the metasrv.
interval = "3s"
## Interval for retrying to send heartbeat messages to the metasrv.
retry_interval = "3s"

## The logging options.
[logging]
## The directory to store the log files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## +toml2docs:none-default
level = "info"
## Enable OTLP tracing.
enable_otlp_tracing = false
## The OTLP tracing endpoint.
## +toml2docs:none-default
otlp_endpoint = ""
## Whether to append logs to stdout.
append_stdout = true

## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`; 1 means all traces are sampled, 0 means none are, and the default is 1.
## Ratios > 1 are treated as 1 and ratios < 0 are treated as 0.
[logging.tracing_sample_ratio]
default_ratio = 1.0

## The tracing options. Only effective when compiled with the `tokio-console` feature.
[tracing]
## The tokio console address.
## +toml2docs:none-default
tokio_console_addr = "127.0.0.1"
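
The example file above is plain TOML, so loading it is ordinary serde work. A minimal sketch, assuming the `serde` and `toml` crates; `FlownodeOpts` and its fields are a hypothetical stand-in for the real `FlownodeOptions` in `src/flow`, not the actual type:

// Hypothetical mirror of a few FlownodeOptions fields; unknown TOML keys are
// ignored by serde's default behavior, so the sketch stays small.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct GrpcOpts {
    addr: String,
    hostname: String,
    runtime_size: usize,
}

#[derive(Debug, Deserialize)]
struct FlownodeOpts {
    mode: String,
    node_id: Option<u64>,
    frontend_addr: Option<String>,
    grpc: GrpcOpts,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let text = std::fs::read_to_string("config/flownode.example.toml")?;
    let opts: FlownodeOpts = toml::from_str(&text)?;
    // frontend_addr must keep its scheme prefix so tonic can build an Endpoint from it.
    assert_eq!(opts.frontend_addr.as_deref(), Some("http://127.0.0.1:4001"));
    assert_eq!(opts.grpc.addr, "127.0.0.1:6800");
    Ok(())
}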

View File

@@ -74,6 +74,7 @@ substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
 toml.workspace = true
+tonic.workspace = true
 tracing-appender = "0.2"

 [target.'cfg(not(windows))'.dependencies]

View File

@@ -346,6 +346,15 @@ pub enum Error {
         location: Location,
         source: meta_client::error::Error,
     },
+
+    #[snafu(display("Tonic transport error: {error:?} with msg: {msg:?}"))]
+    TonicTransport {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: tonic::transport::Error,
+        msg: Option<String>,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;

@@ -405,6 +414,7 @@ impl ErrorExt for Error {
                 source.status_code()
             }
             Error::MetaClientInit { source, .. } => source.status_code(),
+            Error::TonicTransport { .. } => StatusCode::Internal,
         }
     }

View File

@@ -31,11 +31,12 @@ use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHa
 use meta_client::{MetaClientOptions, MetaClientType};
 use servers::Mode;
 use snafu::{OptionExt, ResultExt};
+use tonic::transport::Endpoint;
 use tracing_appender::non_blocking::WorkerGuard;

 use crate::error::{
     BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
-    MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
+    MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, TonicTransportSnafu,
 };
 use crate::options::{GlobalOptions, GreptimeOptions};
 use crate::{log_versions, App};

@@ -119,16 +120,26 @@ impl SubCommand {
 #[derive(Debug, Parser, Default)]
 struct StartCommand {
+    /// Flownode's id.
     #[clap(long)]
     node_id: Option<u64>,
+    /// Bind address for the gRPC server.
     #[clap(long)]
     rpc_addr: Option<String>,
+    /// Hostname for the gRPC server.
     #[clap(long)]
     rpc_hostname: Option<String>,
+    /// Metasrv address list.
     #[clap(long, value_delimiter = ',', num_args = 1..)]
     metasrv_addrs: Option<Vec<String>>,
+    /// The gRPC address of the frontend server, used for writing results back to the database.
+    /// Needs a scheme prefix, e.g. "http://".
+    #[clap(long)]
+    frontend_addr: Option<String>,
+    /// The configuration file for flownode.
     #[clap(short, long)]
     config_file: Option<String>,
+    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`.
     #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
     env_prefix: String,
 }

@@ -175,6 +186,10 @@ impl StartCommand {
             opts.grpc.hostname.clone_from(hostname);
         }

+        if let Some(fe_addr) = &self.frontend_addr {
+            opts.frontend_addr = Some(fe_addr.clone());
+        }
+
         if let Some(node_id) = self.node_id {
             opts.node_id = Some(node_id);
         }

@@ -213,10 +228,13 @@ impl StartCommand {
         let opts = opts.component;

-        let cluster_id = opts.cluster_id.context(MissingConfigSnafu {
-            msg: "'cluster_id'",
+        let frontend_addr = opts.frontend_addr.clone().context(MissingConfigSnafu {
+            msg: "'frontend_addr'",
         })?;

+        // TODO(discord9): make it not optional after cluster id is required
+        let cluster_id = opts.cluster_id.unwrap_or(0);
+
         let member_id = opts
             .node_id
             .context(MissingConfigSnafu { msg: "'node_id'" })?;

@@ -298,6 +316,22 @@ impl StartCommand {
         let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;

+        // set up the lazy connection to the frontend server
+        // TODO(discord9): consider moving this to start() or pre_start()?
+        let endpoint =
+            Endpoint::from_shared(frontend_addr.clone()).context(TonicTransportSnafu {
+                msg: Some(format!("Failed to create endpoint from addr={}", frontend_addr)),
+            })?;
+        let chnl = endpoint.connect().await.context(TonicTransportSnafu {
+            msg: Some("Failed to connect to frontend".to_string()),
+        })?;
+        info!("Connected to frontend server: {:?}", frontend_addr);
+        let client = flow::FrontendClient::new(chnl);
+        flownode
+            .flow_worker_manager()
+            .set_frontend_invoker(Box::new(client))
+            .await;
+
         Ok(Instance::new(flownode, guard))
     }
 }
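
One note on the comment above: `endpoint.connect().await` establishes the channel eagerly, so flownode startup fails fast when the frontend is unreachable. If a genuinely lazy connection were wanted, tonic exposes `connect_lazy()`; a minimal sketch, assuming the tonic 0.11 API:

use tonic::transport::{Channel, Endpoint};

// Build a channel without touching the network: the TCP/HTTP2 handshake is
// deferred to the first RPC, so connection errors surface there instead of
// at startup.
fn lazy_channel(frontend_addr: String) -> Result<Channel, tonic::transport::Error> {
    let endpoint = Endpoint::from_shared(frontend_addr)?;
    Ok(endpoint.connect_lazy())
}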

View File

@@ -14,6 +14,7 @@ async-recursion = "1.0"
 async-trait.workspace = true
 bytes.workspace = true
 catalog.workspace = true
+client.workspace = true
 common-base.workspace = true
 common-config.workspace = true
 common-decimal.workspace = true

View File

@@ -84,6 +84,7 @@ pub struct FlownodeOptions {
     pub cluster_id: Option<u64>,
     pub node_id: Option<u64>,
     pub grpc: GrpcOptions,
+    pub frontend_addr: Option<String>,
     pub meta_client: Option<MetaClientOptions>,
     pub logging: LoggingOptions,
     pub tracing: TracingOptions,

@@ -97,6 +98,7 @@ impl Default for FlownodeOptions {
             cluster_id: None,
             node_id: None,
             grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
+            frontend_addr: None,
             meta_client: None,
             logging: LoggingOptions::default(),
             tracing: TracingOptions::default(),

@@ -133,10 +135,7 @@ pub struct FlowWorkerManager {
 /// Building FlownodeManager
 impl FlowWorkerManager {
     /// set frontend invoker
-    pub async fn set_frontend_invoker(
-        self: &Arc<Self>,
-        frontend: Box<dyn FrontendInvoker + Send + Sync>,
-    ) {
+    pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
         *self.frontend_invoker.write().await = Some(frontend);
     }
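
The receiver change from `self: &Arc<Self>` to `&self` is what lets the CLI above call `set_frontend_invoker` on whatever reference it holds. A self-contained illustration of why `&self` is the more permissive signature (toy types, not the real manager):

use std::sync::Arc;

struct Manager;

impl Manager {
    // With `&self`, a caller holding `Arc<Manager>`, `&Manager`, or `Box<Manager>`
    // can call this directly through auto-deref; `self: &Arc<Self>` would force
    // every caller to have exactly an `&Arc<Manager>` in hand.
    fn set_invoker(&self) {}
}

fn main() {
    let manager = Arc::new(Manager);
    manager.set_invoker(); // auto-deref from Arc<Manager>
    let borrowed: &Manager = &manager;
    borrowed.set_invoker(); // a plain reference works too
}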

src/flow/src/fe_client.rs (new file, +109 lines)
View File

@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Frontend client for the flownode, used for writing results back to the database.

use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::{
    GreptimeRequest, GreptimeResponse, RequestHeader, RowDeleteRequests, RowInsertRequests,
};
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_query::Output;
use common_telemetry::tracing_context::{TracingContext, W3cTrace};
use session::context::{QueryContext, QueryContextRef};
use snafu::IntoError;
use tokio::sync::Mutex;

use crate::{Error, Result};

/// Frontend client for writing results back to the database.
pub struct FrontendClient {
    client: GreptimeDatabaseClient<tonic::transport::Channel>,
}

impl FrontendClient {
    pub fn new(channel: tonic::transport::Channel) -> Self {
        Self {
            client: GreptimeDatabaseClient::new(channel),
        }
    }
}

fn to_rpc_request(request: Request, ctx: &QueryContext) -> GreptimeRequest {
    let header = RequestHeader {
        catalog: ctx.current_catalog().to_string(),
        schema: ctx.current_schema().to_string(),
        authorization: None,
        // dbname is empty so that the header uses catalog+schema to determine the database;
        // see `create_query_context` in `greptime_handler.rs`
        dbname: "".to_string(),
        timezone: ctx.timezone().to_string(),
        tracing_context: TracingContext::from_current_span().to_w3c(),
    };
    GreptimeRequest {
        header: Some(header),
        request: Some(request),
    }
}

fn from_rpc_error(e: tonic::Status) -> common_frontend::error::Error {
    common_frontend::error::ExternalSnafu {}
        .into_error(BoxedError::new(client::error::Error::from(e)))
}

fn resp_to_output(resp: GreptimeResponse) -> Output {
    let affect_rows = resp
        .response
        .map(|r| match r {
            api::v1::greptime_response::Response::AffectedRows(r) => r.value,
        })
        .unwrap_or(0);
    Output::new_with_affected_rows(affect_rows as usize)
}

#[async_trait::async_trait]
impl FrontendInvoker for FrontendClient {
    async fn row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let req = to_rpc_request(Request::RowInserts(requests), &ctx);
        let resp = self
            .client
            .clone()
            .handle(req)
            .await
            .map_err(from_rpc_error)?;
        Ok(resp_to_output(resp.into_inner()))
    }

    async fn row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> common_frontend::error::Result<Output> {
        let req = to_rpc_request(Request::RowDeletes(requests), &ctx);
        let resp = self
            .client
            .clone()
            .handle(req)
            .await
            .map_err(from_rpc_error)?;
        Ok(resp_to_output(resp.into_inner()))
    }
}
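
For orientation, a hypothetical usage sketch (not part of this commit): wiring a `FrontendClient` to a channel and writing one batch of rows back through the frontend. It assumes the `flow` crate re-export shown below in `lib.rs`, plus `QueryContext::arc()` from the `session` crate; treat the exact constructors as assumptions rather than confirmed API:

use api::v1::RowInsertRequests;
use common_frontend::handler::FrontendInvoker;
use flow::FrontendClient;
use session::context::QueryContext;
use tonic::transport::Endpoint;

async fn write_back(requests: RowInsertRequests) -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the frontend gRPC endpoint; the scheme prefix is required.
    let channel = Endpoint::from_shared("http://127.0.0.1:4001".to_string())?
        .connect()
        .await?;
    let invoker = FrontendClient::new(channel);
    // The header built by to_rpc_request leaves dbname empty, so the catalog
    // and schema carried by the QueryContext select the target database.
    let ctx = QueryContext::arc();
    let _output = invoker.row_inserts(requests, ctx).await?;
    Ok(())
}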

View File

@@ -27,6 +27,7 @@ mod adapter;
 mod compute;
 mod error;
 mod expr;
+mod fe_client;
 pub mod heartbeat;
 mod plan;
 mod repr;

@@ -36,4 +37,5 @@ mod utils;
 pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
 pub use error::{Error, Result};
+pub use fe_client::FrontendClient;
 pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer};

View File

@@ -37,6 +37,7 @@ use servers::server::Server;
 use snafu::{ensure, ResultExt};
 use tokio::net::TcpListener;
 use tokio::sync::{oneshot, Mutex};
+use tonic::codec::CompressionEncoding;
 use tonic::transport::server::TcpIncoming;
 use tonic::{Request, Response, Status};

@@ -120,6 +121,10 @@ impl FlownodeServer {
 impl FlownodeServer {
     pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
         flow_server::FlowServer::new(self.flow_service.clone())
+            .accept_compressed(CompressionEncoding::Gzip)
+            .send_compressed(CompressionEncoding::Gzip)
+            .accept_compressed(CompressionEncoding::Zstd)
+            .send_compressed(CompressionEncoding::Zstd)
     }
 }
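
Compression only takes effect when both peers opt in, so callers of this service would enable matching encodings on the generated client. A minimal sketch, assuming tonic 0.11 generated-client methods; the `FlowClient` module path is illustrative, not confirmed by this diff:

use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

// Hypothetical path to the generated client of the same flow service.
use api::v1::flow::flow_client::FlowClient;

fn compressed_flow_client(channel: Channel) -> FlowClient<Channel> {
    // Mirror the server: compress outgoing requests and accept compressed responses.
    FlowClient::new(channel)
        .send_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Gzip)
}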

View File

@@ -26,6 +26,7 @@ use table::metadata::TableId;
 use tokio::sync::mpsc::error::SendError;
 use tonic::codegen::http;

+use crate::metasrv::SelectTarget;
 use crate::pubsub::Message;

 #[derive(Snafu)]

@@ -175,15 +176,17 @@ pub enum Error {
     },

     #[snafu(display(
-        "Failed to request Datanode, required: {}, but only {} available",
+        "Failed to request {}, required: {}, but only {} available",
+        select_target,
         required,
         available
     ))]
-    NoEnoughAvailableDatanode {
+    NoEnoughAvailableNode {
         #[snafu(implicit)]
         location: Location,
         required: usize,
         available: usize,
+        select_target: SelectTarget,
     },

     #[snafu(display("Failed to request Datanode {}", peer))]

@@ -895,7 +898,7 @@ impl ErrorExt for Error {
             | Error::RetryLaterWithSource { .. }
             | Error::StartGrpc { .. }
             | Error::UpdateTableMetadata { .. }
-            | Error::NoEnoughAvailableDatanode { .. }
+            | Error::NoEnoughAvailableNode { .. }
             | Error::PublishMessage { .. }
             | Error::Join { .. }
             | Error::WeightArray { .. }

View File

@@ -28,7 +28,7 @@ use crate::metasrv::Context;
 const MAX_CACHED_STATS_PER_KEY: usize = 10;

-#[derive(Default)]
+#[derive(Debug, Default)]
 struct EpochStats {
     stats: Vec<Stat>,
     epoch: Option<u64>,

View File

@@ -14,6 +14,7 @@
 pub mod builder;

+use std::fmt::Display;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;

@@ -248,6 +249,15 @@ pub enum SelectTarget {
     Flownode,
 }

+impl Display for SelectTarget {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SelectTarget::Datanode => write!(f, "datanode"),
+            SelectTarget::Flownode => write!(f, "flownode"),
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct SelectorContext {
     pub server_addr: String,

View File

@@ -20,6 +20,7 @@ use snafu::ensure;
 use super::weighted_choose::{WeightedChoose, WeightedItem};
 use crate::error;
 use crate::error::Result;
+use crate::metasrv::SelectTarget;
 use crate::selector::SelectorOptions;

 /// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.

@@ -34,9 +35,10 @@ where
     let min_required_items = opts.min_required_items;
     ensure!(
         !weight_array.is_empty(),
-        error::NoEnoughAvailableDatanodeSnafu {
+        error::NoEnoughAvailableNodeSnafu {
             required: min_required_items,
             available: 0_usize,
+            select_target: SelectTarget::Datanode
         }
     );

@@ -52,9 +54,10 @@ where
     // or equal to min_required_items, otherwise it may cause an infinite loop.
     ensure!(
         weight_array_len >= min_required_items,
-        error::NoEnoughAvailableDatanodeSnafu {
+        error::NoEnoughAvailableNodeSnafu {
             required: min_required_items,
             available: weight_array_len,
+            select_target: SelectTarget::Datanode
         }
     );
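
To see what the reworded error renders as, here is a self-contained mirror of the new display logic; the types are toy stand-ins for the meta-srv ones, not the real crate:

use std::fmt;

enum SelectTarget {
    Datanode,
    Flownode,
}

impl fmt::Display for SelectTarget {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

fn main() {
    for (target, required, available) in [(SelectTarget::Datanode, 3, 1), (SelectTarget::Flownode, 1, 0)] {
        // Matches the #[snafu(display(...))] template in the error variant above.
        println!(
            "Failed to request {}, required: {}, but only {} available",
            target, required, available
        );
    }
}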

View File

@@ -17,7 +17,7 @@ use std::sync::atomic::AtomicUsize;
 use common_meta::peer::Peer;
 use snafu::ensure;

-use crate::error::{NoEnoughAvailableDatanodeSnafu, Result};
+use crate::error::{NoEnoughAvailableNodeSnafu, Result};
 use crate::lease;
 use crate::metasrv::{SelectTarget, SelectorContext};
 use crate::selector::{Namespace, Selector, SelectorOptions};

@@ -86,9 +86,10 @@ impl RoundRobinSelector {
     ensure!(
         !peers.is_empty(),
-        NoEnoughAvailableDatanodeSnafu {
+        NoEnoughAvailableNodeSnafu {
             required: min_required_items,
             available: 0usize,
+            select_target: self.select_target
         }
     );

View File

@@ -22,7 +22,7 @@ use snafu::{ensure, ResultExt};
 use store_api::storage::MAX_REGION_SEQ;

 use crate::error::{self, Result, TooManyPartitionsSnafu};
-use crate::metasrv::{SelectorContext, SelectorRef};
+use crate::metasrv::{SelectTarget, SelectorContext, SelectorRef};
 use crate::selector::SelectorOptions;

 pub struct MetasrvPeerAllocator {

@@ -64,9 +64,10 @@ impl MetasrvPeerAllocator {
     ensure!(
         peers.len() >= regions,
-        error::NoEnoughAvailableDatanodeSnafu {
+        error::NoEnoughAvailableNodeSnafu {
             required: regions,
             available: peers.len(),
+            select_target: SelectTarget::Datanode
         }
     );

View File

@@ -334,7 +334,7 @@ impl Inserter {
             // already know this is not source table
             Some(None) => continue,
             _ => {
-                // TODO(discord9): determine where to store the flow node address in distributed mode
+                // TODO(discord9): query metasrv for actual peer address
                 let peers = self
                     .table_flownode_set_cache
                     .get(table_id)

View File

@@ -25,7 +25,7 @@ DESC TABLE CLUSTER_INFO;
 -- SQLNESS REPLACE [\s\-]+
 SELECT * FROM CLUSTER_INFO ORDER BY peer_type;

-+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++
++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:6800|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++

 -- SQLNESS REPLACE version node_version
 -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version

@@ -55,7 +55,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
 -- SQLNESS REPLACE [\s\-]+
 SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;

-+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++
++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:6800|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++

 -- SQLNESS REPLACE version node_version
 -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version

View File

@@ -115,6 +115,7 @@ impl Env {
             server_processes: Some(Arc::new(Mutex::new(vec![server_process]))),
             metasrv_process: None,
             frontend_process: None,
+            flownode_process: None,
             client: TokioMutex::new(db),
             ctx: db_ctx,
             is_standalone: true,

@@ -141,6 +142,8 @@ impl Env {
         let frontend = self.start_server("frontend", &db_ctx, true).await;

+        let flownode = self.start_server("flownode", &db_ctx, true).await;
+
         let client = Client::with_urls(vec![SERVER_ADDR]);
         let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

@@ -150,6 +153,7 @@ impl Env {
             ]))),
             metasrv_process: Some(meta_server),
             frontend_process: Some(frontend),
+            flownode_process: Some(flownode),
             client: TokioMutex::new(db),
             ctx: db_ctx,
             is_standalone: false,

@@ -166,6 +170,7 @@ impl Env {
             server_processes: None,
             metasrv_process: None,
             frontend_process: None,
+            flownode_process: None,
             ctx: GreptimeDBContext {
                 time: 0,
                 datanode_id: Default::default(),

@@ -192,6 +197,8 @@ impl Env {
                 db_ctx.incr_datanode_id();
                 format!("greptime-sqlness-datanode-{}.log", db_ctx.datanode_id())
             }
+            // The flownode id is always 0 for now
+            "flownode" => "greptime-sqlness-flownode.log".to_string(),
             "frontend" => "greptime-sqlness-frontend.log".to_string(),
             "metasrv" => "greptime-sqlness-metasrv.log".to_string(),
             "standalone" => "greptime-sqlness-standalone.log".to_string(),

@@ -211,6 +218,7 @@ impl Env {
         let (args, check_ip_addr) = match subcommand {
             "datanode" => self.datanode_start_args(db_ctx),
+            "flownode" => self.flownode_start_args(db_ctx),
             "standalone" => {
                 let args = vec![
                     DEFAULT_LOG_LEVEL.to_string(),

@@ -307,6 +315,22 @@ impl Env {
         (args, format!("127.0.0.1:410{id}"))
     }

+    fn flownode_start_args(&self, _db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
+        let id = 0;
+        let subcommand = "flownode";
+        let mut args = vec![
+            DEFAULT_LOG_LEVEL.to_string(),
+            subcommand.to_string(),
+            "start".to_string(),
+        ];
+        args.push(format!("--rpc-addr=127.0.0.1:680{id}"));
+        args.push(format!("--node-id={id}"));
+        args.push("--metasrv-addrs=127.0.0.1:3002".to_string());
+        args.push("--frontend-addr=http://127.0.0.1:4001".to_string());
+        (args, format!("127.0.0.1:680{id}"))
+    }
+
     /// stop and restart the server process
     async fn restart_server(&self, db: &GreptimeDB) {
         {

@@ -421,6 +445,7 @@ pub struct GreptimeDB {
     server_processes: Option<Arc<Mutex<Vec<Child>>>>,
     metasrv_process: Option<Child>,
     frontend_process: Option<Child>,
+    flownode_process: Option<Child>,
     client: TokioMutex<DB>,
     ctx: GreptimeDBContext,
     is_standalone: bool,

@@ -516,6 +541,10 @@ impl GreptimeDB {
             Env::stop_server(&mut frontend);
             println!("Frontend (pid = {}) is stopped", frontend.id());
         }
+        if let Some(mut flownode) = self.flownode_process.take() {
+            Env::stop_server(&mut flownode);
+            println!("Flownode (pid = {}) is stopped", flownode.id());
+        }

         if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
         {
             util::teardown_wal();