feat: flow cli for distributed (#4226)

* feat(WIP): add FlownodeInstance for flow cli

* feat(WIP): cli

* feat: add merge opts func

* refactor: move server&error to src dir

* feat: flownode cli build

* feat: add `flownode` subcmd to cli

* refactor: per review

* refactor!: BREAKING remove alias `metasrv-addr`

* chore: after rebase

* feat: cache invalide flownode cache

* chore: small refactor per review

* chore: fix a typo

* feat!: revert breaking change

* chore: per review

* refactor: not accept `metasrv-addr` only for flownode
This commit is contained in:
discord9
2024-07-01 17:56:15 +08:00
committed by GitHub
parent a4e99f5666
commit f035a7c79c
37 changed files with 723 additions and 132 deletions

2
Cargo.lock generated
View File

@@ -3955,10 +3955,12 @@ dependencies = [
"catalog",
"common-base",
"common-catalog",
"common-config",
"common-decimal",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-macro",
"common-meta",
"common-query",

View File

@@ -17,7 +17,7 @@
use clap::{Parser, Subcommand};
use cmd::error::Result;
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, frontend, metasrv, standalone, App};
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;
#[derive(Parser)]
@@ -37,6 +37,10 @@ enum SubCommand {
#[clap(name = "datanode")]
Datanode(datanode::Command),
/// Start flownode service.
#[clap(name = "flownode")]
Flownode(flownode::Command),
/// Start frontend service.
#[clap(name = "frontend")]
Frontend(frontend::Command),
@@ -72,6 +76,12 @@ async fn start(cli: Command) -> Result<()> {
.run()
.await
}
SubCommand::Flownode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Frontend(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?

View File

@@ -87,6 +87,20 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to start flownode"))]
StartFlownode {
#[snafu(implicit)]
location: Location,
source: flow::Error,
},
#[snafu(display("Failed to shutdown flownode"))]
ShutdownFlownode {
#[snafu(implicit)]
location: Location,
source: flow::Error,
},
#[snafu(display("Failed to start frontend"))]
StartFrontend {
#[snafu(implicit)]
@@ -380,6 +394,9 @@ impl ErrorExt for Error {
Error::BuildRuntime { source, .. } => source.status_code(),
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Self::StartFlownode { source, .. } | Self::ShutdownFlownode { source, .. } => {
source.status_code()
}
}
}

301
src/cmd/src/flownode.rs Normal file
View File

@@ -0,0 +1,301 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result,
ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-flownode";
type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
pub struct Instance {
flownode: FlownodeInstance,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
impl Instance {
pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
flownode,
_guard: guard,
}
}
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
}
#[async_trait::async_trait]
impl App for Instance {
fn name(&self) -> &str {
APP_NAME
}
async fn start(&mut self) -> Result<()> {
self.flownode.start().await.context(StartFlownodeSnafu)
}
async fn stop(&self) -> Result<()> {
self.flownode
.shutdown()
.await
.context(ShutdownFlownodeSnafu)
}
}
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
}
impl Command {
pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
match &self.subcmd {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
}
}
#[derive(Parser)]
enum SubCommand {
Start(StartCommand),
}
impl SubCommand {
async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
}
#[derive(Debug, Parser, Default)]
struct StartCommand {
#[clap(long)]
node_id: Option<u64>,
#[clap(long)]
rpc_addr: Option<String>,
#[clap(long)]
rpc_hostname: Option<String>,
#[clap(long, value_delimiter = ',', num_args = 1..)]
metasrv_addrs: Option<Vec<String>>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
env_prefix: String,
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
let mut opts = FlownodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
Ok(opts)
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
opts: &mut FlownodeOptions,
) -> Result<()> {
let opts = &mut opts.component;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
if global_options.log_level.is_some() {
opts.logging.level.clone_from(&global_options.log_level);
}
opts.tracing = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: global_options.tokio_console_addr.clone(),
};
if let Some(addr) = &self.rpc_addr {
opts.grpc.addr.clone_from(addr);
}
if let Some(hostname) = &self.rpc_hostname {
opts.grpc.hostname.clone_from(hostname);
}
if let Some(node_id) = self.node_id {
opts.node_id = Some(node_id);
}
if let Some(metasrv_addrs) = &self.metasrv_addrs {
opts.meta_client
.get_or_insert_with(MetaClientOptions::default)
.metasrv_addrs
.clone_from(metasrv_addrs);
opts.mode = Mode::Distributed;
}
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
}
.fail();
}
Ok(())
}
async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version!(), short_version!());
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);
let opts = opts.component;
let cluster_id = opts.cluster_id.context(MissingConfigSnafu {
msg: "'cluster_id'",
})?;
let node_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client_options'",
})?;
let meta_client = Arc::new(
flow::heartbeat::new_metasrv_client(cluster_id, node_id, meta_config)
.await
.context(StartFlownodeSnafu)?,
);
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;
let cache_tti = meta_config.metadata_cache_tti;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(BuildCacheRegistrySnafu)?
.build(),
);
let catalog_manager = KvBackendCatalogManager::new(
opts.mode,
Some(meta_client.clone()),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
);
let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend));
table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
]);
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
);
let flownode_builder = FlownodeBuilder::new(
opts,
Plugins::new(),
table_metadata_manager,
catalog_manager,
)
.with_heartbeat_task(heartbeat_task);
let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
Ok(Instance::new(flownode, guard))
}
}

View File

@@ -283,6 +283,7 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)

View File

@@ -22,6 +22,7 @@ use crate::error::Result;
pub mod cli;
pub mod datanode;
pub mod error;
pub mod flownode;
pub mod frontend;
pub mod metasrv;
pub mod options;

View File

@@ -21,6 +21,7 @@ use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
@@ -65,9 +66,9 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
@@ -448,13 +449,18 @@ impl StartCommand {
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_builder = FlownodeBuilder::new(
1,
Default::default(),
fe_plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
);
let flownode = Arc::new(flow_builder.build().await);
let flownode = Arc::new(
flow_builder
.build()
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
);
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
@@ -464,7 +470,7 @@ impl StartCommand {
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
flow_server: flownode.flow_worker_manager(),
});
let table_id_sequence = Arc::new(
@@ -516,11 +522,12 @@ impl StartCommand {
.context(StartFrontendSnafu)?;
// flow server need to be able to use frontend to write insert requests back
flownode
let flow_worker_manager = flownode.flow_worker_manager();
flow_worker_manager
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
// TODO(discord9): unify with adding `start` and `shutdown` method to flownode too.
let _handle = flownode.clone().run_background();
let _handle = flow_worker_manager.run_background();
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()

View File

@@ -15,10 +15,12 @@ async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
common-base.workspace = true
common-config.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true

View File

@@ -21,40 +21,41 @@ use std::sync::Arc;
use std::time::{Instant, SystemTime};
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::Itertools;
use query::{QueryEngine, QueryEngineFactory};
use meta_client::MetaClientOptions;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{oneshot, watch, Mutex, RwLock};
use tokio::sync::{watch, Mutex, RwLock};
use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::repr::{self, DiffRow, Row};
use crate::transform::{register_function_to_query_engine, sql_to_flow_plan};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
mod flownode_impl;
mod parse_expr;
mod server;
#[cfg(test)]
mod tests;
mod util;
@@ -63,7 +64,7 @@ mod worker;
pub(crate) mod node_context;
mod table_source;
use error::Error;
use crate::error::Error;
// TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
@@ -76,79 +77,43 @@ pub type FlowId = u64;
pub type TableName = [String; 3];
/// Options for flow node
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
pub mode: Mode,
pub cluster_id: Option<u64>,
pub node_id: Option<u64>,
pub grpc: GrpcOptions,
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
pub heartbeat: HeartbeatOptions,
}
/// Flownode Builder
pub struct FlownodeBuilder {
flow_node_id: u32,
opts: FlownodeOptions,
plugins: Plugins,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
}
impl FlownodeBuilder {
/// init flownode builder
pub fn new(
flow_node_id: u32,
opts: FlownodeOptions,
plugins: Plugins,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
) -> Self {
impl Default for FlownodeOptions {
fn default() -> Self {
Self {
flow_node_id,
opts,
plugins,
table_meta,
catalog_manager,
mode: servers::Mode::Standalone,
cluster_id: None,
node_id: None,
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
}
}
/// TODO(discord9): error handling
pub async fn build(self) -> FlownodeManager {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode only translate plan with resolved table source.
self.catalog_manager.clone(),
None,
None,
None,
false,
self.plugins.clone(),
);
let query_engine = query_engine_factory.query_engine();
register_function_to_query_engine(&query_engine);
let (tx, rx) = oneshot::channel();
let node_id = Some(self.flow_node_id);
let _handle = std::thread::spawn(move || {
let (flow_node_manager, mut worker) =
FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone());
let _ = tx.send(flow_node_manager);
info!("Flow Worker started in new thread");
worker.run();
});
let man = rx.await.unwrap();
info!("Flow Node Manager started");
man
}
}
impl Configurable for FlownodeOptions {}
/// Arc-ed FlowNodeManager, cheaper to clone
pub type FlownodeManagerRef = Arc<FlownodeManager>;
pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>;
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using current system timestamp for now
pub struct FlownodeManager {
pub struct FlowWorkerManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
@@ -166,7 +131,7 @@ pub struct FlownodeManager {
}
/// Building FlownodeManager
impl FlownodeManager {
impl FlowWorkerManager {
/// set frontend invoker
pub async fn set_frontend_invoker(
self: &Arc<Self>,
@@ -188,7 +153,7 @@ impl FlownodeManager {
let node_context = FlownodeContext::default();
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlownodeManager {
FlowWorkerManager {
worker_handles,
query_engine,
table_info_source: srv_map,
@@ -248,7 +213,7 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
}
/// This impl block contains methods to send writeback requests to frontend
impl FlownodeManager {
impl FlowWorkerManager {
/// TODO(discord9): merge all same type of diff row into one requests
///
/// Return the number of requests it made
@@ -494,7 +459,7 @@ impl FlownodeManager {
}
/// Flow Runtime related methods
impl FlownodeManager {
impl FlowWorkerManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
@@ -604,7 +569,7 @@ impl FlownodeManager {
}
/// Create&Remove flow
impl FlownodeManager {
impl FlowWorkerManager {
/// remove a flow by it's id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {

View File

@@ -26,11 +26,11 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::error::InternalSnafu;
use crate::adapter::FlownodeManager;
use crate::adapter::FlowWorkerManager;
use crate::error::InternalSnafu;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Err::<(), _>(BoxedError::new(err))
.with_context(|_| ExternalSnafu)
@@ -38,7 +38,7 @@ fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
}
#[async_trait::async_trait]
impl Flownode for FlownodeManager {
impl Flownode for FlowWorkerManager {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
let query_ctx = request
.header

View File

@@ -23,8 +23,8 @@ use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, RwLock};
use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP};
@@ -317,7 +317,6 @@ impl FlownodeContext {
/// Assign a schema to a table
///
/// TODO(discord9): error handling
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
@@ -327,7 +326,10 @@ impl FlownodeContext {
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
.context(TableNotFoundSnafu {
name: format!("Table not found: {:?} in flownode cache", table_name),
})?;
self.schema.insert(gid, schema);
Ok(())
}

View File

@@ -20,10 +20,10 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::adapter::error::{
use crate::adapter::TableName;
use crate::error::{
Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::TableName;
use crate::repr::{self, ColumnType, RelationDesc, RelationType};
/// mapping of table name <-> table id should be query from tableinfo manager

View File

@@ -19,7 +19,7 @@ use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::error::{Error, ExternalSnafu};
use crate::error::{Error, ExternalSnafu};
/// convert `ColumnSchema` lists to it's corresponding proto type
pub fn column_schemas_to_proto(

View File

@@ -24,9 +24,9 @@ use hydroflow::scheduled::graph::Hydroflow;
use snafu::ensure;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use crate::adapter::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu};
use crate::adapter::FlowId;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow};

View File

@@ -32,9 +32,9 @@ use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use super::state::Scheduler;
use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::compute::state::DataflowState;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,

View File

@@ -19,10 +19,10 @@ use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};

View File

@@ -22,10 +22,10 @@ use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::{Context, SubgraphArg};
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};

View File

@@ -23,9 +23,9 @@ use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, GlobalId};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};

View File

@@ -162,6 +162,34 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start server"))]
StartServer {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to shutdown server"))]
ShutdownServer {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to initialize meta client"))]
MetaClientInit {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Failed to parse address {}", addr))]
ParseAddr {
addr: String,
#[snafu(source)]
error: std::net::AddrParseError,
},
}
/// Result type for flow module
@@ -184,11 +212,16 @@ impl ErrorExt for Error {
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected,
&Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
}
&Self::External { .. } => StatusCode::Unknown,
Self::External { source, .. } => source.status_code(),
Self::Internal { .. } => StatusCode::Internal,
Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
source.status_code()
}
Self::MetaClientInit { source, .. } => source.status_code(),
Self::ParseAddr { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use substrait::df_logical_plan::consumer::name_to_op;
use crate::adapter::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
use crate::expr::error::{
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu,
TryFromValueSnafu, TypeMismatchSnafu,

View File

@@ -21,7 +21,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use crate::adapter::error::{Error, InvalidQuerySnafu};
use crate::error::{Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr};
use crate::repr::{self, value_to_internal_ts, Diff, Row};

View File

@@ -24,7 +24,7 @@ use smallvec::smallvec;
use snafu::{OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use crate::adapter::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
use crate::expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu};
use crate::expr::relation::accum::{Accum, Accumulator};
use crate::expr::signature::{GenericFn, Signature};

View File

@@ -32,7 +32,7 @@ use substrait::error::{DecodeRelSnafu, EncodeRelSnafu};
use substrait::substrait_proto_df::proto::expression::{RexType, ScalarFunction};
use substrait::substrait_proto_df::proto::Expression;
use crate::adapter::error::{
use crate::error::{
DatafusionSnafu, Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu,
};
use crate::expr::error::{
@@ -284,7 +284,7 @@ impl RawDfScalarFn {
f.encode(&mut buf)
.context(EncodeRelSnafu)
.map_err(BoxedError::new)
.context(crate::adapter::error::ExternalSnafu)?;
.context(crate::error::ExternalSnafu)?;
Ok(Self {
f: buf,
input_schema,
@@ -295,7 +295,7 @@ impl RawDfScalarFn {
let f = ScalarFunction::decode(&mut self.f.as_ref())
.context(DecodeRelSnafu)
.map_err(BoxedError::new)
.context(crate::adapter::error::ExternalSnafu)?;
.context(crate::error::ExternalSnafu)?;
let input_schema = &self.input_schema;
let extensions = &self.extensions;
@@ -371,7 +371,7 @@ impl ScalarExpr {
})?;
let typ = ConcreteDataType::try_from(&arrow_typ)
.map_err(BoxedError::new)
.context(crate::adapter::error::ExternalSnafu)?;
.context(crate::error::ExternalSnafu)?;
Ok(ColumnType::new_nullable(typ))
}
}

View File

@@ -14,25 +14,29 @@
//! Send heartbeat from flownode to metasrv
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use crate::adapter::error::ExternalSnafu;
use crate::error::{ExternalSnafu, MetaClientInitSnafu};
use crate::{Error, FlownodeOptions};
/// The flownode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
@@ -45,6 +49,7 @@ pub struct HeartbeatTask {
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
running: Arc<AtomicBool>,
}
impl HeartbeatTask {
@@ -62,10 +67,19 @@ impl HeartbeatTask {
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
running: Arc::new(AtomicBool::new(false)),
}
}
pub async fn start(&self) -> Result<(), Error> {
if self
.running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Heartbeat task started multiple times");
return Ok(());
}
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -86,6 +100,17 @@ impl HeartbeatTask {
Ok(())
}
pub fn shutdown(&self) {
info!("Close heartbeat task for flownode");
if self
.running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
}
fn create_heartbeat_request(
message: Option<OutgoingMessage>,
peer: Option<Peer>,
@@ -208,3 +233,38 @@ impl HeartbeatTask {
}
}
}
/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
cluster_id: u64,
node_id: u64,
meta_config: &MetaClientOptions,
) -> Result<MetaClient, Error> {
let member_id = node_id;
let config = ChannelConfig::new()
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout)
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config.clone());
let heartbeat_channel_manager = ChannelManager::with_config(
config
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout),
);
let mut meta_client = MetaClientBuilder::flownode_default_options(cluster_id, member_id)
.channel_manager(channel_manager)
.heartbeat_channel_manager(heartbeat_channel_manager)
.build();
meta_client
.start(&meta_config.metasrv_addrs)
.await
.context(MetaClientInitSnafu)?;
// required only when the heartbeat_client is enabled
meta_client
.ask_leader()
.await
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}

View File

@@ -25,12 +25,15 @@
// allow unused for now because it should be use later
mod adapter;
mod compute;
mod error;
mod expr;
mod heartbeat;
pub mod heartbeat;
mod plan;
mod repr;
mod server;
mod transform;
mod utils;
pub use adapter::error::{Error, Result};
pub use adapter::{FlownodeBuilder, FlownodeManager, FlownodeManagerRef, FlownodeOptions};
pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer};

View File

@@ -23,7 +23,7 @@ use std::collections::BTreeSet;
use datatypes::arrow::ipc::Map;
use serde::{Deserialize, Serialize};
use crate::adapter::error::Error;
use crate::error::Error;
use crate::expr::{
AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
TypedExpr,

View File

@@ -21,9 +21,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::error::{
DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu,
};
use crate::error::{DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu};
use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr};
/// a set of column indices that are "keys" for the collection.

View File

@@ -15,26 +15,48 @@
//! Implementation of grpc service for flow node
use std::net::SocketAddr;
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::ddl::table_meta;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::Flownode;
use common_telemetry::tracing::info;
use futures::FutureExt;
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use meta_client::client::MetaClient;
use query::QueryEngineFactory;
use serde::de::Unexpected;
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::Server;
use snafu::{ensure, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{oneshot, Mutex};
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlownodeManagerRef;
use crate::adapter::FlowWorkerManagerRef;
use crate::error::{ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu};
use crate::heartbeat::HeartbeatTask;
use crate::transform::register_function_to_query_engine;
use crate::{Error, FlowWorkerManager, FlownodeOptions};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)]
pub struct FlowService {
pub manager: FlownodeManagerRef,
pub manager: FlowWorkerManagerRef,
}
impl FlowService {
pub fn new(manager: FlowWorkerManagerRef) -> Self {
Self { manager }
}
}
#[async_trait::async_trait]
@@ -82,8 +104,17 @@ impl flow_server::Flow for FlowService {
}
pub struct FlownodeServer {
pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
pub flow_service: FlowService,
shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
flow_service: FlowService,
}
impl FlownodeServer {
pub fn new(flow_service: FlowService) -> Self {
Self {
flow_service,
shutdown_tx: Mutex::new(None),
}
}
}
impl FlownodeServer {
@@ -134,7 +165,6 @@ impl servers::server::Server for FlownodeServer {
.context(StartGrpcSnafu);
});
// TODO(discord9): better place for dataflow to run per second
let manager_ref = self.flow_service.manager.clone();
let _handle = manager_ref.clone().run_background();
@@ -145,3 +175,126 @@ impl servers::server::Server for FlownodeServer {
FLOW_NODE_SERVER_NAME
}
}
/// The flownode server instance.
pub struct FlownodeInstance {
server: FlownodeServer,
addr: SocketAddr,
heartbeat_task: Option<HeartbeatTask>,
}
impl FlownodeInstance {
pub async fn start(&mut self) -> Result<(), crate::Error> {
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
self.addr = self
.server
.start(self.addr)
.await
.context(StartServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), crate::Error> {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.shutdown();
}
Ok(())
}
pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef {
self.server.flow_service.manager.clone()
}
}
/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
opts: FlownodeOptions,
plugins: Plugins,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
heartbeat_task: Option<HeartbeatTask>,
}
impl FlownodeBuilder {
/// init flownode builder
pub fn new(
opts: FlownodeOptions,
plugins: Plugins,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
) -> Self {
Self {
opts,
plugins,
table_meta,
catalog_manager,
heartbeat_task: None,
}
}
pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
Self {
heartbeat_task: Some(heartbeat_task),
..self
}
}
pub async fn build(self) -> Result<FlownodeInstance, Error> {
let manager = Arc::new(self.build_manager().await?);
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let heartbeat_task = self.heartbeat_task;
let addr = self.opts.grpc.addr;
let instance = FlownodeInstance {
server,
addr: addr.parse().context(ParseAddrSnafu { addr })?,
heartbeat_task,
};
Ok(instance)
}
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
/// nor does it actually start running the worker.
async fn build_manager(&self) -> Result<FlowWorkerManager, Error> {
let catalog_manager = self.catalog_manager.clone();
let table_meta = self.table_meta.clone();
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode is only used for translate plan with resolved table source.
catalog_manager,
None,
None,
None,
false,
self.plugins.clone(),
);
let query_engine = query_engine_factory.query_engine();
register_function_to_query_engine(&query_engine);
let (tx, rx) = oneshot::channel();
let node_id = self.opts.node_id.map(|id| id as u32);
let _handle = std::thread::spawn(move || {
let (flow_node_manager, mut worker) =
FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
let _ = tx.send(flow_node_manager);
info!("Flow Worker started in new thread");
worker.run();
});
let man = rx.await.map_err(|_e| {
UnexpectedSnafu {
reason: "sender is dropped, failed to create flow node manager",
}
.build()
})?;
info!("Flow Node Manager started");
Ok(man)
}
}

View File

@@ -37,11 +37,11 @@ use substrait::{
use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use crate::adapter::error::{
use crate::adapter::FlownodeContext;
use crate::error::{
Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
};
use crate::adapter::FlownodeContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;

View File

@@ -45,7 +45,7 @@ use substrait_proto::proto::read_rel::ReadType;
use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel};
use crate::adapter::error::{
use crate::error::{
DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu,
TableNotFoundSnafu,
};

View File

@@ -25,7 +25,7 @@ use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction};
use substrait_proto::proto::function_argument::ArgType;
use substrait_proto::proto::Expression;
use crate::adapter::error::{
use crate::error::{
DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu,
PlanSnafu,
};

View File

@@ -26,7 +26,7 @@ use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::Literal;
use substrait_proto::proto::r#type::Kind;
use crate::adapter::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::transform::substrait_proto;
/// Convert a Substrait literal into a Value and its ConcreteDataType (So that we can know type even if the value is null)

View File

@@ -22,7 +22,7 @@ use substrait_proto::proto::read_rel::ReadType;
use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
use crate::adapter::error::{
use crate::error::{
Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
};
use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};

View File

@@ -100,6 +100,13 @@ impl MetaClientBuilder {
.enable_heartbeat()
}
/// Returns the role of Flownode's default options.
pub fn flownode_default_options(cluster_id: ClusterId, member_id: u64) -> Self {
Self::new(cluster_id, member_id, Role::Flownode)
.enable_store()
.enable_heartbeat()
}
pub fn enable_heartbeat(self) -> Self {
Self {
enable_heartbeat: true,

View File

@@ -57,6 +57,21 @@ impl MetasrvCacheInvalidator {
.broadcast(&BroadcastChannel::Frontend, msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let msg = &MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Flownode broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| meta_error::SerdeJsonSnafu)?;
self.mailbox
.broadcast(&BroadcastChannel::Flownode, msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
}

View File

@@ -33,6 +33,7 @@ pub type MessageId = u64;
pub enum Channel {
Datanode(u64),
Frontend(u64),
Flownode(u64),
}
impl Display for Channel {
@@ -44,6 +45,9 @@ impl Display for Channel {
Channel::Frontend(id) => {
write!(f, "Frontend-{}", id)
}
Channel::Flownode(id) => {
write!(f, "Flownode-{}", id)
}
}
}
}
@@ -53,12 +57,14 @@ impl Channel {
match self {
Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id),
Channel::Frontend(id) => format!("{}-{}", Role::Frontend as i32, id),
Channel::Flownode(id) => format!("{}-{}", Role::Flownode as i32, id),
}
}
}
pub enum BroadcastChannel {
Datanode,
Frontend,
Flownode,
}
impl BroadcastChannel {
@@ -70,7 +76,11 @@ impl BroadcastChannel {
},
BroadcastChannel::Frontend => Range {
start: format!("{}-", Role::Frontend as i32),
end: format!("{}-", Role::Frontend as i32 + 1),
end: format!("{}-", Role::Flownode as i32),
},
BroadcastChannel::Flownode => Range {
start: format!("{}-", Role::Flownode as i32),
end: format!("{}-", Role::Flownode as i32 + 1),
},
}
}
@@ -144,5 +154,9 @@ mod tests {
BroadcastChannel::Frontend.pusher_range(),
("1-".to_string().."2-".to_string())
);
assert_eq!(
BroadcastChannel::Flownode.pusher_range(),
("2-".to_string().."3-".to_string())
);
}
}

View File

@@ -151,17 +151,16 @@ impl GreptimeDbStandaloneBuilder {
);
let flow_builder = FlownodeBuilder::new(
1, // for standalone mode this value is default to one
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
);
let flownode = Arc::new(flow_builder.build().await);
let flownode = Arc::new(flow_builder.build().await.unwrap());
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
flow_server: flownode.flow_worker_manager(),
});
let table_id_sequence = Arc::new(
@@ -219,10 +218,11 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
flownode
let flow_manager = flownode.flow_worker_manager();
flow_manager
.set_frontend_invoker(Box::new(instance.clone()))
.await;
let _node_handle = flownode.run_background();
let _node_handle = flow_manager.run_background();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();