Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00

Compare commits (42 commits): flow/add_a... to poc_datafl...
| SHA1 |
|---|
| 93fcb7454c |
| cfe28b6974 |
| 9ec6107988 |
| e840bb469d |
| 684850f451 |
| 76aadb2223 |
| f360b2e812 |
| 138a2aba7f |
| 8f6462c0b0 |
| 46d0b3cd64 |
| a17a7f4e47 |
| 50335dd53c |
| abaf881f06 |
| e1a8215394 |
| d7942a1a00 |
| a6727e2e8d |
| d5bdbedcd6 |
| 878737f781 |
| d88cff6f51 |
| e7801abd0c |
| d7a132a02f |
| a3417f50cf |
| 099f414f63 |
| c22185abce |
| e33afa53f4 |
| 7eaf471808 |
| acba753500 |
| 5736373820 |
| 74dee25688 |
| edcbc89c38 |
| e88a40b58b |
| c7647759be |
| d8a191a2db |
| ea40691c71 |
| 640674b9bc |
| 3fb3fb18c2 |
| 1067d3453d |
| 57e3912aca |
| ebcfb0f1d7 |
| 6442c96847 |
| b19febc97c |
| 8240a1ace1 |
Cargo.lock (generated): 44 changes
@@ -1611,6 +1611,7 @@ dependencies = [
 "either",
 "etcd-client",
 "file-engine",
 "flow",
 "frontend",
 "futures",
 "human-panic",
@@ -2459,6 +2460,16 @@ dependencies = [
 "memchr",
]

[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
 "quote",
 "syn 1.0.109",
]

[[package]]
name = "darling"
version = "0.14.4"
@@ -3506,11 +3517,16 @@ name = "flow"
version = "0.7.2"
dependencies = [
 "api",
 "async-trait",
 "catalog",
 "common-base",
 "common-catalog",
 "common-decimal",
 "common-error",
 "common-frontend",
 "common-macro",
 "common-meta",
 "common-runtime",
 "common-telemetry",
 "common-time",
 "datafusion-common",
@@ -3518,8 +3534,12 @@ dependencies = [
 "datafusion-substrait",
 "datatypes",
 "enum_dispatch",
 "futures",
 "greptime-proto",
 "hydroflow",
 "itertools 0.10.5",
 "minstant",
 "nom",
 "num-traits",
 "prost 0.12.4",
 "query",
@@ -3529,6 +3549,7 @@ dependencies = [
 "session",
 "smallvec",
 "snafu",
 "store-api",
 "strum 0.25.0",
 "substrait 0.7.2",
 "table",
@@ -5397,6 +5418,16 @@ dependencies = [
 "adler",
]

[[package]]
name = "minstant"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db"
dependencies = [
 "ctor",
 "web-time 1.1.0",
]

[[package]]
name = "mio"
version = "0.8.11"
@@ -10289,6 +10320,7 @@ dependencies = [
 "datanode",
 "datatypes",
 "dotenv",
 "flow",
 "frontend",
 "futures",
 "futures-util",
@@ -11011,7 +11043,7 @@ dependencies = [
 "tracing-core",
 "tracing-log 0.2.0",
 "tracing-subscriber",
 "web-time",
 "web-time 0.2.4",
]

[[package]]
@@ -11769,6 +11801,16 @@ dependencies = [
 "wasm-bindgen",
]

[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
 "js-sys",
 "wasm-bindgen",
]

[[package]]
name = "webbrowser"
version = "0.8.15"
@@ -204,6 +204,7 @@ common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }
@@ -45,6 +45,7 @@ datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend.workspace = true
futures.workspace = true
human-panic = "1.2.2"
@@ -40,6 +40,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -405,11 +406,26 @@ impl StartCommand {
        )
        .await;

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_builder = FlownodeBuilder::new(
            Default::default(),
            fe_plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
        )
        .with_kv_backend(kv_backend.clone());
        let flownode = Arc::new(flow_builder.build().await);

        let builder =
            DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
        let datanode = builder.build().await.context(StartDatanodeSnafu)?;

        let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.clone(),
        });

        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -427,8 +443,6 @@ impl StartCommand {
            opts.wal_meta.clone(),
            kv_backend.clone(),
        ));
        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
@@ -456,6 +470,12 @@ impl StartCommand {
        .await
        .context(StartFrontendSnafu)?;

        // flow server need to be able to use frontend to write insert requests back
        flownode
            .set_frontend_invoker(Box::new(frontend.clone()))
            .await;
        let _handle = flownode.clone().run_background();

        let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
            .build()
            .await
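The standalone hunks above build the flow node before any frontend exists and only later hand the frontend back to it through `set_frontend_invoker`, because the frontend itself needs the flow node (via `StandaloneDatanodeManager`). The sketch below models that two-phase wiring in isolation; the `FrontendInvoker` and `FlownodeManager` types are simplified stand-ins rather than the real GreptimeDB ones, and it assumes tokio with the `macros` and `rt-multi-thread` features.

```rust
use std::sync::Arc;

use tokio::sync::RwLock;

// Stand-in for the real frontend handler trait.
trait FrontendInvoker: Send + Sync {}

struct Frontend;
impl FrontendInvoker for Frontend {}

struct FlownodeManager {
    // Empty until the frontend exists; mirrors the RwLock<Option<_>> used by the adapter below.
    frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker>>>,
}

impl FlownodeManager {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            frontend_invoker: RwLock::new(None),
        })
    }

    async fn set_frontend_invoker(&self, invoker: Box<dyn FrontendInvoker>) {
        *self.frontend_invoker.write().await = Some(invoker);
    }
}

#[tokio::main]
async fn main() {
    // 1. The flow node is built before any frontend exists.
    let flownode = FlownodeManager::new();
    // 2. The frontend is built afterwards (in the diff it reaches the flow node
    //    through the node manager).
    let frontend = Frontend;
    // 3. The loop is closed by handing the frontend back for write-back requests.
    flownode.set_frontend_invoker(Box::new(frontend)).await;
    assert!(flownode.frontend_invoker.read().await.is_some());
}
```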
@@ -128,6 +128,7 @@ impl TableInfoValue {
    }
}

#[derive(Clone)]
pub struct TableInfoManager {
    kv_backend: KvBackendRef,
}
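The only change in this hunk is the added `#[derive(Clone)]`, which lets the flow adapter further down hold its own copy of the table-info manager. The snippet below is a minimal, self-contained sketch of why that clone is cheap; the `KvBackendRef` alias here is a placeholder for the real reference-counted kv-backend handle.

```rust
use std::sync::Arc;

// Placeholder alias; the real KvBackendRef is a reference-counted backend handle.
type KvBackendRef = Arc<str>;

#[derive(Clone)]
pub struct TableInfoManager {
    kv_backend: KvBackendRef,
}

fn main() {
    let manager = TableInfoManager {
        kv_backend: Arc::from("kv"),
    };
    // Cloning the manager only bumps the reference count on the shared backend,
    // so handing a copy to the flow adapter costs one pointer clone.
    let for_flow = manager.clone();
    assert_eq!(Arc::strong_count(&for_flow.kv_backend), 2);
}
```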
@@ -9,9 +9,13 @@ workspace = true

[dependencies]
api.workspace = true
catalog.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
@@ -19,17 +23,28 @@ datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
async-trait.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
prost.workspace = true
query.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true
@@ -14,5 +14,648 @@
|
||||
|
||||
//! for getting data from source and sending results to sink
|
||||
//! and communicating with other parts of the database
|
||||
#![warn(unused_imports)]
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use common_base::Plugins;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_frontend::handler::FrontendInvoker;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::{debug, info};
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::Value;
|
||||
use greptime_proto::v1;
|
||||
use itertools::Itertools;
|
||||
use minstant::Anchor;
|
||||
use query::{QueryEngine, QueryEngineFactory};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContext;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{ConcreteDataType, RegionId};
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::{oneshot, Mutex, RwLock};
|
||||
|
||||
use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
|
||||
pub(crate) use crate::adapter::node_context::FlownodeContext;
|
||||
use crate::adapter::parse_expr::parse_fixed;
|
||||
use crate::adapter::table_source::TableSource;
|
||||
use crate::adapter::util::column_schemas_to_proto;
|
||||
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
|
||||
use crate::compute::ErrCollector;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{self, DiffRow, Row};
|
||||
use crate::transform::sql_to_flow_plan;
|
||||
|
||||
pub(crate) mod error;
|
||||
mod flownode_impl;
|
||||
mod parse_expr;
|
||||
mod server;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod util;
|
||||
mod worker;
|
||||
|
||||
pub(crate) mod node_context;
|
||||
mod table_source;
|
||||
|
||||
use error::Error;
|
||||
|
||||
pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
|
||||
|
||||
// TODO: refactor common types for flow to a separate module
|
||||
/// FlowId is a unique identifier for a flow task
|
||||
pub type FlowId = u64;
|
||||
pub type TableName = [String; 3];
|
||||
|
||||
/// Options for flow node
|
||||
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct FlownodeOptions {
|
||||
/// rpc address
|
||||
pub rpc_addr: String,
|
||||
}
|
||||
|
||||
/// Flownode Builder
|
||||
pub struct FlownodeBuilder {
|
||||
opts: FlownodeOptions,
|
||||
plugins: Plugins,
|
||||
kv_backend: Option<KvBackendRef>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: Arc<KvBackendCatalogManager>,
|
||||
}
|
||||
|
||||
impl FlownodeBuilder {
|
||||
/// init flownode builder
|
||||
pub fn new(
|
||||
opts: FlownodeOptions,
|
||||
plugins: Plugins,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
catalog_manager: Arc<KvBackendCatalogManager>,
|
||||
) -> Self {
|
||||
Self {
|
||||
opts,
|
||||
plugins,
|
||||
kv_backend: None,
|
||||
table_meta,
|
||||
catalog_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// set kv backend
|
||||
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
|
||||
Self {
|
||||
kv_backend: Some(kv_backend),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(discord9): error handling
|
||||
pub async fn build(self) -> FlownodeManager {
|
||||
let query_engine_factory = QueryEngineFactory::new_with_plugins(
|
||||
// query engine in flownode only translate plan with resolved table source.
|
||||
self.catalog_manager.clone(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
self.plugins.clone(),
|
||||
);
|
||||
let query_engine = query_engine_factory.query_engine();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let _handle = std::thread::spawn(move || {
|
||||
let node_id = Some(1);
|
||||
let (flow_node_manager, mut worker) =
|
||||
FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone());
|
||||
let _ = tx.send(flow_node_manager);
|
||||
info!("Flow Worker started in new thread");
|
||||
worker.run();
|
||||
});
|
||||
let man = rx.await.unwrap();
|
||||
info!("Flow Node Manager started");
|
||||
man
|
||||
}
|
||||
}
|
||||
|
||||
/// This function will create a new thread for flow worker and return a handle to the flow node manager
|
||||
pub fn start_flow_node_with_one_worker(
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
) -> FlownodeManager {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let _handle = std::thread::spawn(move || {
|
||||
let node_id = Some(1);
|
||||
let (flow_node_manager, mut worker) =
|
||||
FlownodeManager::new_with_worker(node_id, query_engine, table_meta);
|
||||
let _ = tx.send(flow_node_manager);
|
||||
worker.run();
|
||||
});
|
||||
|
||||
rx.blocking_recv().unwrap()
|
||||
}
|
||||
|
||||
/// Arc-ed FlowNodeManager, cheaper to clone
|
||||
pub type FlownodeManagerRef = Arc<FlownodeManager>;
|
||||
|
||||
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
|
||||
///
|
||||
/// The choice of timestamp is just using current system timestamp for now
|
||||
pub struct FlownodeManager {
|
||||
/// The handler to the worker that will run the dataflow
|
||||
/// which is `!Send` so a handle is used
|
||||
pub worker_handles: Vec<Mutex<WorkerHandle>>,
|
||||
/// The query engine that will be used to parse the query and convert it to a dataflow plan
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
/// Getting table name and table schema from table info manager
|
||||
table_info_source: TableSource,
|
||||
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
|
||||
/// contains mapping from table name to global id, and table schema
|
||||
node_context: Mutex<FlownodeContext>,
|
||||
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
|
||||
tick_manager: FlowTickManager,
|
||||
node_id: Option<u32>,
|
||||
}
|
||||
|
||||
/// Building FlownodeManager
|
||||
impl FlownodeManager {
|
||||
/// set frontend invoker
|
||||
pub async fn set_frontend_invoker(
|
||||
self: &Arc<Self>,
|
||||
frontend: Box<dyn FrontendInvoker + Send + Sync>,
|
||||
) {
|
||||
*self.frontend_invoker.write().await = Some(frontend);
|
||||
}
|
||||
|
||||
/// Create **without** setting `frontend_invoker`
|
||||
pub fn new(
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
) -> Self {
|
||||
let srv_map = TableSource::new(
|
||||
table_meta.table_info_manager().clone(),
|
||||
table_meta.table_name_manager().clone(),
|
||||
);
|
||||
let node_context = FlownodeContext::default();
|
||||
let tick_manager = FlowTickManager::new();
|
||||
let worker_handles = Vec::new();
|
||||
FlownodeManager {
|
||||
worker_handles,
|
||||
query_engine,
|
||||
table_info_source: srv_map,
|
||||
frontend_invoker: RwLock::new(None),
|
||||
node_context: Mutex::new(node_context),
|
||||
flow_err_collectors: Default::default(),
|
||||
tick_manager,
|
||||
node_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a flownode manager with one worker
|
||||
pub fn new_with_worker<'s>(
|
||||
node_id: Option<u32>,
|
||||
query_engine: Arc<dyn QueryEngine>,
|
||||
table_meta: TableMetadataManagerRef,
|
||||
) -> (Self, Worker<'s>) {
|
||||
let mut zelf = Self::new(node_id, query_engine, table_meta);
|
||||
let (handle, worker) = create_worker();
|
||||
zelf.add_worker_handle(handle);
|
||||
(zelf, worker)
|
||||
}
|
||||
|
||||
/// add a worker handler to manager, meaning this corrseponding worker is under it's manage
|
||||
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
|
||||
self.worker_handles.push(Mutex::new(handle));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DiffRequest {
|
||||
Insert(Vec<(Row, repr::Timestamp)>),
|
||||
Delete(Vec<(Row, repr::Timestamp)>),
|
||||
}
|
||||
|
||||
/// iterate through the diff row and form continuous diff row with same diff type
|
||||
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
|
||||
let mut reqs = Vec::new();
|
||||
for (row, ts, diff) in rows {
|
||||
let last = reqs.last_mut();
|
||||
match (last, diff) {
|
||||
(Some(DiffRequest::Insert(rows)), 1) => {
|
||||
rows.push((row, ts));
|
||||
}
|
||||
(Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
|
||||
(Some(DiffRequest::Delete(rows)), -1) => {
|
||||
rows.push((row, ts));
|
||||
}
|
||||
(Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
|
||||
(None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
|
||||
(None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
reqs
|
||||
}
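To make the batching rule in `diff_row_to_request` concrete: consecutive diff rows with the same sign are folded into one request, and a sign flip starts a new one. Below is a self-contained re-implementation over plain integer tuples; `i64` stands in for the real `Row`/`Timestamp` types, and the loop is reshaped slightly so the sketch borrow-checks on its own.

```rust
// Simplified stand-in for DiffRequest, using (value, timestamp) pairs.
#[derive(Debug, PartialEq)]
enum Req {
    Insert(Vec<(i64, i64)>),
    Delete(Vec<(i64, i64)>),
}

// Same grouping rule as diff_row_to_request, over (value, ts, diff) tuples.
fn group(rows: Vec<(i64, i64, i64)>) -> Vec<Req> {
    let mut reqs: Vec<Req> = Vec::new();
    for (row, ts, diff) in rows {
        // Extend the last request only if its kind matches the diff sign.
        let extend_last = matches!(
            (reqs.last(), diff),
            (Some(Req::Insert(_)), 1) | (Some(Req::Delete(_)), -1)
        );
        if extend_last {
            match reqs.last_mut().expect("just checked") {
                Req::Insert(batch) | Req::Delete(batch) => batch.push((row, ts)),
            }
        } else {
            match diff {
                1 => reqs.push(Req::Insert(vec![(row, ts)])),
                -1 => reqs.push(Req::Delete(vec![(row, ts)])),
                _ => {} // any other diff value is dropped, as in the original
            }
        }
    }
    reqs
}

fn main() {
    // +1, +1, -1, +1  =>  one Insert of two rows, one Delete, one Insert.
    let reqs = group(vec![(10, 0, 1), (11, 0, 1), (10, 1, -1), (12, 1, 1)]);
    assert_eq!(
        reqs,
        vec![
            Req::Insert(vec![(10, 0), (11, 0)]),
            Req::Delete(vec![(10, 1)]),
            Req::Insert(vec![(12, 1)]),
        ]
    );
}
```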
|
||||
|
||||
/// This impl block contains methods to send writeback requests to frontend
|
||||
impl FlownodeManager {
|
||||
/// TODO(discord9): merge all same type of diff row into one requests
|
||||
///
|
||||
/// Return the number of requests it made
|
||||
pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
|
||||
let all_reqs = self.generate_writeback_request().await;
|
||||
if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
|
||||
return Ok(0);
|
||||
}
|
||||
let mut req_cnt = 0;
|
||||
for (table_name, reqs) in all_reqs {
|
||||
if reqs.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
|
||||
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
|
||||
// TODO(discord9): instead of auto build table from request schema, actually build table
|
||||
// before `create flow` to be able to assign pk and ts etc.
|
||||
let (primary_keys, schema) = if let Some(table_id) = self
|
||||
.table_info_source
|
||||
.get_table_id_from_name(&table_name)
|
||||
.await?
|
||||
{
|
||||
let table_info = self
|
||||
.table_info_source
|
||||
.get_table_info_value(&table_id)
|
||||
.await?
|
||||
.unwrap();
|
||||
let meta = table_info.table_info.meta;
|
||||
let primary_keys = meta
|
||||
.primary_key_indices
|
||||
.into_iter()
|
||||
.map(|i| meta.schema.column_schemas[i].name.clone())
|
||||
.collect_vec();
|
||||
let schema = meta.schema.column_schemas;
|
||||
(primary_keys, schema)
|
||||
} else {
|
||||
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
|
||||
let (primary_keys, schema) = {
|
||||
let node_ctx = self.node_context.lock().await;
|
||||
let gid: GlobalId = node_ctx
|
||||
.table_repr
|
||||
.get_by_name(&table_name)
|
||||
.map(|x| x.1)
|
||||
.unwrap();
|
||||
let schema = node_ctx
|
||||
.schema
|
||||
.get(&gid)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Table name = {:?}", table_name),
|
||||
})?
|
||||
.clone();
|
||||
// TODO(discord9): use default key from schema
|
||||
let primary_keys = schema
|
||||
.keys
|
||||
.first()
|
||||
.map(|v| {
|
||||
v.column_indices
|
||||
.iter()
|
||||
.map(|i| format!("Col_{i}"))
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let ts_col = ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_time_index(true);
|
||||
|
||||
let wout_ts = schema
|
||||
.column_types
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, typ)| {
|
||||
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
|
||||
})
|
||||
.collect_vec();
|
||||
let mut with_ts = wout_ts.clone();
|
||||
with_ts.push(ts_col);
|
||||
(primary_keys, with_ts)
|
||||
};
|
||||
(primary_keys, schema)
|
||||
};
|
||||
|
||||
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
|
||||
|
||||
debug!(
|
||||
"Sending {} writeback requests to table {}, reqs={:?}",
|
||||
reqs.len(),
|
||||
table_name.join("."),
|
||||
reqs
|
||||
);
|
||||
|
||||
for req in reqs {
|
||||
match req {
|
||||
DiffRequest::Insert(insert) => {
|
||||
let rows_proto: Vec<v1::Row> = insert
|
||||
.into_iter()
|
||||
.map(|(mut row, _ts)| {
|
||||
row.extend(Some(Value::from(
|
||||
common_time::Timestamp::new_millisecond(0),
|
||||
)));
|
||||
row.into()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let table_name = table_name.last().unwrap().clone();
|
||||
let req = RowInsertRequest {
|
||||
table_name,
|
||||
rows: Some(v1::Rows {
|
||||
schema: proto_schema.clone(),
|
||||
rows: rows_proto,
|
||||
}),
|
||||
};
|
||||
req_cnt += 1;
|
||||
self.frontend_invoker
|
||||
.read()
|
||||
.await
|
||||
.as_ref()
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: "Expect a frontend invoker for flownode to write back",
|
||||
})?
|
||||
.row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| ExternalSnafu {})?;
|
||||
}
|
||||
DiffRequest::Delete(remove) => {
|
||||
info!("original remove rows={:?}", remove);
|
||||
let rows_proto: Vec<v1::Row> = remove
|
||||
.into_iter()
|
||||
.map(|(mut row, _ts)| {
|
||||
row.extend(Some(Value::from(
|
||||
common_time::Timestamp::new_millisecond(0),
|
||||
)));
|
||||
row.into()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let table_name = table_name.last().unwrap().clone();
|
||||
let req = RowDeleteRequest {
|
||||
table_name,
|
||||
rows: Some(v1::Rows {
|
||||
schema: proto_schema.clone(),
|
||||
rows: rows_proto,
|
||||
}),
|
||||
};
|
||||
|
||||
req_cnt += 1;
|
||||
self.frontend_invoker
|
||||
.read()
|
||||
.await
|
||||
.as_ref()
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: "Expect a frontend invoker for flownode to write back",
|
||||
})?
|
||||
.row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| ExternalSnafu {})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(req_cnt)
|
||||
}
|
||||
|
||||
/// Generate writeback request for all sink table
|
||||
pub async fn generate_writeback_request(&self) -> BTreeMap<TableName, Vec<DiffRequest>> {
|
||||
let mut output = BTreeMap::new();
|
||||
for (name, sink_recv) in self
|
||||
.node_context
|
||||
.lock()
|
||||
.await
|
||||
.sink_receiver
|
||||
.iter_mut()
|
||||
.map(|(n, (_s, r))| (n, r))
|
||||
{
|
||||
let mut rows = Vec::new();
|
||||
while let Ok(row) = sink_recv.try_recv() {
|
||||
rows.push(row);
|
||||
}
|
||||
let reqs = diff_row_to_request(rows);
|
||||
output.insert(name.clone(), reqs);
|
||||
}
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
/// Flow Runtime related methods
|
||||
impl FlownodeManager {
|
||||
/// run in common_runtime background runtime
|
||||
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
|
||||
info!("Starting flownode manager's background task");
|
||||
common_runtime::spawn_bg(async move {
|
||||
self.run().await;
|
||||
})
|
||||
}
|
||||
|
||||
/// log all flow errors
|
||||
pub async fn log_all_errors(&self) {
|
||||
for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
|
||||
let all_errors = f_err.get_all().await;
|
||||
if !all_errors.is_empty() {
|
||||
let all_errors = all_errors
|
||||
.into_iter()
|
||||
.map(|i| format!("{:?}", i))
|
||||
.join("\n");
|
||||
common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trigger dataflow running, and then send writeback request to the source sender
|
||||
///
|
||||
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
|
||||
pub async fn run(&self) {
|
||||
loop {
|
||||
// TODO(discord9): only run when new inputs arrive or scheduled to
|
||||
self.run_available().await;
|
||||
// TODO(discord9): error handling
|
||||
self.send_writeback_requests().await.unwrap();
|
||||
self.log_all_errors().await;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Run all available subgraph in the flow node
|
||||
/// This will try to run all dataflow in this node
|
||||
///
|
||||
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
|
||||
/// TODO(discord9): add flag for subgraph that have input since last run
|
||||
pub async fn run_available(&self) {
|
||||
let now = self.tick_manager.tick();
|
||||
for worker in self.worker_handles.iter() {
|
||||
worker.lock().await.run_available(now).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// send write request to related source sender
|
||||
pub async fn handle_write_request(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
rows: Vec<DiffRow>,
|
||||
) -> Result<(), Error> {
|
||||
let table_id = region_id.table_id();
|
||||
debug!("Send {} rows to table {}", rows.len(), table_id);
|
||||
self.node_context.lock().await.send(table_id, rows)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Create&Remove flow
|
||||
impl FlownodeManager {
|
||||
/// remove a flow by it's id
|
||||
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
|
||||
for handle in self.worker_handles.iter() {
|
||||
let handle = handle.lock().await;
|
||||
if handle.contains_flow(flow_id).await? {
|
||||
handle.remove_flow(flow_id).await?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return task id if a new task is created, otherwise return None
|
||||
///
|
||||
/// steps to create task:
|
||||
/// 1. parse query into typed plan(and optional parse expire_when expr)
|
||||
/// 2. render source/sink with output table id and used input table id
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_flow(
|
||||
&self,
|
||||
flow_id: FlowId,
|
||||
sink_table_name: TableName,
|
||||
source_table_ids: &[TableId],
|
||||
create_if_not_exist: bool,
|
||||
expire_when: Option<String>,
|
||||
comment: Option<String>,
|
||||
sql: String,
|
||||
flow_options: HashMap<String, String>,
|
||||
query_ctx: Option<QueryContext>,
|
||||
) -> Result<Option<FlowId>, Error> {
|
||||
if create_if_not_exist {
|
||||
// check if the task already exists
|
||||
for handle in self.worker_handles.iter() {
|
||||
if handle.lock().await.contains_flow(flow_id).await? {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut node_ctx = self.node_context.lock().await;
|
||||
// assign global id to source and sink table
|
||||
for source in source_table_ids {
|
||||
node_ctx
|
||||
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
|
||||
.await?;
|
||||
}
|
||||
node_ctx
|
||||
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
|
||||
.await?;
|
||||
|
||||
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
|
||||
|
||||
node_ctx.query_context = query_ctx.map(Arc::new);
|
||||
// construct a active dataflow state with it
|
||||
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
|
||||
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
|
||||
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
|
||||
|
||||
let expire_when = expire_when
|
||||
.and_then(|s| {
|
||||
if s.is_empty() || s.split_whitespace().join("").is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
})
|
||||
.map(|d| {
|
||||
let d = d.as_ref();
|
||||
parse_fixed(d)
|
||||
.map(|(_, n)| n)
|
||||
.map_err(|err| err.to_string())
|
||||
})
|
||||
.transpose()
|
||||
.map_err(|err| UnexpectedSnafu { reason: err }.build())?;
|
||||
let _ = comment;
|
||||
let _ = flow_options;
|
||||
|
||||
// TODO(discord9): add more than one handles
|
||||
let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
|
||||
let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;
|
||||
|
||||
let source_ids = source_table_ids
|
||||
.iter()
|
||||
.map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
|
||||
.collect_vec();
|
||||
let source_senders = source_ids
|
||||
.iter()
|
||||
.map(|id| node_ctx.get_source_by_global_id(id).map(|s| s.subscribe()))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let err_collector = ErrCollector::default();
|
||||
self.flow_err_collectors
|
||||
.write()
|
||||
.await
|
||||
.insert(flow_id, err_collector.clone());
|
||||
let handle = &self.worker_handles[0].lock().await;
|
||||
handle
|
||||
.create_flow(
|
||||
flow_id,
|
||||
flow_plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
&source_ids,
|
||||
source_senders,
|
||||
expire_when,
|
||||
create_if_not_exist,
|
||||
err_collector,
|
||||
)
|
||||
.await?;
|
||||
info!("Successfully create flow with id={}", flow_id);
|
||||
Ok(Some(flow_id))
|
||||
}
|
||||
}
|
||||
|
||||
/// FlowTickManager is a manager for flow tick, which trakc flow execution progress
|
||||
///
|
||||
/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
|
||||
/// TSO coord mess
|
||||
#[derive(Clone)]
|
||||
pub struct FlowTickManager {
|
||||
anchor: Anchor,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FlowTickManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("FlowTickManager").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowTickManager {
|
||||
pub fn new() -> Self {
|
||||
FlowTickManager {
|
||||
anchor: Anchor::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the current timestamp in milliseconds
|
||||
pub fn tick(&self) -> repr::Timestamp {
|
||||
(minstant::Instant::now().as_unix_nanos(&self.anchor) / 1_000_000) as repr::Timestamp
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,11 @@

use std::any::Any;

use common_error::ext::BoxedError;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};

@@ -32,6 +31,11 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display("External error"))]
    External {
        location: Location,
        source: BoxedError,
    },
    /// TODO(discord9): add detailed location of column
    #[snafu(display("Failed to eval stream"))]
    Eval {
@@ -42,6 +46,13 @@ pub enum Error {
    #[snafu(display("Table not found: {name}"))]
    TableNotFound { name: String, location: Location },

    #[snafu(display("Table not found: {msg}, meta error: {source}"))]
    TableNotFoundMeta {
        source: common_meta::error::Error,
        msg: String,
        location: Location,
    },

    #[snafu(display("Table already exist: {name}"))]
    TableAlreadyExist { name: String, location: Location },

@@ -52,6 +63,24 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Invalid query plan: {source}"))]
    InvalidQueryPlan {
        source: query::error::Error,
        location: Location,
    },

    #[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
    InvalidQueryProst {
        inner: api::DecodeError,
        location: Location,
    },

    #[snafu(display("Invalid query, can't transform to substrait: {source}"))]
    InvalidQuerySubstrait {
        source: substrait::error::Error,
        location: Location,
    },

    #[snafu(display("Invalid query: {reason}"))]
    InvalidQuery { reason: String, location: Location },

@@ -80,6 +109,9 @@ pub enum Error {
        context: String,
        location: Location,
    },

    #[snafu(display("Unexpected: {reason}"))]
    Unexpected { reason: String, location: Location },
}

/// Result type for flow module
@@ -92,14 +124,20 @@ impl ErrorExt for Error {
                StatusCode::Internal
            }
            &Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
            Self::TableNotFound { .. } => StatusCode::TableNotFound,
            &Self::InvalidQuery { .. } | &Self::Plan { .. } | &Self::Datatypes { .. } => {
                StatusCode::PlanQuery
            Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
                StatusCode::TableNotFound
            }
            Self::NoProtoType { .. } => StatusCode::Unexpected,
            Self::InvalidQueryPlan { .. }
            | Self::InvalidQuerySubstrait { .. }
            | Self::InvalidQueryProst { .. }
            | &Self::InvalidQuery { .. }
            | &Self::Plan { .. }
            | &Self::Datatypes { .. } => StatusCode::PlanQuery,
            Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected,
            &Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
                StatusCode::Unsupported
            }
            &Self::External { .. } => StatusCode::Unknown,
        }
    }
src/flow/src/adapter/flownode_impl.rs (new file, 117 lines)
@@ -0,0 +1,117 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
|
||||
|
||||
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
|
||||
use common_meta::node_manager::Flownode;
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::adapter::FlownodeManager;
|
||||
use crate::repr::{self, DiffRow};
|
||||
|
||||
fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
|
||||
// TODO(discord9): refactor this
|
||||
Err::<(), _>(BoxedError::new(err))
|
||||
.with_context(|_| ExternalSnafu)
|
||||
.unwrap_err()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Flownode for FlownodeManager {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
|
||||
let query_ctx = request
|
||||
.header
|
||||
.and_then(|h| h.query_context)
|
||||
.map(|ctx| ctx.into());
|
||||
match request.body {
|
||||
Some(flow_request::Body::Create(CreateRequest {
|
||||
flow_id: Some(task_id),
|
||||
source_table_ids,
|
||||
sink_table_name: Some(sink_table_name),
|
||||
create_if_not_exists,
|
||||
expire_when,
|
||||
comment,
|
||||
sql,
|
||||
flow_options,
|
||||
})) => {
|
||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
||||
let sink_table_name = [
|
||||
sink_table_name.catalog_name,
|
||||
sink_table_name.schema_name,
|
||||
sink_table_name.table_name,
|
||||
];
|
||||
let ret = self
|
||||
.create_flow(
|
||||
task_id.id as u64,
|
||||
sink_table_name,
|
||||
&source_table_ids,
|
||||
create_if_not_exists,
|
||||
Some(expire_when),
|
||||
Some(comment),
|
||||
sql,
|
||||
flow_options,
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(to_meta_err)?;
|
||||
Ok(FlowResponse {
|
||||
affected_flows: ret
|
||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
||||
.into_iter()
|
||||
.collect_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
Some(flow_request::Body::Drop(DropRequest {
|
||||
flow_id: Some(flow_id),
|
||||
})) => {
|
||||
self.remove_flow(flow_id.id as u64)
|
||||
.await
|
||||
.map_err(to_meta_err)?;
|
||||
Ok(Default::default())
|
||||
}
|
||||
None => UnexpectedSnafu {
|
||||
err_msg: "Missing request body",
|
||||
}
|
||||
.fail(),
|
||||
_ => UnexpectedSnafu {
|
||||
err_msg: "Invalid request body.",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
for write_request in request.requests {
|
||||
let region_id = write_request.region_id;
|
||||
let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
|
||||
// TODO(discord9): reconsider time assignment mechanism
|
||||
let now = self.tick_manager.tick();
|
||||
let rows: Vec<DiffRow> = rows_proto
|
||||
.into_iter()
|
||||
.map(repr::Row::from)
|
||||
.map(|r| (r, now, 1))
|
||||
.collect_vec();
|
||||
self.handle_write_request(region_id.into(), rows)
|
||||
.await
|
||||
.map_err(to_meta_err)?;
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
}
|
||||
src/flow/src/adapter/node_context.rs (new file, 300 lines)
@@ -0,0 +1,300 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Node context, prone to change with every incoming requests
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
|
||||
use std::sync::Arc;
|
||||
|
||||
use session::context::QueryContext;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
|
||||
use crate::adapter::{FlowId, TableName, TableSource};
|
||||
use crate::expr::error::InternalSnafu;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
|
||||
|
||||
/// A context that holds the information of the dataflow
|
||||
#[derive(Default)]
|
||||
pub struct FlownodeContext {
|
||||
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
|
||||
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
|
||||
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
|
||||
pub flow_to_sink: BTreeMap<FlowId, TableName>,
|
||||
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
|
||||
///
|
||||
/// Note that we are getting insert requests with table id, so we should use table id as the key
|
||||
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
|
||||
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
|
||||
///
|
||||
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
|
||||
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
|
||||
pub sink_receiver: BTreeMap<
|
||||
TableName,
|
||||
(
|
||||
mpsc::UnboundedSender<DiffRow>,
|
||||
mpsc::UnboundedReceiver<DiffRow>,
|
||||
),
|
||||
>,
|
||||
/// store source in buffer for each source table, in case broadcast channel is full
|
||||
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
|
||||
/// the schema of the table, query from metasrv or infered from TypedPlan
|
||||
pub schema: HashMap<GlobalId, RelationType>,
|
||||
/// All the tables that have been registered in the worker
|
||||
pub table_repr: IdToNameMap,
|
||||
pub query_context: Option<Arc<QueryContext>>,
|
||||
}
|
||||
|
||||
impl FlownodeContext {
|
||||
// return number of rows it actuall send(including what's in the buffer)
|
||||
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
|
||||
let sender = self
|
||||
.source_sender
|
||||
.get(&table_id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: table_id.to_string(),
|
||||
})?;
|
||||
let send_buffer = self.send_buffer.entry(table_id).or_default();
|
||||
send_buffer.extend(rows);
|
||||
let mut row_cnt = 0;
|
||||
while let Some(row) = send_buffer.pop_front() {
|
||||
if sender.len() >= BROADCAST_CAP {
|
||||
break;
|
||||
}
|
||||
row_cnt += 1;
|
||||
sender
|
||||
.send(row)
|
||||
.map_err(|err| {
|
||||
InternalSnafu {
|
||||
reason: format!(
|
||||
"Failed to send row to table_id = {:?}, error = {:?}",
|
||||
table_id, err
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
.with_context(|_| EvalSnafu)?;
|
||||
}
|
||||
|
||||
Ok(row_cnt)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlownodeContext {
|
||||
/// mapping source table to task, and sink table to task in worker context
|
||||
///
|
||||
/// also add their corrseponding broadcast sender/receiver
|
||||
pub fn register_task_src_sink(
|
||||
&mut self,
|
||||
task_id: FlowId,
|
||||
source_table_ids: &[TableId],
|
||||
sink_table_name: TableName,
|
||||
) {
|
||||
for source_table_id in source_table_ids {
|
||||
self.add_source_sender(*source_table_id);
|
||||
self.source_to_tasks
|
||||
.entry(*source_table_id)
|
||||
.or_default()
|
||||
.insert(task_id);
|
||||
}
|
||||
|
||||
self.add_sink_receiver(sink_table_name.clone());
|
||||
self.flow_to_sink.insert(task_id, sink_table_name);
|
||||
}
|
||||
|
||||
pub fn add_source_sender(&mut self, table_id: TableId) {
|
||||
self.source_sender
|
||||
.entry(table_id)
|
||||
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
|
||||
}
|
||||
|
||||
pub fn add_sink_receiver(&mut self, table_name: TableName) {
|
||||
self.sink_receiver
|
||||
.entry(table_name)
|
||||
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
|
||||
}
|
||||
|
||||
pub fn get_source_by_global_id(
|
||||
&self,
|
||||
id: &GlobalId,
|
||||
) -> Result<&broadcast::Sender<DiffRow>, Error> {
|
||||
let table_id = self
|
||||
.table_repr
|
||||
.get_by_global_id(id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Global Id = {:?}", id),
|
||||
})?
|
||||
.1
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Table Id = {:?}", id),
|
||||
})?;
|
||||
self.source_sender
|
||||
.get(&table_id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: table_id.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_sink_by_global_id(
|
||||
&self,
|
||||
id: &GlobalId,
|
||||
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
|
||||
let table_name = self
|
||||
.table_repr
|
||||
.get_by_global_id(id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("{:?}", id),
|
||||
})?
|
||||
.0
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Global Id = {:?}", id),
|
||||
})?;
|
||||
self.sink_receiver
|
||||
.get(&table_name)
|
||||
.map(|(s, _r)| s.clone())
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: table_name.join("."),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FlownodeContext {
|
||||
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
|
||||
///
|
||||
/// Returns an error if no table has been registered with the provided names
|
||||
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
|
||||
let id = self
|
||||
.table_repr
|
||||
.get_by_name(name)
|
||||
.map(|(_tid, gid)| gid)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: name.join("."),
|
||||
})?;
|
||||
let schema = self
|
||||
.schema
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: name.join("."),
|
||||
})?;
|
||||
Ok((id, schema))
|
||||
}
|
||||
|
||||
/// Assign a global id to a table, if already assigned, return the existing global id
|
||||
///
|
||||
/// require at least one of `table_name` or `table_id` to be `Some`
|
||||
///
|
||||
/// and will try to fetch the schema from table info manager(if table exist now)
|
||||
///
|
||||
/// NOTE: this will not actually render the table into collection refered as GlobalId
|
||||
/// merely creating a mapping from table id to global id
|
||||
pub async fn assign_global_id_to_table(
|
||||
&mut self,
|
||||
srv_map: &TableSource,
|
||||
mut table_name: Option<TableName>,
|
||||
table_id: Option<TableId>,
|
||||
) -> Result<GlobalId, Error> {
|
||||
// if we can find by table name/id. not assign it
|
||||
if let Some(gid) = table_name
|
||||
.as_ref()
|
||||
.and_then(|table_name| self.table_repr.get_by_name(table_name))
|
||||
.map(|(_, gid)| gid)
|
||||
.or_else(|| {
|
||||
table_id
|
||||
.and_then(|id| self.table_repr.get_by_table_id(&id))
|
||||
.map(|(_, gid)| gid)
|
||||
})
|
||||
{
|
||||
Ok(gid)
|
||||
} else {
|
||||
let global_id = self.new_global_id();
|
||||
|
||||
if let Some(table_id) = table_id {
|
||||
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
|
||||
table_name = table_name.or(Some(known_table_name));
|
||||
self.schema.insert(global_id, schema);
|
||||
} // if we don't have table id, it means database havn't assign one yet or we don't need it
|
||||
|
||||
self.table_repr.insert(table_name, table_id, global_id);
|
||||
Ok(global_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Assign a schema to a table
|
||||
///
|
||||
/// TODO(discord9): error handling
|
||||
pub fn assign_table_schema(
|
||||
&mut self,
|
||||
table_name: &TableName,
|
||||
schema: RelationType,
|
||||
) -> Result<(), Error> {
|
||||
let gid = self
|
||||
.table_repr
|
||||
.get_by_name(table_name)
|
||||
.map(|(_, gid)| gid)
|
||||
.unwrap();
|
||||
self.schema.insert(gid, schema);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get a new global id
|
||||
pub fn new_global_id(&self) -> GlobalId {
|
||||
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
/// A tri-directional map that maps table name, table id, and global id
|
||||
#[derive(Default, Debug)]
|
||||
pub struct IdToNameMap {
|
||||
name_to_global_id: HashMap<TableName, GlobalId>,
|
||||
id_to_global_id: HashMap<TableId, GlobalId>,
|
||||
global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
|
||||
}
|
||||
|
||||
impl IdToNameMap {
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
|
||||
name.clone()
|
||||
.and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
|
||||
id.and_then(|id| self.id_to_global_id.insert(id, global_id));
|
||||
self.global_id_to_name_id.insert(global_id, (name, id));
|
||||
}
|
||||
|
||||
pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
|
||||
self.name_to_global_id.get(name).map(|global_id| {
|
||||
let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
|
||||
(*id, *global_id)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
|
||||
self.id_to_global_id.get(id).map(|global_id| {
|
||||
let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
|
||||
(name.clone(), *global_id)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_by_global_id(
|
||||
&self,
|
||||
global_id: &GlobalId,
|
||||
) -> Option<(Option<TableName>, Option<TableId>)> {
|
||||
self.global_id_to_name_id.get(global_id).cloned()
|
||||
}
|
||||
}
|
||||
src/flow/src/adapter/parse_expr.rs (new file, 245 lines)
@@ -0,0 +1,245 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! parse expr like "ts <= now() - interval '5 m'"
|
||||
|
||||
use nom::branch::alt;
|
||||
use nom::bytes::complete::{tag, tag_no_case};
|
||||
use nom::character::complete::{alphanumeric1, digit0, multispace0};
|
||||
use nom::combinator::peek;
|
||||
use nom::sequence::tuple;
|
||||
use nom::IResult;
|
||||
|
||||
use crate::repr;
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration() {
|
||||
let input = "1 h 5 m 42 second";
|
||||
let (remain, ttl) = parse_duration(input).unwrap();
|
||||
assert_eq!(remain, "");
|
||||
assert_eq!(ttl, (3600 + 5 * 60 + 42) * 1000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_fixed() {
|
||||
let input = "timestamp < now() - INTERVAL '5m 42s'";
|
||||
let (remain, ttl) = parse_fixed(input).unwrap();
|
||||
assert_eq!(remain, "");
|
||||
assert_eq!(ttl, (5 * 60 + 42) * 1000);
|
||||
}
|
||||
|
||||
pub fn parse_fixed(input: &str) -> IResult<&str, i64> {
|
||||
let (r, _) = tuple((
|
||||
multispace0,
|
||||
tag_no_case("timestamp"),
|
||||
multispace0,
|
||||
tag("<"),
|
||||
multispace0,
|
||||
tag_no_case("now()"),
|
||||
multispace0,
|
||||
tag("-"),
|
||||
multispace0,
|
||||
tag_no_case("interval"),
|
||||
multispace0,
|
||||
))(input)?;
|
||||
tuple((tag("'"), parse_duration, tag("'")))(r).map(|(r, (_, ttl, _))| (r, ttl))
|
||||
}
|
||||
|
||||
/// parse duration and return ttl, currently only support time part of psql interval type
|
||||
pub fn parse_duration(input: &str) -> IResult<&str, i64> {
|
||||
let mut intervals = vec![];
|
||||
let mut remain = input;
|
||||
while peek(parse_quality)(remain).is_ok() {
|
||||
let (r, number) = parse_quality(remain)?;
|
||||
let (r, unit) = parse_time_unit(r)?;
|
||||
intervals.push((number, unit));
|
||||
remain = r;
|
||||
}
|
||||
let mut total = 0;
|
||||
for (number, unit) in intervals {
|
||||
let number = match unit {
|
||||
TimeUnit::Second => number,
|
||||
TimeUnit::Minute => number * 60,
|
||||
TimeUnit::Hour => number * 60 * 60,
|
||||
};
|
||||
total += number;
|
||||
}
|
||||
total *= 1000;
|
||||
Ok((remain, total))
|
||||
}
|
||||
|
||||
enum Expr {
|
||||
Col(String),
|
||||
Now,
|
||||
Duration(repr::Duration),
|
||||
Binary {
|
||||
left: Box<Expr>,
|
||||
op: String,
|
||||
right: Box<Expr>,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_expr(input: &str) -> IResult<&str, Expr> {
|
||||
parse_expr_bp(input, 0)
|
||||
}
|
||||
|
||||
/// a simple pratt parser
|
||||
fn parse_expr_bp(input: &str, min_bp: u8) -> IResult<&str, Expr> {
|
||||
let (mut input, mut lhs): (&str, Expr) = parse_item(input)?;
|
||||
loop {
|
||||
let (r, op) = parse_op(input)?;
|
||||
let (_, (l_bp, r_bp)) = infix_binding_power(op)?;
|
||||
if l_bp < min_bp {
|
||||
return Ok((input, lhs));
|
||||
}
|
||||
let (r, rhs) = parse_expr_bp(r, r_bp)?;
|
||||
input = r;
|
||||
lhs = Expr::Binary {
|
||||
left: Box::new(lhs),
|
||||
op: op.to_string(),
|
||||
right: Box::new(rhs),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_op(input: &str) -> IResult<&str, &str> {
|
||||
alt((parse_add_sub, parse_cmp))(input)
|
||||
}
|
||||
|
||||
fn parse_item(input: &str) -> IResult<&str, Expr> {
|
||||
if let Ok((r, name)) = parse_col_name(input) {
|
||||
Ok((r, Expr::Col(name.to_string())))
|
||||
} else if let Ok((r, _now)) = parse_now(input) {
|
||||
Ok((r, Expr::Now))
|
||||
} else if let Ok((_r, _num)) = parse_quality(input) {
|
||||
todo!()
|
||||
} else {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
fn infix_binding_power(op: &str) -> IResult<&str, (u8, u8)> {
|
||||
let ret = match op {
|
||||
"<" | ">" | "<=" | ">=" => (1, 2),
|
||||
"+" | "-" => (3, 4),
|
||||
_ => {
|
||||
return Err(nom::Err::Error(nom::error::Error::new(
|
||||
op,
|
||||
nom::error::ErrorKind::Fail,
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok((op, ret))
|
||||
}
|
||||
|
||||
fn parse_col_name(input: &str) -> IResult<&str, &str> {
|
||||
tuple((multispace0, alphanumeric1, multispace0))(input).map(|(r, (_, name, _))| (r, name))
|
||||
}
|
||||
|
||||
fn parse_now(input: &str) -> IResult<&str, &str> {
|
||||
tag_no_case("now()")(input)
|
||||
}
|
||||
|
||||
fn parse_add_sub(input: &str) -> IResult<&str, &str> {
|
||||
tuple((multispace0, alt((tag("+"), tag("-"))), multispace0))(input)
|
||||
.map(|(r, (_, op, _))| (r, op))
|
||||
}
|
||||
|
||||
fn parse_cmp(input: &str) -> IResult<&str, &str> {
|
||||
tuple((
|
||||
multispace0,
|
||||
alt((tag("<="), tag(">="), tag("<"), tag(">"))),
|
||||
multispace0,
|
||||
))(input)
|
||||
.map(|(r, (_, op, _))| (r, op))
|
||||
}
|
||||
|
||||
/// parse a number with optional sign
|
||||
fn parse_quality(input: &str) -> IResult<&str, repr::Duration> {
|
||||
tuple((
|
||||
multispace0,
|
||||
alt((tag("+"), tag("-"), tag(""))),
|
||||
digit0,
|
||||
multispace0,
|
||||
))(input)
|
||||
.map(|(r, (_, sign, name, _))| (r, sign, name))
|
||||
.and_then(|(r, sign, name)| {
|
||||
let num = name.parse::<repr::Duration>().map_err(|_| {
|
||||
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
|
||||
})?;
|
||||
let num = match sign {
|
||||
"+" => num,
|
||||
"-" => -num,
|
||||
_ => num,
|
||||
};
|
||||
Ok((r, num))
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum TimeUnit {
|
||||
Second,
|
||||
Minute,
|
||||
Hour,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum DateUnit {
|
||||
Day,
|
||||
Month,
|
||||
Year,
|
||||
}
|
||||
|
||||
fn parse_time_unit(input: &str) -> IResult<&str, TimeUnit> {
|
||||
fn to_second(input: &str) -> IResult<&str, TimeUnit> {
|
||||
alt((
|
||||
tag_no_case("second"),
|
||||
tag_no_case("seconds"),
|
||||
tag_no_case("S"),
|
||||
))(input)
|
||||
.map(move |(r, _)| (r, TimeUnit::Second))
|
||||
}
|
||||
fn to_minute(input: &str) -> IResult<&str, TimeUnit> {
|
||||
alt((
|
||||
tag_no_case("minute"),
|
||||
tag_no_case("minutes"),
|
||||
tag_no_case("m"),
|
||||
))(input)
|
||||
.map(move |(r, _)| (r, TimeUnit::Minute))
|
||||
}
|
||||
fn to_hour(input: &str) -> IResult<&str, TimeUnit> {
|
||||
alt((tag_no_case("hour"), tag_no_case("hours"), tag_no_case("h")))(input)
|
||||
.map(move |(r, _)| (r, TimeUnit::Hour))
|
||||
}
|
||||
|
||||
tuple((
|
||||
multispace0,
|
||||
alt((
|
||||
to_second, to_minute,
|
||||
to_hour, /*
|
||||
tag_no_case("day"),
|
||||
tag_no_case("days"),
|
||||
tag_no_case("d"),
|
||||
tag_no_case("month"),
|
||||
tag_no_case("months"),
|
||||
tag_no_case("m"),
|
||||
tag_no_case("year"),
|
||||
tag_no_case("years"),
|
||||
tag_no_case("y"),
|
||||
*/
|
||||
)),
|
||||
multispace0,
|
||||
))(input)
|
||||
.map(|(r, (_, unit, _))| (r, unit))
|
||||
}
|
||||
src/flow/src/adapter/server.rs (new file, 147 lines)
@@ -0,0 +1,147 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Implementation of grpc service for flow node
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use common_meta::node_manager::Flownode;
|
||||
use common_telemetry::tracing::info;
|
||||
use futures::FutureExt;
|
||||
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
|
||||
use itertools::Itertools;
|
||||
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{oneshot, Mutex};
|
||||
use tonic::transport::server::TcpIncoming;
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
use crate::adapter::FlownodeManagerRef;
|
||||
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
|
||||
|
||||
/// wraps the flow node manager to avoid the orphan rule on `Arc<...>`
|
||||
#[derive(Clone)]
|
||||
pub struct FlowService {
|
||||
pub manager: FlownodeManagerRef,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl flow_server::Flow for FlowService {
|
||||
async fn handle_create_remove(
|
||||
&self,
|
||||
request: Request<FlowRequest>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
self.manager
|
||||
.handle(request)
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(|e| {
|
||||
let msg = format!("failed to handle request: {:?}", e);
|
||||
Status::internal(msg)
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_mirror_request(
|
||||
&self,
|
||||
request: Request<InsertRequests>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
// TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
|
||||
let request = api::v1::region::InsertRequests {
|
||||
requests: request
|
||||
.requests
|
||||
.into_iter()
|
||||
.map(|insert| api::v1::region::InsertRequest {
|
||||
region_id: insert.region_id,
|
||||
rows: insert.rows,
|
||||
})
|
||||
.collect_vec(),
|
||||
};
|
||||
self.manager
|
||||
.handle_inserts(request)
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(|e| {
|
||||
let msg = format!("failed to handle request: {:?}", e);
|
||||
Status::internal(msg)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FlownodeServer {
|
||||
pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
|
||||
pub flow_service: FlowService,
|
||||
}
|
||||
|
||||
impl FlownodeServer {
|
||||
pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
|
||||
flow_server::FlowServer::new(self.flow_service.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl servers::server::Server for FlownodeServer {
|
||||
async fn shutdown(&self) -> Result<(), servers::error::Error> {
|
||||
let mut shutdown_tx = self.shutdown_tx.lock().await;
|
||||
if let Some(tx) = shutdown_tx.take() {
|
||||
if tx.send(()).is_err() {
|
||||
info!("Receiver dropped, the flow node server has already shutdown");
|
||||
}
|
||||
}
|
||||
info!("Shutdown flow node server");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
|
||||
let (tx, rx) = oneshot::channel::<()>();
|
||||
let (incoming, addr) = {
|
||||
let mut shutdown_tx = self.shutdown_tx.lock().await;
|
||||
ensure!(
|
||||
shutdown_tx.is_none(),
|
||||
AlreadyStartedSnafu { server: "flow" }
|
||||
);
|
||||
let listener = TcpListener::bind(addr)
|
||||
.await
|
||||
.context(TcpBindSnafu { addr })?;
|
||||
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
|
||||
let incoming =
|
||||
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
|
||||
info!("flow server is bound to {}", addr);
|
||||
|
||||
*shutdown_tx = Some(tx);
|
||||
|
||||
(incoming, addr)
|
||||
};
|
||||
|
||||
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let _result = builder
|
||||
.serve_with_incoming_shutdown(incoming, rx.map(drop))
|
||||
.await
|
||||
.context(StartGrpcSnafu);
|
||||
});
|
||||
|
||||
// TODO(discord9): better place for dataflow to run per second
|
||||
let manager_ref = self.flow_service.manager.clone();
|
||||
let _handle = manager_ref.clone().run_background();
|
||||
|
||||
Ok(addr)
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
FLOW_NODE_SERVER_NAME
|
||||
}
|
||||
}
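For orientation only, a hedged sketch of how this server could be constructed and started; the `manager` argument and the bind address are assumptions, and error handling is elided:

use std::net::SocketAddr;

use servers::server::Server;
use tokio::sync::Mutex;

// Hypothetical wiring helper; the real address would come from the flownode options.
async fn start_flownode_server(manager: FlownodeManagerRef) -> Result<SocketAddr, servers::error::Error> {
    let server = FlownodeServer {
        shutdown_tx: Mutex::new(None),
        flow_service: FlowService { manager },
    };
    let addr: SocketAddr = "127.0.0.1:6800".parse().unwrap();
    server.start(addr).await
}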
|
||||
src/flow/src/adapter/table_source.rs (new file, 139 lines)
@@ -0,0 +1,139 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! How to query table information from database
|
||||
|
||||
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
|
||||
use common_meta::key::table_name::{TableNameKey, TableNameManager};
|
||||
use itertools::Itertools;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::adapter::error::{Error, TableNotFoundMetaSnafu, TableNotFoundSnafu};
|
||||
use crate::adapter::TableName;
|
||||
use crate::repr::{self, ColumnType, RelationType};
|
||||
|
||||
/// the table name <-> table id mapping, which should be queried from the table info manager
|
||||
pub struct TableSource {
|
||||
/// for query `TableId -> TableName` mapping
|
||||
table_info_manager: TableInfoManager,
|
||||
table_name_manager: TableNameManager,
|
||||
}
|
||||
|
||||
impl TableSource {
|
||||
pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
|
||||
TableSource {
|
||||
table_info_manager,
|
||||
table_name_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_table_id_from_proto_name(
|
||||
&self,
|
||||
name: &greptime_proto::v1::TableName,
|
||||
) -> Result<TableId, Error> {
|
||||
self.table_name_manager
|
||||
.get(TableNameKey::new(
|
||||
&name.catalog_name,
|
||||
&name.schema_name,
|
||||
&name.table_name,
|
||||
))
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("Table name = {:?}, couldn't found table id", name),
|
||||
})
|
||||
.map(|id| id.unwrap().table_id())
|
||||
}
|
||||
|
||||
/// If the table hasn't been created in the database, the returned table id will be `None`
|
||||
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
|
||||
let ret = self
|
||||
.table_name_manager
|
||||
.get(TableNameKey::new(&name[0], &name[1], &name[2]))
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("Table name = {:?}, couldn't found table id", name),
|
||||
})?
|
||||
.map(|id| id.table_id());
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// query metasrv about the table name and table id
|
||||
pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
|
||||
self.table_info_manager
|
||||
.get(*table_id)
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("TableId = {:?}, couldn't found table name", table_id),
|
||||
})
|
||||
.map(|name| name.unwrap().table_name())
|
||||
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
|
||||
}
|
||||
/// query metasrv about the table name and table id
|
||||
pub async fn get_table_info_value(
|
||||
&self,
|
||||
table_id: &TableId,
|
||||
) -> Result<Option<TableInfoValue>, Error> {
|
||||
Ok(self
|
||||
.table_info_manager
|
||||
.get(*table_id)
|
||||
.await
|
||||
.with_context(|_| TableNotFoundMetaSnafu {
|
||||
msg: format!("TableId = {:?}, couldn't found table name", table_id),
|
||||
})?
|
||||
.map(|v| v.into_inner()))
|
||||
}
|
||||
|
||||
pub async fn get_table_name_schema(
|
||||
&self,
|
||||
table_id: &TableId,
|
||||
) -> Result<(TableName, RelationType), Error> {
|
||||
let table_info_value = self
|
||||
.get_table_info_value(table_id)
|
||||
.await?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("TableId = {:?}, Can't found table info", table_id),
|
||||
})?;
|
||||
|
||||
let table_name = table_info_value.table_name();
|
||||
let table_name = [
|
||||
table_name.catalog_name,
|
||||
table_name.schema_name,
|
||||
table_name.table_name,
|
||||
];
|
||||
|
||||
let raw_schema = table_info_value.table_info.meta.schema;
|
||||
let column_types = raw_schema
|
||||
.column_schemas
|
||||
.into_iter()
|
||||
.map(|col| ColumnType {
|
||||
nullable: col.is_nullable(),
|
||||
scalar_type: col.data_type,
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let key = table_info_value.table_info.meta.primary_key_indices;
|
||||
let keys = vec![repr::Key::from(key)];
|
||||
|
||||
let time_index = raw_schema.timestamp_index;
|
||||
Ok((
|
||||
table_name,
|
||||
RelationType {
|
||||
column_types,
|
||||
keys,
|
||||
time_index,
|
||||
},
|
||||
))
|
||||
}
|
||||
}
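A hedged usage sketch (not in the diff) of how `TableSource` might be queried when a flow's source table is resolved; the surrounding function is hypothetical:

// Hypothetical call site: resolve a source table by name, then fetch its schema.
async fn resolve_source_table(
    srv_map: &TableSource,
    name: &TableName,
) -> Result<(TableId, RelationType), Error> {
    let table_id = srv_map
        .get_table_id_from_name(name)
        .await?
        .expect("source table should already exist");
    let (_full_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
    Ok((table_id, schema))
}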
|
||||
src/flow/src/adapter/tests.rs (new file, 64 lines)
@@ -0,0 +1,64 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Mock test for adapter module
|
||||
//! TODO(discord9): write mock test
|
||||
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use store_api::storage::ConcreteDataType;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
|
||||
|
||||
use super::*;
|
||||
|
||||
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
table_name: &str,
|
||||
region_numbers: I,
|
||||
) -> TableInfo {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
];
|
||||
let schema = SchemaBuilder::try_from(column_schemas)
|
||||
.unwrap()
|
||||
.version(123)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let meta = TableMetaBuilder::default()
|
||||
.schema(Arc::new(schema))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
|
||||
.build()
|
||||
.unwrap();
|
||||
TableInfoBuilder::default()
|
||||
.table_id(table_id)
|
||||
.table_version(5)
|
||||
.name(table_name)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Create a mock harness for flow node manager
|
||||
///
|
||||
/// containing several default table info and schema
|
||||
fn mock_harness_flow_node_manager() {}
|
||||
src/flow/src/adapter/util.rs (new file, 60 lines)
@@ -0,0 +1,60 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
|
||||
use common_error::ext::BoxedError;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::adapter::error::{Error, ExternalSnafu};
|
||||
|
||||
/// convert a list of `ColumnSchema` to its corresponding proto type
|
||||
pub fn column_schemas_to_proto(
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
primary_keys: &[String],
|
||||
) -> Result<Vec<api::v1::ColumnSchema>, Error> {
|
||||
let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
|
||||
.iter()
|
||||
.map(|c| {
|
||||
ColumnDataTypeWrapper::try_from(c.data_type.clone())
|
||||
.map(|w| w.to_parts())
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| ExternalSnafu)
|
||||
})
|
||||
.try_collect()?;
|
||||
|
||||
let ret = column_schemas
|
||||
.iter()
|
||||
.zip(column_datatypes)
|
||||
.map(|(schema, datatype)| {
|
||||
let semantic_type = if schema.is_time_index() {
|
||||
SemanticType::Timestamp
|
||||
} else if primary_keys.contains(&schema.name) {
|
||||
SemanticType::Tag
|
||||
} else {
|
||||
SemanticType::Field
|
||||
} as i32;
|
||||
|
||||
api::v1::ColumnSchema {
|
||||
column_name: schema.name.clone(),
|
||||
datatype: datatype.0 as i32,
|
||||
semantic_type,
|
||||
datatype_extension: datatype.1,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(ret)
|
||||
}
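A short hedged example of calling the helper above; the column set and the choice of `host` as the only tag are made up for illustration:

use datatypes::prelude::ConcreteDataType;

// Hypothetical call site building a proto schema for a mirrored table.
fn example_proto_schema() -> Result<Vec<api::v1::ColumnSchema>, Error> {
    let columns = vec![
        ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true),
        ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
    ];
    column_schemas_to_proto(columns, &["host".to_string()])
}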
|
||||
src/flow/src/adapter/worker.rs (new file, 509 lines)
@@ -0,0 +1,509 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! For single-thread flow worker
|
||||
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::sync::Arc;
|
||||
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::{broadcast, mpsc, Mutex};
|
||||
|
||||
use crate::adapter::error::{Error, EvalSnafu};
|
||||
use crate::adapter::FlowId;
|
||||
use crate::compute::{Context, DataflowState, ErrCollector};
|
||||
use crate::expr::error::InternalSnafu;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::plan::TypedPlan;
|
||||
use crate::repr::{self, DiffRow};
|
||||
|
||||
pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;
|
||||
|
||||
/// Create both worker(`!Send`) and worker handle(`Send + Sync`)
|
||||
pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
|
||||
let (itc_client, itc_server) = create_inter_thread_call();
|
||||
let worker_handle = WorkerHandle {
|
||||
itc_client: Mutex::new(itc_client),
|
||||
};
|
||||
let worker = Worker {
|
||||
task_states: BTreeMap::new(),
|
||||
itc_server: Arc::new(Mutex::new(itc_server)),
|
||||
};
|
||||
(worker_handle, worker)
|
||||
}
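Because `Worker` is `!Send`, the intended usage (also exercised by the test at the bottom of this file) is to create and drive it on a dedicated OS thread and hand only the `WorkerHandle` back to async code; a minimal sketch with a hypothetical helper name:

// Hypothetical helper mirroring the pattern used in the test below.
async fn spawn_worker_thread() -> WorkerHandle {
    let (tx, rx) = tokio::sync::oneshot::channel();
    std::thread::spawn(move || {
        let (handle, mut worker) = create_worker();
        tx.send(handle).unwrap();
        worker.run(); // blocks this thread until a Shutdown request arrives
    });
    rx.await.unwrap()
}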
|
||||
|
||||
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
|
||||
|
||||
pub(crate) struct ActiveDataflowState<'subgraph> {
|
||||
df: Hydroflow<'subgraph>,
|
||||
state: DataflowState,
|
||||
err_collector: ErrCollector,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ActiveDataflowState<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ActiveDataflowState")
|
||||
.field("df", &"<Hydroflow>")
|
||||
.field("state", &self.state)
|
||||
.field("err_collector", &self.err_collector)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ActiveDataflowState<'_> {
|
||||
fn default() -> Self {
|
||||
ActiveDataflowState {
|
||||
df: Hydroflow::new(),
|
||||
state: DataflowState::default(),
|
||||
err_collector: ErrCollector::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'subgraph> ActiveDataflowState<'subgraph> {
|
||||
/// Create a new render context, assigned with given global id
|
||||
pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph>
|
||||
where
|
||||
'subgraph: 'ctx,
|
||||
{
|
||||
Context {
|
||||
id: global_id,
|
||||
df: &mut self.df,
|
||||
compute_state: &mut self.state,
|
||||
err_collector: self.err_collector.clone(),
|
||||
input_collection: Default::default(),
|
||||
local_scope: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
|
||||
self.state.set_current_ts(ts);
|
||||
}
|
||||
|
||||
/// Run all available subgraph
|
||||
///
|
||||
/// return true if any subgraph actually executed
|
||||
pub fn run_available(&mut self) -> bool {
|
||||
self.state.run_available_with_schedule(&mut self.df)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WorkerHandle {
|
||||
itc_client: Mutex<InterThreadCallClient>,
|
||||
}
|
||||
|
||||
impl WorkerHandle {
|
||||
/// create task, return task id
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_flow(
|
||||
&self,
|
||||
task_id: FlowId,
|
||||
plan: TypedPlan,
|
||||
sink_id: GlobalId,
|
||||
sink_sender: mpsc::UnboundedSender<DiffRow>,
|
||||
source_ids: &[GlobalId],
|
||||
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
|
||||
expire_when: Option<repr::Duration>,
|
||||
create_if_not_exist: bool,
|
||||
err_collector: ErrCollector,
|
||||
) -> Result<Option<FlowId>, Error> {
|
||||
let req = Request::Create {
|
||||
task_id,
|
||||
plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
source_ids: source_ids.to_vec(),
|
||||
src_recvs,
|
||||
expire_when,
|
||||
create_if_not_exist,
|
||||
err_collector,
|
||||
};
|
||||
|
||||
let ret = self.itc_client.lock().await.call_blocking(req).await?;
|
||||
if let Response::Create {
|
||||
result: task_create_result,
|
||||
} = ret
|
||||
{
|
||||
task_create_result
|
||||
} else {
|
||||
InternalSnafu {
|
||||
reason: format!(
|
||||
"Flow Node/Worker itc failed, expect Response::Create, found {ret:?}"
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
.with_context(|_| EvalSnafu {})
|
||||
}
|
||||
}
|
||||
|
||||
/// remove task, return whether the task was removed
|
||||
pub async fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
let req = Request::Remove { task_id };
|
||||
let ret = self.itc_client.lock().await.call_blocking(req).await?;
|
||||
if let Response::Remove { result } = ret {
|
||||
Ok(result)
|
||||
} else {
|
||||
InternalSnafu {
|
||||
reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"),
|
||||
}
|
||||
.fail()
|
||||
.with_context(|_| EvalSnafu {})
|
||||
}
|
||||
}
|
||||
|
||||
/// trigger the worker to run; does not block, and the worker runs in parallel
|
||||
///
|
||||
/// will set the current timestamp to `now` for all dataflows before running them
|
||||
pub async fn run_available(&self, now: repr::Timestamp) {
|
||||
self.itc_client
|
||||
.lock()
|
||||
.await
|
||||
.call_non_blocking(Request::RunAvail { now })
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
|
||||
let req = Request::ContainTask { task_id };
|
||||
let ret = self
|
||||
.itc_client
|
||||
.lock()
|
||||
.await
|
||||
.call_blocking(req)
|
||||
.await
|
||||
.unwrap();
|
||||
if let Response::ContainTask {
|
||||
result: task_contain_result,
|
||||
} = ret
|
||||
{
|
||||
Ok(task_contain_result)
|
||||
} else {
|
||||
InternalSnafu {
|
||||
reason: format!(
|
||||
"Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
.with_context(|_| EvalSnafu {})
|
||||
}
|
||||
}
|
||||
|
||||
/// shutdown the worker
|
||||
pub async fn shutdown(&self) {
|
||||
self.itc_client
|
||||
.lock()
|
||||
.await
|
||||
.call_non_blocking(Request::Shutdown)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// The actual worker that does the work and contain active state
|
||||
#[derive(Debug)]
|
||||
pub struct Worker<'subgraph> {
|
||||
/// Task states
|
||||
pub(crate) task_states: BTreeMap<FlowId, ActiveDataflowState<'subgraph>>,
|
||||
itc_server: Arc<Mutex<InterThreadCallServer>>,
|
||||
}
|
||||
|
||||
impl<'s> Worker<'s> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn create_flow(
|
||||
&mut self,
|
||||
task_id: FlowId,
|
||||
plan: TypedPlan,
|
||||
sink_id: GlobalId,
|
||||
sink_sender: mpsc::UnboundedSender<DiffRow>,
|
||||
source_ids: &[GlobalId],
|
||||
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
|
||||
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
|
||||
expire_when: Option<repr::Duration>,
|
||||
create_if_not_exist: bool,
|
||||
err_collector: ErrCollector,
|
||||
) -> Result<Option<FlowId>, Error> {
|
||||
let _ = expire_when;
|
||||
if create_if_not_exist {
|
||||
// check if the task already exists
|
||||
if self.task_states.contains_key(&task_id) {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
let mut cur_task_state = ActiveDataflowState::<'s> {
|
||||
err_collector,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
{
|
||||
let mut ctx = cur_task_state.new_ctx(sink_id);
|
||||
for (source_id, src_recv) in source_ids.iter().zip(src_recvs) {
|
||||
let bundle = ctx.render_source(src_recv)?;
|
||||
ctx.insert_global(*source_id, bundle);
|
||||
}
|
||||
|
||||
let rendered = ctx.render_plan(plan.plan)?;
|
||||
ctx.render_unbounded_sink(rendered, sink_sender);
|
||||
}
|
||||
self.task_states.insert(task_id, cur_task_state);
|
||||
Ok(Some(task_id))
|
||||
}
|
||||
|
||||
/// remove task, return true if a task is removed
|
||||
pub fn remove_flow(&mut self, task_id: FlowId) -> bool {
|
||||
self.task_states.remove(&task_id).is_some()
|
||||
}
|
||||
|
||||
/// Run the worker, blocking, until shutdown signal is received
|
||||
pub fn run(&mut self) {
|
||||
loop {
|
||||
let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap();
|
||||
let ret = self.handle_req(req_id, req);
|
||||
match ret {
|
||||
Ok(Some((id, resp))) => {
|
||||
self.itc_server.blocking_lock().resp(id, resp);
|
||||
}
|
||||
Ok(None) => continue,
|
||||
Err(()) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// run with tick acquired from tick manager(usually means system time)
|
||||
/// TODO(discord9): better tick management
|
||||
pub fn run_tick(&mut self, now: repr::Timestamp) {
|
||||
for (_task_id, task_state) in self.task_states.iter_mut() {
|
||||
task_state.set_current_ts(now);
|
||||
task_state.run_available();
|
||||
}
|
||||
}
|
||||
/// handle request, return response if any, Err if receive shutdown signal
|
||||
fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
|
||||
let ret = match req {
|
||||
Request::Create {
|
||||
task_id,
|
||||
plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
source_ids,
|
||||
src_recvs,
|
||||
expire_when,
|
||||
create_if_not_exist,
|
||||
err_collector,
|
||||
} => {
|
||||
let task_create_result = self.create_flow(
|
||||
task_id,
|
||||
plan,
|
||||
sink_id,
|
||||
sink_sender,
|
||||
&source_ids,
|
||||
src_recvs,
|
||||
expire_when,
|
||||
create_if_not_exist,
|
||||
err_collector,
|
||||
);
|
||||
Some((
|
||||
req_id,
|
||||
Response::Create {
|
||||
result: task_create_result,
|
||||
},
|
||||
))
|
||||
}
|
||||
Request::Remove { task_id } => {
|
||||
let ret = self.remove_flow(task_id);
|
||||
Some((req_id, Response::Remove { result: ret }))
|
||||
}
|
||||
Request::RunAvail { now } => {
|
||||
self.run_tick(now);
|
||||
None
|
||||
}
|
||||
Request::ContainTask { task_id } => {
|
||||
let ret = self.task_states.contains_key(&task_id);
|
||||
Some((req_id, Response::ContainTask { result: ret }))
|
||||
}
|
||||
Request::Shutdown => return Err(()),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Request {
|
||||
Create {
|
||||
task_id: FlowId,
|
||||
plan: TypedPlan,
|
||||
sink_id: GlobalId,
|
||||
sink_sender: mpsc::UnboundedSender<DiffRow>,
|
||||
source_ids: Vec<GlobalId>,
|
||||
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
|
||||
expire_when: Option<repr::Duration>,
|
||||
create_if_not_exist: bool,
|
||||
err_collector: ErrCollector,
|
||||
},
|
||||
Remove {
|
||||
task_id: FlowId,
|
||||
},
|
||||
/// Trigger the worker to run, useful after input buffer is full
|
||||
RunAvail {
|
||||
now: repr::Timestamp,
|
||||
},
|
||||
ContainTask {
|
||||
task_id: FlowId,
|
||||
},
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Response {
|
||||
Create {
|
||||
result: Result<Option<FlowId>, Error>,
|
||||
// TODO(discord9): add flow err_collector
|
||||
},
|
||||
Remove {
|
||||
result: bool,
|
||||
},
|
||||
ContainTask {
|
||||
result: bool,
|
||||
},
|
||||
}
|
||||
|
||||
fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
|
||||
let (arg_send, arg_recv) = mpsc::unbounded_channel();
|
||||
let (ret_send, ret_recv) = mpsc::unbounded_channel();
|
||||
let client = InterThreadCallClient {
|
||||
call_id: Arc::new(Mutex::new(0)),
|
||||
arg_sender: arg_send,
|
||||
ret_recv,
|
||||
};
|
||||
let server = InterThreadCallServer {
|
||||
arg_recv,
|
||||
ret_sender: ret_send,
|
||||
};
|
||||
(client, server)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InterThreadCallClient {
|
||||
call_id: Arc<Mutex<usize>>,
|
||||
arg_sender: mpsc::UnboundedSender<(usize, Request)>,
|
||||
ret_recv: mpsc::UnboundedReceiver<(usize, Response)>,
|
||||
}
|
||||
|
||||
impl InterThreadCallClient {
|
||||
/// call without expecting responses or blocking
|
||||
async fn call_non_blocking(&self, req: Request) {
|
||||
let call_id = {
|
||||
let mut call_id = self.call_id.lock().await;
|
||||
*call_id += 1;
|
||||
*call_id
|
||||
};
|
||||
self.arg_sender.send((call_id, req)).unwrap();
|
||||
}
|
||||
/// call blocking, and return the result
|
||||
async fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
|
||||
let call_id = {
|
||||
let mut call_id = self.call_id.lock().await;
|
||||
*call_id += 1;
|
||||
*call_id
|
||||
};
|
||||
self.arg_sender.send((call_id, req)).unwrap();
|
||||
// TODO(discord9): better inter thread call impl
|
||||
let (ret_call_id, ret) = self.ret_recv.recv().await.unwrap();
|
||||
if ret_call_id != call_id {
|
||||
return InternalSnafu {
|
||||
reason: "call id mismatch, worker/worker handler should be in sync",
|
||||
}
|
||||
.fail()
|
||||
.with_context(|_| EvalSnafu {});
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InterThreadCallServer {
|
||||
pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>,
|
||||
pub ret_sender: mpsc::UnboundedSender<(usize, Response)>,
|
||||
}
|
||||
|
||||
impl InterThreadCallServer {
|
||||
pub async fn recv(&mut self) -> Option<(usize, Request)> {
|
||||
self.arg_recv.recv().await
|
||||
}
|
||||
|
||||
pub fn blocking_recv(&mut self) -> Option<(usize, Request)> {
|
||||
self.arg_recv.blocking_recv()
|
||||
}
|
||||
|
||||
/// Send response back to the client
|
||||
pub fn resp(&self, call_id: usize, resp: Response) {
|
||||
self.ret_sender.send((call_id, resp)).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::*;
|
||||
use crate::adapter::FlowTickManager;
|
||||
use crate::expr::Id;
|
||||
use crate::plan::Plan;
|
||||
use crate::repr::{RelationType, Row};
|
||||
#[tokio::test]
|
||||
pub async fn test_simple_get_with_worker_and_handle() {
|
||||
let flow_tick = FlowTickManager::new();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let worker_thread_handle = std::thread::spawn(move || {
|
||||
let (handle, mut worker) = create_worker();
|
||||
tx.send(handle).unwrap();
|
||||
worker.run();
|
||||
});
|
||||
let handle = rx.await.unwrap();
|
||||
let src_ids = vec![GlobalId::User(1)];
|
||||
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
|
||||
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
|
||||
let (task_id, plan) = (
|
||||
1,
|
||||
TypedPlan {
|
||||
plan: Plan::Get {
|
||||
id: Id::Global(GlobalId::User(1)),
|
||||
},
|
||||
typ: RelationType::new(vec![]),
|
||||
},
|
||||
);
|
||||
handle
|
||||
.create_flow(
|
||||
task_id,
|
||||
plan,
|
||||
GlobalId::User(1),
|
||||
sink_tx,
|
||||
&src_ids,
|
||||
vec![rx],
|
||||
None,
|
||||
true,
|
||||
ErrCollector::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
tx.send((Row::empty(), 0, 0)).unwrap();
|
||||
handle.run_available(flow_tick.tick()).await;
|
||||
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
|
||||
handle.shutdown().await;
|
||||
worker_thread_handle.join().unwrap();
|
||||
}
|
||||
}
|
||||
@@ -17,3 +17,7 @@
|
||||
mod render;
|
||||
mod state;
|
||||
mod types;
|
||||
|
||||
pub(crate) use render::Context;
|
||||
pub(crate) use state::DataflowState;
|
||||
pub(crate) use types::ErrCollector;
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
|
||||
|
||||
mod map;
|
||||
mod reduce;
|
||||
mod src_sink;
|
||||
|
||||
/// The Context for building an Operator with an id of `GlobalId`
|
||||
pub struct Context<'referred, 'df> {
|
||||
@@ -52,13 +53,15 @@ pub struct Context<'referred, 'df> {
|
||||
pub df: &'referred mut Hydroflow<'df>,
|
||||
pub compute_state: &'referred mut DataflowState,
|
||||
/// a list of all collections being used in the operator
|
||||
///
|
||||
/// TODO(discord9): remove extra clone by counting usage and remove it on last usage?
|
||||
pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
|
||||
/// used by `Get`/`Let` Plan for getting/setting local variables
|
||||
///
|
||||
/// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead
|
||||
local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
|
||||
pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
|
||||
// Collect all errors in this operator's evaluation
|
||||
err_collector: ErrCollector,
|
||||
pub err_collector: ErrCollector,
|
||||
}
|
||||
|
||||
impl<'referred, 'df> Drop for Context<'referred, 'df> {
|
||||
@@ -235,7 +238,7 @@ mod test {
|
||||
for now in time_range {
|
||||
state.set_current_ts(now);
|
||||
state.run_available_with_schedule(df);
|
||||
assert!(state.get_err_collector().inner.borrow().is_empty());
|
||||
assert!(state.get_err_collector().is_empty());
|
||||
if let Some(expected) = expected.get(&now) {
|
||||
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
|
||||
} else {
|
||||
|
||||
@@ -153,7 +153,7 @@ fn eval_mfp_core(
|
||||
) -> Vec<KeyValDiffRow> {
|
||||
let mut all_updates = Vec::new();
|
||||
for (mut row, _sys_time, diff) in input.into_iter() {
|
||||
// this updates is expected to be only zero to two rows
|
||||
// these updates are expected to be only zero, one or two rows
|
||||
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
|
||||
// TODO(discord9): refactor error handling
|
||||
// Expect error in a single row to not interrupt the whole evaluation
|
||||
|
||||
@@ -80,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> {
|
||||
out_send_port,
|
||||
move |_ctx, recv, send| {
|
||||
// mfp only need to passively receive updates from recvs
|
||||
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
|
||||
let data = recv
|
||||
.take_inner()
|
||||
.into_iter()
|
||||
.flat_map(|v| v.into_iter())
|
||||
.collect_vec();
|
||||
|
||||
reduce_subgraph(
|
||||
&reduce_arrange,
|
||||
@@ -378,9 +382,8 @@ fn reduce_accum_subgraph(
|
||||
|
||||
let mut all_updates = Vec::with_capacity(key_to_vals.len());
|
||||
let mut all_outputs = Vec::with_capacity(key_to_vals.len());
|
||||
|
||||
// lock the arrange for write for the rest of function body
|
||||
// so to prevent wide race condition since we are going to update the arrangement by write after read
|
||||
// so as to prevent a weird race condition since we are going to update the arrangement by write after read
|
||||
// TODO(discord9): consider key-based lock
|
||||
let mut arrange = arrange.write();
|
||||
for (key, value_diffs) in key_to_vals {
|
||||
@@ -395,6 +398,7 @@ fn reduce_accum_subgraph(
|
||||
}
|
||||
};
|
||||
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
|
||||
|
||||
let accums = accums.inner;
|
||||
|
||||
// deser accums from offsets
|
||||
|
||||
src/flow/src/compute/render/src_sink.rs (new file, 161 lines)
@@ -0,0 +1,161 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Source and Sink for the dataflow
|
||||
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
|
||||
use common_telemetry::{debug, info};
|
||||
use hydroflow::scheduled::graph_ext::GraphExt;
|
||||
use itertools::Itertools;
|
||||
use snafu::OptionExt;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use crate::adapter::error::{Error, PlanSnafu};
|
||||
use crate::compute::render::Context;
|
||||
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
|
||||
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
impl<'referred, 'df> Context<'referred, 'df> {
|
||||
/// Render a source which comes from a broadcast channel into the dataflow
|
||||
/// will immediately send updates not greater than `now` and buffer the rest in arrangement
|
||||
pub fn render_source(
|
||||
&mut self,
|
||||
mut src_recv: broadcast::Receiver<DiffRow>,
|
||||
) -> Result<CollectionBundle, Error> {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source");
|
||||
let arrange_handler = self.compute_state.new_arrange(None);
|
||||
let arrange_handler_inner =
|
||||
arrange_handler
|
||||
.clone_future_only()
|
||||
.with_context(|| PlanSnafu {
|
||||
reason: "No write is expected at this point",
|
||||
})?;
|
||||
|
||||
let schd = self.compute_state.get_scheduler();
|
||||
let inner_schd = schd.clone();
|
||||
let now = self.compute_state.current_time_ref();
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
let sub = self
|
||||
.df
|
||||
.add_subgraph_source("source", send_port, move |_ctx, send| {
|
||||
let now = *now.borrow();
|
||||
let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
|
||||
err_collector.run(|| arrange_handler_inner.write().compact_to(now));
|
||||
|
||||
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
|
||||
let mut to_send = Vec::new();
|
||||
let mut to_arrange = Vec::new();
|
||||
|
||||
// TODO(discord9): handling tokio broadcast error
|
||||
while let Ok((r, t, d)) = src_recv.try_recv() {
|
||||
if t <= now {
|
||||
to_send.push((r, t, d));
|
||||
} else {
|
||||
to_arrange.push(((r, Row::empty()), t, d));
|
||||
}
|
||||
}
|
||||
let all = prev_avail.chain(to_send).collect_vec();
|
||||
if !all.is_empty() || !to_arrange.is_empty() {
|
||||
debug!(
|
||||
"All send: {} rows, not yet send: {} rows",
|
||||
all.len(),
|
||||
to_arrange.len()
|
||||
);
|
||||
}
|
||||
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
|
||||
send.give(all);
|
||||
// always schedule source to run at next tick
|
||||
inner_schd.schedule_at(now + 1);
|
||||
});
|
||||
schd.set_cur_subgraph(sub);
|
||||
let arranged = Arranged::new(arrange_handler);
|
||||
arranged.writer.borrow_mut().replace(sub);
|
||||
let arranged = BTreeMap::from([(vec![], arranged)]);
|
||||
Ok(CollectionBundle {
|
||||
collection: Collection::from_port(recv_port),
|
||||
arranged,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn render_unbounded_sink(
|
||||
&mut self,
|
||||
bundle: CollectionBundle,
|
||||
sender: mpsc::UnboundedSender<DiffRow>,
|
||||
) {
|
||||
let CollectionBundle {
|
||||
collection,
|
||||
arranged: _,
|
||||
} = bundle;
|
||||
|
||||
let _sink = self.df.add_subgraph_sink(
|
||||
"UnboundedSink",
|
||||
collection.into_inner(),
|
||||
move |_ctx, recv| {
|
||||
let data = recv.take_inner();
|
||||
for row in data.into_iter().flat_map(|i| i.into_iter()) {
|
||||
// if the sender is closed, stop sending
|
||||
if sender.is_closed() {
|
||||
break;
|
||||
}
|
||||
// TODO(discord9): handling tokio error
|
||||
let _ = sender.send(row);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Render a sink which sends updates to a broadcast channel, with an internal buffer in case the broadcast channel is full
|
||||
pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
|
||||
let CollectionBundle {
|
||||
collection,
|
||||
arranged: _,
|
||||
} = bundle;
|
||||
let mut buf = VecDeque::with_capacity(1000);
|
||||
|
||||
let schd = self.compute_state.get_scheduler();
|
||||
let inner_schd = schd.clone();
|
||||
let now = self.compute_state.current_time_ref();
|
||||
|
||||
let sink = self
|
||||
.df
|
||||
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
|
||||
let data = recv.take_inner();
|
||||
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
|
||||
if sender.len() >= BROADCAST_CAP {
|
||||
return;
|
||||
} else {
|
||||
while let Some(row) = buf.pop_front() {
|
||||
// if the sender is full, stop sending
|
||||
if sender.len() >= BROADCAST_CAP {
|
||||
break;
|
||||
}
|
||||
// TODO(discord9): handling tokio broadcast error
|
||||
let _ = sender.send(row);
|
||||
}
|
||||
}
|
||||
|
||||
// if buffer is not empty, schedule the next run at next tick
|
||||
// so the buffer can be drained as soon as possible
|
||||
if !buf.is_empty() {
|
||||
inner_schd.schedule_at(*now.borrow() + 1);
|
||||
}
|
||||
});
|
||||
|
||||
schd.set_cur_subgraph(sink);
|
||||
}
|
||||
}
|
||||
@@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement};
|
||||
|
||||
/// input/output of a dataflow
|
||||
/// One `ComputeState` manages the input/output/schedule of one `Hydroflow`
|
||||
#[derive(Default)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DataflowState {
|
||||
/// it is important to use a deque to maintain the order of subgraph here
|
||||
/// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule
|
||||
|
||||
@@ -21,7 +21,8 @@ use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::handoff::TeeingHandoff;
|
||||
use hydroflow::scheduled::port::RecvPort;
|
||||
use hydroflow::scheduled::SubgraphId;
|
||||
use tokio::sync::RwLock;
|
||||
use itertools::Itertools;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::compute::render::Context;
|
||||
use crate::expr::{EvalError, ScalarExpr};
|
||||
@@ -146,14 +147,22 @@ impl CollectionBundle {
|
||||
///
|
||||
/// Using a `VecDeque` to preserve the order of errors
|
||||
/// when running dataflow continuously and need errors in order
|
||||
#[derive(Default, Clone)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ErrCollector {
|
||||
pub inner: Rc<RefCell<VecDeque<EvalError>>>,
|
||||
pub inner: Arc<Mutex<VecDeque<EvalError>>>,
|
||||
}
|
||||
|
||||
impl ErrCollector {
|
||||
pub async fn get_all(&self) -> Vec<EvalError> {
|
||||
self.inner.lock().await.drain(..).collect_vec()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.inner.blocking_lock().is_empty()
|
||||
}
|
||||
|
||||
pub fn push_err(&self, err: EvalError) {
|
||||
self.inner.borrow_mut().push_back(err)
|
||||
self.inner.blocking_lock().push_back(err)
|
||||
}
|
||||
|
||||
pub fn run<F, R>(&self, f: F) -> Option<R>
|
||||
|
||||
@@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
fn is_send_sync() {
|
||||
fn check<T: Send + Sync>() {}
|
||||
check::<EvalError>();
|
||||
}
|
||||
|
||||
/// EvalError is about errors happen on columnar evaluation
|
||||
///
|
||||
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use common_time::DateTime;
|
||||
use datafusion_expr::Operator;
|
||||
use datafusion_substrait::logical_plan::consumer::name_to_op;
|
||||
@@ -206,8 +207,9 @@ impl UnaryFunc {
|
||||
from: arg_ty,
|
||||
to: to.clone(),
|
||||
}
|
||||
})?;
|
||||
Ok(res)
|
||||
});
|
||||
debug!("Cast to type: {to:?}, result: {:?}", res);
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Global id's scope is in Current Worker, and is cross-dataflow
|
||||
/// Global id's scope is in Current Flow node, and is cross-dataflow
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
|
||||
pub enum GlobalId {
|
||||
/// System namespace.
|
||||
|
||||
@@ -206,12 +206,16 @@ impl AggregateFunc {
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype);
|
||||
let input_type = if matches!(generic_fn, GenericFn::Count) {
|
||||
ConcreteDataType::null_datatype()
|
||||
} else {
|
||||
arg_type.unwrap_or_else(ConcreteDataType::null_datatype)
|
||||
};
|
||||
rule.get(&(generic_fn, input_type.clone()))
|
||||
.cloned()
|
||||
.with_context(|| InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"No specialization found for binary function {:?} with input type {:?}",
|
||||
"No specialization found for aggregate function {:?} with input type {:?}",
|
||||
generic_fn, input_type
|
||||
),
|
||||
})
|
||||
|
||||
@@ -30,3 +30,8 @@ mod plan;
|
||||
mod repr;
|
||||
mod transform;
|
||||
mod utils;
|
||||
|
||||
pub use adapter::{
|
||||
start_flow_node_with_one_worker, FlownodeBuilder, FlownodeManager, FlownodeManagerRef,
|
||||
FlownodeOptions,
|
||||
};
|
||||
|
||||
@@ -18,12 +18,15 @@
|
||||
mod join;
|
||||
mod reduce;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use datatypes::arrow::ipc::Map;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::adapter::error::Error;
|
||||
use crate::expr::{
|
||||
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr,
|
||||
AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
|
||||
TypedExpr,
|
||||
};
|
||||
use crate::plan::join::JoinPlan;
|
||||
pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
|
||||
@@ -71,6 +74,7 @@ impl TypedPlan {
|
||||
let mfp = MapFilterProject::new(input_arity)
|
||||
.map(exprs)?
|
||||
.project(input_arity..input_arity + output_arity)?;
|
||||
let out_typ = self.typ.apply_mfp(&mfp, &expr_typs);
|
||||
// special case for mfp to compose when the plan is already mfp
|
||||
let plan = match self.plan {
|
||||
Plan::Mfp {
|
||||
@@ -85,8 +89,7 @@ impl TypedPlan {
|
||||
mfp,
|
||||
},
|
||||
};
|
||||
let typ = RelationType::new(expr_typs);
|
||||
Ok(TypedPlan { typ, plan })
|
||||
Ok(TypedPlan { typ: out_typ, plan })
|
||||
}
|
||||
|
||||
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
|
||||
@@ -182,3 +185,45 @@ pub enum Plan {
|
||||
consolidate_output: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Plan {
|
||||
/// Find all the used collection in the plan
|
||||
pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
|
||||
fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
|
||||
match plan {
|
||||
Plan::Get { id } => {
|
||||
match id {
|
||||
Id::Local(_) => (),
|
||||
Id::Global(g) => {
|
||||
used.insert(*g);
|
||||
}
|
||||
};
|
||||
}
|
||||
Plan::Let { value, body, .. } => {
|
||||
recur_find_use(value, used);
|
||||
recur_find_use(body, used);
|
||||
}
|
||||
Plan::Mfp { input, .. } => {
|
||||
recur_find_use(input, used);
|
||||
}
|
||||
Plan::Reduce { input, .. } => {
|
||||
recur_find_use(input, used);
|
||||
}
|
||||
Plan::Join { inputs, .. } => {
|
||||
for input in inputs {
|
||||
recur_find_use(input, used);
|
||||
}
|
||||
}
|
||||
Plan::Union { inputs, .. } => {
|
||||
for input in inputs {
|
||||
recur_find_use(input, used);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
let mut ret = Default::default();
|
||||
recur_find_use(self, &mut ret);
|
||||
ret
|
||||
}
|
||||
}
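A hedged one-liner showing what `find_used_collection` is typically for: discovering which source collections a flow has to subscribe to before rendering it (the helper name is an assumption):

// Hypothetical call site: list the global ids a plan reads from.
fn source_ids_of(plan: &TypedPlan) -> Vec<GlobalId> {
    plan.plan.find_used_collection().into_iter().collect()
}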
|
||||
|
||||
@@ -27,7 +27,7 @@ use datatypes::types::cast;
|
||||
use datatypes::types::cast::CastOption;
|
||||
use datatypes::value::Value;
|
||||
use itertools::Itertools;
|
||||
pub(crate) use relation::{ColumnType, RelationDesc, RelationType};
|
||||
pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
|
||||
/// Row with key-value pair, timestamp and diff
|
||||
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
|
||||
|
||||
/// broadcast channel capacity
|
||||
pub const BROADCAST_CAP: usize = 1024;
|
||||
|
||||
/// Convert a value that is or can be converted to Datetime to internal timestamp
|
||||
///
|
||||
/// supported types are: `Date`, `DateTime`, `TimeStamp`, `i64`
|
||||
@@ -104,6 +107,11 @@ impl Row {
|
||||
Self { inner: vec![] }
|
||||
}
|
||||
|
||||
/// Returns true if the Row contains no elements.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.inner.is_empty()
|
||||
}
|
||||
|
||||
/// Create a row from a vector of values
|
||||
pub fn new(row: Vec<Value>) -> Self {
|
||||
Self { inner: row }
|
||||
|
||||
@@ -12,11 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::adapter::error::{InvalidQuerySnafu, Result};
|
||||
use crate::expr::MapFilterProject;
|
||||
|
||||
/// a set of column indices that are "keys" for the collection.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
@@ -75,6 +79,12 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Key {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// The type of a relation.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub struct RelationType {
|
||||
@@ -96,6 +106,49 @@ pub struct RelationType {
|
||||
}
|
||||
|
||||
impl RelationType {
|
||||
pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Self {
|
||||
let all_types = self
|
||||
.column_types
|
||||
.iter()
|
||||
.chain(expr_typs.iter())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
let mfp_out_types = mfp
|
||||
.projection
|
||||
.iter()
|
||||
.map(|i| all_types[*i].clone())
|
||||
.collect_vec();
|
||||
let old_to_new_col = BTreeMap::from_iter(
|
||||
mfp.projection
|
||||
.clone()
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(new, old)| (old, new)),
|
||||
);
|
||||
|
||||
// since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform
|
||||
let keys = self
|
||||
.keys
|
||||
.iter()
|
||||
.filter_map(|key| {
|
||||
key.column_indices
|
||||
.iter()
|
||||
.map(|old| old_to_new_col.get(old).cloned())
|
||||
.collect::<Option<Vec<_>>>()
|
||||
.and_then(|v| if v.is_empty() { None } else { Some(v) })
|
||||
.map(Key::from)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let time_index = self
|
||||
.time_index
|
||||
.and_then(|old| old_to_new_col.get(&old).cloned());
|
||||
Self {
|
||||
column_types: mfp_out_types,
|
||||
keys,
|
||||
time_index,
|
||||
}
|
||||
}
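To make the remapping above concrete, a small hedged illustration (not part of the diff): with projection `[2, 0]`, old column 2 becomes new column 0 and old column 0 becomes new column 1, so keys and a time index on surviving columns are renumbered while anything referring to a projected-away column is dropped.

// Illustration only.
#[test]
fn mfp_projection_remap_example() {
    let old_to_new = std::collections::BTreeMap::from([(2usize, 0usize), (0, 1)]);
    assert_eq!(old_to_new.get(&0), Some(&1)); // key on old col 0 -> key on new col 1
    assert_eq!(old_to_new.get(&1), None); // old col 1 was projected away, so keys using it are dropped
}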
|
||||
/// Constructs a `RelationType` representing the relation with no columns and
|
||||
/// no keys.
|
||||
pub fn empty() -> Self {
|
||||
|
||||
@@ -14,11 +14,24 @@
|
||||
|
||||
//! Transform Substrait into execution plan
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use prost::Message;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::plan::LogicalPlan;
|
||||
use query::QueryEngine;
|
||||
use session::context::QueryContext;
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
|
||||
use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu};
|
||||
use crate::adapter::error::{
|
||||
Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
|
||||
NotImplementedSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::adapter::FlownodeContext;
|
||||
use crate::expr::GlobalId;
|
||||
use crate::plan::TypedPlan;
|
||||
use crate::repr::RelationType;
|
||||
/// a simple macro to generate a not implemented error
|
||||
macro_rules! not_impl_err {
|
||||
@@ -44,7 +57,7 @@ mod literal;
|
||||
mod plan;
|
||||
|
||||
use literal::{from_substrait_literal, from_substrait_type};
|
||||
use snafu::OptionExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
|
||||
use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration;
|
||||
|
||||
@@ -79,38 +92,32 @@ impl FunctionExtensions {
|
||||
}
|
||||
}
|
||||
|
||||
/// A context that holds the information of the dataflow
|
||||
pub struct DataflowContext {
|
||||
/// `id` refer to any source table in the dataflow, and `name` is the name of the table
|
||||
/// which is a `Vec<String>` in substrait
|
||||
id_to_name: HashMap<GlobalId, Vec<String>>,
|
||||
/// see `id_to_name`
|
||||
name_to_id: HashMap<Vec<String>, GlobalId>,
|
||||
/// the schema of the table
|
||||
schema: HashMap<GlobalId, RelationType>,
|
||||
}
|
||||
/// To reuse existing code for parsing SQL, the SQL is first parsed into a datafusion logical plan,
|
||||
/// then to a substrait plan, and finally to a flow plan.
|
||||
pub async fn sql_to_flow_plan(
|
||||
ctx: &mut FlownodeContext,
|
||||
engine: &Arc<dyn QueryEngine>,
|
||||
sql: &str,
|
||||
) -> Result<TypedPlan, Error> {
|
||||
let query_ctx = ctx.query_context.clone().unwrap();
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(stmt, query_ctx)
|
||||
.await
|
||||
.context(InvalidQueryPlanSnafu)?;
|
||||
let LogicalPlan::DfPlan(plan) = plan;
|
||||
|
||||
impl DataflowContext {
|
||||
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
|
||||
///
|
||||
/// Returns an error if no table has been registered with the provided names
|
||||
pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
|
||||
let id = self
|
||||
.name_to_id
|
||||
.get(name)
|
||||
.copied()
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: name.join("."),
|
||||
})?;
|
||||
let schema = self
|
||||
.schema
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: name.join("."),
|
||||
})?;
|
||||
Ok((id, schema))
|
||||
}
|
||||
// encode then decode so as to rely on the existing conversion from logical plan to substrait plan
|
||||
let bytes = DFLogicalSubstraitConvertor {}
|
||||
.encode(&plan)
|
||||
.context(InvalidQuerySubstraitSnafu)?;
|
||||
|
||||
let sub_plan = substrait::substrait_proto::proto::Plan::decode(bytes)
|
||||
.map_err(|inner| InvalidQueryProstSnafu { inner }.build())?;
|
||||
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?;
|
||||
|
||||
Ok(flow_plan)
|
||||
}
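A hedged example of invoking the function above when a flow is created; the SQL text is invented and context/engine construction is elided:

// Hypothetical call site inside the flow node manager.
async fn plan_from_sql(
    ctx: &mut FlownodeContext,
    engine: &Arc<dyn QueryEngine>,
) -> Result<TypedPlan, Error> {
    sql_to_flow_plan(ctx, engine, "SELECT sum(number) FROM numbers").await
}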
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -129,17 +136,24 @@ mod test {
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
|
||||
use super::*;
|
||||
use crate::adapter::node_context::IdToNameMap;
|
||||
use crate::repr::ColumnType;
|
||||
|
||||
pub fn create_test_ctx() -> DataflowContext {
|
||||
pub fn create_test_ctx() -> FlownodeContext {
|
||||
let gid = GlobalId::User(0);
|
||||
let name = vec!["numbers".to_string()];
|
||||
let name = [
|
||||
"greptime".to_string(),
|
||||
"public".to_string(),
|
||||
"numbers".to_string(),
|
||||
];
|
||||
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
|
||||
|
||||
DataflowContext {
|
||||
id_to_name: HashMap::from([(gid, name.clone())]),
|
||||
name_to_id: HashMap::from([(name.clone(), gid)]),
|
||||
let mut tri_map = IdToNameMap::new();
|
||||
tri_map.insert(Some(name.clone()), Some(0), gid);
|
||||
FlownodeContext {
|
||||
schema: HashMap::from([(gid, schema)]),
|
||||
table_repr: tri_map,
|
||||
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,11 +54,11 @@ use crate::expr::{
|
||||
};
|
||||
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
|
||||
use crate::repr::{self, ColumnType, RelationType};
|
||||
use crate::transform::{DataflowContext, FunctionExtensions};
|
||||
use crate::transform::{FlownodeContext, FunctionExtensions};
|
||||
|
||||
impl TypedExpr {
|
||||
fn from_substrait_agg_grouping(
|
||||
ctx: &mut DataflowContext,
|
||||
ctx: &mut FlownodeContext,
|
||||
groupings: &[Grouping],
|
||||
typ: &RelationType,
|
||||
extensions: &FunctionExtensions,
|
||||
@@ -84,7 +84,7 @@ impl TypedExpr {
|
||||
|
||||
impl AggregateExpr {
|
||||
fn from_substrait_agg_measures(
|
||||
ctx: &mut DataflowContext,
|
||||
ctx: &mut FlownodeContext,
|
||||
measures: &[Measure],
|
||||
typ: &RelationType,
|
||||
extensions: &FunctionExtensions,
|
||||
@@ -218,7 +218,7 @@ impl KeyValPlan {
|
||||
impl TypedPlan {
|
||||
/// Convert AggregateRel into Flow's TypedPlan
|
||||
pub fn from_substrait_agg_rel(
|
||||
ctx: &mut DataflowContext,
|
||||
ctx: &mut FlownodeContext,
|
||||
agg: &proto::AggregateRel,
|
||||
extensions: &FunctionExtensions,
|
||||
) -> Result<TypedPlan, Error> {
|
||||
@@ -228,7 +228,7 @@ impl TypedPlan {
|
||||
return not_impl_err!("Aggregate without an input is not supported");
|
||||
};
|
||||
|
||||
let group_expr =
|
||||
let group_exprs =
|
||||
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?;
|
||||
|
||||
let mut aggr_exprs =
|
||||
@@ -236,14 +236,14 @@ impl TypedPlan {
|
||||
|
||||
let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
|
||||
&mut aggr_exprs,
|
||||
&group_expr,
|
||||
&group_exprs,
|
||||
input.typ.column_types.len(),
|
||||
)?;
|
||||
|
||||
let output_type = {
|
||||
let mut output_types = Vec::new();
|
||||
// first append group_expr as key, then aggr_expr as value
|
||||
for expr in &group_expr {
|
||||
for expr in &group_exprs {
|
||||
output_types.push(expr.typ.clone());
|
||||
}
|
||||
|
||||
@@ -252,7 +252,8 @@ impl TypedPlan {
                    aggr.func.signature().output.clone(),
                ));
            }
-           RelationType::new(output_types)
+           // TODO(discord9): try best to get time
+           RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
        };

        // copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
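With `.with_key(...)`, the aggregation output now declares the group-by columns as its key: the first `group_exprs.len()` output columns come from the grouping expressions and the aggregate values follow. A small sketch of the index computation (using itertools' `collect_vec`, as the code above does):

    use itertools::Itertools;

    // For `SELECT a, b, sum(c) ... GROUP BY a, b` there are two group expressions,
    // so the key of the output relation is its first two columns: [0, 1].
    fn key_indices(group_expr_count: usize) -> Vec<usize> {
        (0..group_expr_count).collect_vec()
    }

    fn main() {
        assert_eq!(key_indices(2), vec![0, 1]);
    }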
@@ -365,8 +366,8 @@ mod test {
        };
        let expected = TypedPlan {
            typ: RelationType::new(vec![
-               ColumnType::new(CDT::uint32_datatype(), true),
-               ColumnType::new(CDT::uint32_datatype(), false),
+               ColumnType::new(CDT::uint32_datatype(), true), // col sum(number)
+               ColumnType::new(CDT::uint32_datatype(), false), // col number
            ]),
            plan: Plan::Mfp {
                input: Box::new(Plan::Reduce {
@@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
-use crate::transform::{DataflowContext, FunctionExtensions};
+use crate::transform::{FlownodeContext, FunctionExtensions};

impl TypedPlan {
    /// Convert Substrait Plan into Flow's TypedPlan
    pub fn from_substrait_plan(
-       ctx: &mut DataflowContext,
+       ctx: &mut FlownodeContext,
        plan: &SubPlan,
    ) -> Result<TypedPlan, Error> {
        // Register function extension
@@ -62,7 +62,7 @@ impl TypedPlan {
    /// Convert Substrait Rel into Flow's TypedPlan
    /// TODO: SELECT DISTINCT(does it get compile with something else?)
    pub fn from_substrait_rel(
-       ctx: &mut DataflowContext,
+       ctx: &mut FlownodeContext,
        rel: &Rel,
        extensions: &FunctionExtensions,
    ) -> Result<TypedPlan, Error> {
@@ -114,7 +114,28 @@ impl TypedPlan {
            }
            Some(RelType::Read(read)) => {
                if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
-                   let table_reference = nt.names.clone();
+                   let query_ctx = ctx.query_context.clone().unwrap();
+                   let table_reference = match nt.names.len() {
+                       1 => [
+                           query_ctx.current_catalog().to_string(),
+                           query_ctx.current_schema().to_string(),
+                           nt.names[0].clone(),
+                       ],
+                       2 => [
+                           query_ctx.current_catalog().to_string(),
+                           nt.names[0].clone(),
+                           nt.names[1].clone(),
+                       ],
+                       3 => [
+                           nt.names[0].clone(),
+                           nt.names[1].clone(),
+                           nt.names[2].clone(),
+                       ],
+                       _ => InvalidQuerySnafu {
+                           reason: "Expect table to have name",
+                       }
+                       .fail()?,
+                   };
                    let table = ctx.table(&table_reference)?;
                    let get_table = Plan::Get {
                        id: crate::expr::Id::Global(table.0),
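The read handler now expands whatever name the Substrait NamedTable carries into a full [catalog, schema, table] triple, filling in missing parts from the query context. A self-contained sketch of the same normalization rule (plain strings standing in for the query context, simplified error type):

    /// Expand a 1-, 2- or 3-part table name into [catalog, schema, table],
    /// defaulting missing parts from the current catalog and schema.
    fn resolve_table_name(
        names: &[String],
        current_catalog: &str,
        current_schema: &str,
    ) -> Result<[String; 3], String> {
        match names {
            [table] => Ok([
                current_catalog.to_string(),
                current_schema.to_string(),
                table.clone(),
            ]),
            [schema, table] => Ok([current_catalog.to_string(), schema.clone(), table.clone()]),
            [catalog, schema, table] => Ok([catalog.clone(), schema.clone(), table.clone()]),
            _ => Err("expect table name to have 1 to 3 parts".to_string()),
        }
    }

    // resolve_table_name(&["numbers".into()], "greptime", "public")
    //     == Ok(["greptime".into(), "public".into(), "numbers".into()])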
@@ -31,16 +31,19 @@ use snafu::{OptionExt, ResultExt};

use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};

-pub struct StandaloneDatanodeManager(pub RegionServer);
+pub struct StandaloneDatanodeManager {
+   pub region_server: RegionServer,
+   pub flow_server: FlownodeRef,
+}

#[async_trait]
impl NodeManager for StandaloneDatanodeManager {
    async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
-       RegionInvoker::arc(self.0.clone())
+       RegionInvoker::arc(self.region_server.clone())
    }

    async fn flownode(&self, _node: &Peer) -> FlownodeRef {
-       unimplemented!()
+       self.flow_server.clone()
    }
}

@@ -38,6 +38,7 @@ common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
+flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
@@ -35,6 +35,7 @@ use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
+use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
@@ -129,6 +130,7 @@ impl GreptimeDbStandaloneBuilder {

        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        table_metadata_manager.init().await.unwrap();

+       let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
        let catalog_manager = KvBackendCatalogManager::new(
@@ -139,7 +141,19 @@ impl GreptimeDbStandaloneBuilder {
        )
        .await;

-       let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
+       let flow_builder = FlownodeBuilder::new(
+           Default::default(),
+           plugins.clone(),
+           table_metadata_manager.clone(),
+           catalog_manager.clone(),
+       )
+       .with_kv_backend(kv_backend.clone());
+       let flownode = Arc::new(flow_builder.build().await);
+
+       let node_manager = Arc::new(StandaloneDatanodeManager {
+           region_server: datanode.region_server(),
+           flow_server: flownode.clone(),
+       });

        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -193,6 +207,11 @@ impl GreptimeDbStandaloneBuilder {
            .await
            .unwrap();

+       flownode
+           .set_frontend_invoker(Box::new(instance.clone()))
+           .await;
+       let _node_handle = flownode.run_background();
+
        procedure_manager.start().await.unwrap();
        wal_options_allocator.start().await.unwrap();

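Taken together, the standalone builder hunks impose a specific startup order: the flownode is built first so the node manager can hold a reference to it, the frontend is built on top of the node manager, and only then is the flownode handed a frontend invoker and started in the background. A condensed recap of that order, excerpted from the hunks above (not a standalone program):

    // 1. build the flow node so that the node manager can reference it
    let flownode = Arc::new(flow_builder.build().await);
    // 2. the node manager routes region calls to the datanode and flow calls to the flownode
    let node_manager = Arc::new(StandaloneDatanodeManager {
        region_server: datanode.region_server(),
        flow_server: flownode.clone(),
    });
    // 3. build the frontend `instance` on top of the node manager (elided here)
    // 4. give the flownode a way to call back into the frontend, then start it
    flownode
        .set_frontend_invoker(Box::new(instance.clone()))
        .await;
    let _node_handle = flownode.run_background();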