feat: integrate flow to standalone(untested)

This commit is contained in:
discord9
2024-05-07 16:01:11 +08:00
parent d88cff6f51
commit 878737f781
15 changed files with 183 additions and 29 deletions

3
Cargo.lock generated
View File

@@ -1611,6 +1611,7 @@ dependencies = [
"either",
"etcd-client",
"file-engine",
"flow",
"frontend",
"futures",
"human-panic",
@@ -3518,6 +3519,7 @@ dependencies = [
"api",
"async-trait",
"catalog",
"common-base",
"common-catalog",
"common-decimal",
"common-error",
@@ -10318,6 +10320,7 @@ dependencies = [
"datanode",
"datatypes",
"dotenv",
"flow",
"frontend",
"futures",
"futures-util",

View File

@@ -204,6 +204,7 @@ common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }

View File

@@ -45,6 +45,7 @@ datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend.workspace = true
futures.workspace = true
human-panic = "1.2.2"

View File

@@ -40,6 +40,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -405,11 +406,25 @@ impl StartCommand {
)
.await;
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_builder = FlownodeBuilder::new(
Default::default(),
fe_plugins.clone(),
table_metadata_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build());
let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
});
let table_id_sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -427,8 +442,6 @@ impl StartCommand {
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
@@ -456,6 +469,11 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
// the flow server needs to be able to use the frontend to write insert requests back
flownode
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await

View File

@@ -13,6 +13,8 @@ workspace = true
[dependencies]
api.workspace = true
catalog.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true

View File

@@ -19,17 +19,20 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use greptime_proto::v1;
use hydroflow::scheduled::graph::Hydroflow;
use itertools::Itertools;
use minstant::Anchor;
use prost::bytes::buf;
use query::QueryEngine;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use smallvec::SmallVec;
@@ -39,7 +42,9 @@ use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::task::LocalSet;
use crate::adapter::error::{EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::adapter::error::{
EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::{Context, DataflowState, ErrCollector};
@@ -67,18 +72,83 @@ pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
pub type FlowId = u64;
pub type TableName = Vec<String>;
/// Options for the flow node.
///
/// Deserializable from configuration; any field missing from the input falls
/// back to its `Default` value via `#[serde(default)]`.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
    /// Address of the flow node's RPC server.
    pub rpc_addr: String,
}
/// Builder for a flownode manager (see `build`).
pub struct FlownodeBuilder {
    /// Flow node options (stored by the builder; not consumed by the visible
    /// `build` yet — TODO confirm intended use).
    opts: FlownodeOptions,
    /// Shared plugins, passed through to the query engine factory.
    plugins: Plugins,
    /// Optional kv backend, set via `with_kv_backend` (stored but not consumed
    /// by the visible `build` yet — TODO confirm intended use).
    kv_backend: Option<KvBackendRef>,
    /// Table metadata manager handed to the flownode manager.
    table_meta: TableMetadataManagerRef,
}
impl FlownodeBuilder {
    /// Create a new builder from options, plugins and the table metadata
    /// manager. The kv backend starts unset; use [`Self::with_kv_backend`].
    pub fn new(
        opts: FlownodeOptions,
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        Self {
            opts,
            plugins,
            kv_backend: None,
            table_meta,
        }
    }

    /// Set the kv backend (builder style: consumes and returns `self`).
    pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
        Self {
            kv_backend: Some(kv_backend),
            ..self
        }
    }

    /// Build the flownode manager, spawning a dedicated OS thread that runs
    /// the flow worker loop for the lifetime of the process.
    ///
    /// Panics if the worker thread dies before handing back the manager.
    /// TODO(discord9): error handling
    pub fn build(self) -> FlownodeManager {
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in flownode only translates plans whose table
            // sources are already resolved, so an in-memory catalog suffices.
            MemoryCatalogManager::with_default_setup(),
            None,
            None,
            None,
            false,
            self.plugins.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let (tx, rx) = oneshot::channel();

        // The worker is not `Send` (see the manager's `worker_handles` docs),
        // so it is created and run on its own thread; only the manager half
        // is sent back over the oneshot channel. The `JoinHandle` is dropped,
        // detaching the worker thread.
        let _handle = std::thread::spawn(move || {
            // Hard-coded single node id — TODO confirm this is intentional
            // for standalone mode.
            let node_id = Some(1);
            let (flow_node_manager, mut worker) =
                FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone());
            let _ = tx.send(flow_node_manager);
            worker.run();
        });

        // Block until the spawned thread sends the manager; unwrap panics if
        // the thread exits without sending (see the TODO above).
        rx.blocking_recv().unwrap()
    }
}
/// This function will create a new thread for flow worker and return a handle to the flow node manager
pub fn start_flow_node_with_one_worker(
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> FlowNodeManager {
) -> FlownodeManager {
let (tx, rx) = oneshot::channel();
let _handle = std::thread::spawn(move || {
let node_id = Some(1);
let (flow_node_manager, mut worker) =
FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
FlownodeManager::new_with_worker(node_id, query_engine, table_meta);
let _ = tx.send(flow_node_manager);
worker.run();
});
@@ -86,12 +156,13 @@ pub fn start_flow_node_with_one_worker(
rx.blocking_recv().unwrap()
}
pub type FlowNodeManagerRef = Arc<FlowNodeManager>;
/// Arc-ed FlownodeManager, cheaper to clone
pub type FlownodeManagerRef = Arc<FlownodeManager>;
/// FlownodeManager manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using current system timestamp for now
pub struct FlowNodeManager {
pub struct FlownodeManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
@@ -99,7 +170,7 @@ pub struct FlowNodeManager {
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableInfoSource,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
frontend_invoker: Mutex<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlowNodeContext>,
tick_manager: FlowTickManager,
@@ -107,7 +178,7 @@ pub struct FlowNodeManager {
run_task_created: Mutex<bool>,
}
impl FlowNodeManager {
impl FlownodeManager {
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
@@ -119,9 +190,15 @@ impl FlowNodeManager {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
/// set frontend invoker
pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
*self.frontend_invoker.lock().await = Some(frontend);
}
/// Create **without** setting `frontend_invoker`
pub fn new(
node_id: Option<u32>,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> Self {
@@ -132,28 +209,31 @@ impl FlowNodeManager {
let node_context = FlowNodeContext::default();
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlowNodeManager {
FlownodeManager {
worker_handles,
query_engine,
table_info_source: srv_map,
frontend_invoker,
frontend_invoker: Mutex::new(None),
node_context: Mutex::new(node_context),
tick_manager,
node_id,
run_task_created: Mutex::new(false),
}
}
/// Create a flownode manager with one worker
pub fn new_with_worker<'s>(
node_id: Option<u32>,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> (Self, Worker<'s>) {
let mut zelf = Self::new(node_id, frontend_invoker, query_engine, table_meta);
let mut zelf = Self::new(node_id, query_engine, table_meta);
let (handle, worker) = create_worker();
zelf.add_worker_handle(handle);
(zelf, worker)
}
/// add a worker handle to the manager, meaning the corresponding worker is under its management
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(Mutex::new(handle));
}
@@ -164,7 +244,7 @@ impl FlowNodeManager {
#[test]
fn check_is_send() {
fn is_send<T: Send + Sync>() {}
is_send::<FlowNodeManager>();
is_send::<FlownodeManager>();
}
/// mapping of table name <-> table id should be query from tableinfo manager
@@ -289,7 +369,7 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
reqs
}
impl FlowNodeManager {
impl FlownodeManager {
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
@@ -365,6 +445,12 @@ impl FlowNodeManager {
};
req_cnt += 1;
self.frontend_invoker
.lock()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {
reason: "Expect a frontend invoker for flownode to write back",
})?
.row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
.await
.map_err(BoxedError::new)
@@ -384,6 +470,12 @@ impl FlowNodeManager {
req_cnt += 1;
self.frontend_invoker
.lock()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {
reason: "Expect a frontend invoker for flownode to write back",
})?
.row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
.await
.map_err(BoxedError::new)
@@ -414,6 +506,7 @@ impl FlowNodeManager {
output
}
/// remove a flow by its id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {
let handle = handle.lock().await;

View File

@@ -111,6 +111,9 @@ pub enum Error {
context: String,
location: Location,
},
#[snafu(display("Unexpected: {reason}"))]
Unexpected { reason: String, location: Location },
}
/// Result type for flow module
@@ -132,7 +135,7 @@ impl ErrorExt for Error {
| &Self::InvalidQuery { .. }
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::NoProtoType { .. } => StatusCode::Unexpected,
Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected,
&Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
}

View File

@@ -32,14 +32,14 @@ use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use crate::adapter::{FlowNodeManager, FlowNodeManagerRef};
use crate::adapter::{FlownodeManager, FlownodeManagerRef};
use crate::repr::{self, DiffRow};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
#[derive(Clone)]
pub struct FlowService {
pub manager: FlowNodeManagerRef,
pub manager: FlownodeManagerRef,
}
#[async_trait::async_trait]

View File

@@ -21,7 +21,7 @@ use common_meta::node_manager::Flownode;
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::FlowNodeManager;
use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: impl ToString) -> common_meta::error::Error {
@@ -32,7 +32,7 @@ fn to_meta_err(err: impl ToString) -> common_meta::error::Error {
}
#[async_trait::async_trait]
impl Flownode for FlowNodeManager {
impl Flownode for FlownodeManager {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
match request.body {
Some(flow_request::Body::Create(CreateRequest {

View File

@@ -207,7 +207,8 @@ impl WorkerHandle {
/// The actual worker that does the work and contains active state
#[derive(Debug)]
pub struct Worker<'subgraph> {
pub task_states: BTreeMap<FlowId, ActiveDataflowState<'subgraph>>,
/// Task states
pub(crate) task_states: BTreeMap<FlowId, ActiveDataflowState<'subgraph>>,
itc_server: Arc<Mutex<InterThreadCallServer>>,
}

View File

@@ -30,3 +30,8 @@ mod plan;
mod repr;
mod transform;
mod utils;
pub use adapter::{
start_flow_node_with_one_worker, FlownodeBuilder, FlownodeManager, FlownodeManagerRef,
FlownodeOptions,
};

View File

@@ -75,6 +75,12 @@ impl Key {
}
}
impl Default for Key {
fn default() -> Self {
Self::new()
}
}
/// The type of a relation.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationType {

View File

@@ -31,16 +31,19 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
pub struct StandaloneDatanodeManager(pub RegionServer);
pub struct StandaloneDatanodeManager {
pub region_server: RegionServer,
pub flow_server: FlownodeRef,
}
#[async_trait]
impl NodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
RegionInvoker::arc(self.0.clone())
RegionInvoker::arc(self.region_server.clone())
}
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
self.flow_server.clone()
}
}

View File

@@ -38,6 +38,7 @@ common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true

View File

@@ -35,6 +35,7 @@ use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
@@ -129,6 +130,15 @@ impl GreptimeDbStandaloneBuilder {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build());
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager = KvBackendCatalogManager::new(
@@ -139,7 +149,10 @@ impl GreptimeDbStandaloneBuilder {
)
.await;
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
});
let table_id_sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -193,6 +206,10 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
flownode
.set_frontend_invoker(Box::new(instance.clone()))
.await;
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();