Compare commits

...

42 Commits

Author SHA1 Message Date
discord9
93fcb7454c chore: after rebase 2024-05-10 15:11:17 +08:00
discord9
cfe28b6974 chore: debug log 2024-05-10 14:52:22 +08:00
discord9
9ec6107988 refactor: remove unused imports 2024-05-10 14:52:22 +08:00
discord9
e840bb469d chore: refactor& remove TODOs 2024-05-10 14:52:22 +08:00
discord9
684850f451 fix: get true QueryContext& dedup code 2024-05-10 14:52:22 +08:00
discord9
76aadb2223 chore: cleanup debug log&fix after rebase 2024-05-10 14:52:22 +08:00
discord9
f360b2e812 tests: fix tests 2024-05-10 14:52:22 +08:00
discord9
138a2aba7f fix: allow empty expire when 2024-05-10 14:52:22 +08:00
discord9
8f6462c0b0 feat: parse expire when 2024-05-10 14:52:22 +08:00
discord9
46d0b3cd64 fix: table name trying best to get full name 2024-05-10 14:52:22 +08:00
discord9
a17a7f4e47 feat: working poc demo...ish 2024-05-10 14:52:22 +08:00
discord9
50335dd53c write back seems didn't work 2024-05-10 14:52:22 +08:00
discord9
abaf881f06 refactor: tableName as array 2024-05-10 14:52:22 +08:00
discord9
e1a8215394 tests(WIP): get demo working 2024-05-10 14:52:22 +08:00
discord9
d7942a1a00 fix: make worker handle async 2024-05-10 14:52:22 +08:00
discord9
a6727e2e8d refactor: use table name for sink table 2024-05-10 14:52:22 +08:00
discord9
d5bdbedcd6 refactor: rwlock for frontend invoker&async lock 2024-05-10 14:51:44 +08:00
discord9
878737f781 feat: integrate flow to standalone(untested) 2024-05-10 14:51:44 +08:00
discord9
d88cff6f51 feat: impl Flownode for FlowNodeManager 2024-05-10 14:51:44 +08:00
discord9
e7801abd0c feat(WIP): simple parser 2024-05-10 14:51:44 +08:00
discord9
d7a132a02f chore: remove some TODO done 2024-05-10 14:51:44 +08:00
discord9
a3417f50cf refactor: rename some task to flow 2024-05-10 14:51:44 +08:00
discord9
099f414f63 chore: rename some task to flow 2024-05-10 14:51:44 +08:00
discord9
c22185abce refactor: make worker sync only and separate thread&test 2024-05-10 14:51:44 +08:00
discord9
e33afa53f4 feat: grpc trait&Server trait 2024-05-10 14:51:44 +08:00
discord9
7eaf471808 feat(WIP): main loop 2024-05-10 14:51:44 +08:00
discord9
acba753500 chore: remove unused 2024-05-10 14:51:44 +08:00
discord9
5736373820 feat impl grpc server 2024-05-10 14:51:44 +08:00
discord9
74dee25688 feat: new() for FlowNodeManager 2024-05-10 14:51:44 +08:00
discord9
edcbc89c38 feat: gen write back req 2024-05-10 14:51:44 +08:00
discord9
e88a40b58b refactor: use seperate Worker 2024-05-10 14:51:44 +08:00
discord9
c7647759be feat(WIP): FlowWorker: !Send 2024-05-10 14:51:44 +08:00
discord9
d8a191a2db refactor: FlowNodeContext 2024-05-10 14:51:44 +08:00
discord9
ea40691c71 chore: some unfinished tests 2024-05-10 14:51:44 +08:00
discord9
640674b9bc feat: get table schema 2024-05-10 14:51:44 +08:00
discord9
3fb3fb18c2 tests: add one for Send 2024-05-10 14:51:44 +08:00
discord9
1067d3453d feat: dataflow adapter 2024-05-10 14:51:44 +08:00
discord9
57e3912aca feat(WIP): FlowNodeManager 2024-05-10 14:51:44 +08:00
discord9
ebcfb0f1d7 feat: now types of errors 2024-05-10 14:51:44 +08:00
discord9
6442c96847 feat: render src/sink 2024-05-10 14:51:44 +08:00
discord9
b19febc97c feat: sql_to_flow_plan 2024-05-10 14:51:44 +08:00
discord9
8240a1ace1 feat: find all used collection 2024-05-10 14:51:44 +08:00
37 changed files with 2792 additions and 88 deletions

44
Cargo.lock generated
View File

@@ -1611,6 +1611,7 @@ dependencies = [
"either",
"etcd-client",
"file-engine",
"flow",
"frontend",
"futures",
"human-panic",
@@ -2459,6 +2460,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling"
version = "0.14.4"
@@ -3506,11 +3517,16 @@ name = "flow"
version = "0.7.2"
dependencies = [
"api",
"async-trait",
"catalog",
"common-base",
"common-catalog",
"common-decimal",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-runtime",
"common-telemetry",
"common-time",
"datafusion-common",
@@ -3518,8 +3534,12 @@ dependencies = [
"datafusion-substrait",
"datatypes",
"enum_dispatch",
"futures",
"greptime-proto",
"hydroflow",
"itertools 0.10.5",
"minstant",
"nom",
"num-traits",
"prost 0.12.4",
"query",
@@ -3529,6 +3549,7 @@ dependencies = [
"session",
"smallvec",
"snafu",
"store-api",
"strum 0.25.0",
"substrait 0.7.2",
"table",
@@ -5397,6 +5418,16 @@ dependencies = [
"adler",
]
[[package]]
name = "minstant"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db"
dependencies = [
"ctor",
"web-time 1.1.0",
]
[[package]]
name = "mio"
version = "0.8.11"
@@ -10289,6 +10320,7 @@ dependencies = [
"datanode",
"datatypes",
"dotenv",
"flow",
"frontend",
"futures",
"futures-util",
@@ -11011,7 +11043,7 @@ dependencies = [
"tracing-core",
"tracing-log 0.2.0",
"tracing-subscriber",
"web-time",
"web-time 0.2.4",
]
[[package]]
@@ -11769,6 +11801,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webbrowser"
version = "0.8.15"

View File

@@ -204,6 +204,7 @@ common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }

View File

@@ -45,6 +45,7 @@ datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend.workspace = true
futures.workspace = true
human-panic = "1.2.2"

View File

@@ -40,6 +40,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -405,11 +406,26 @@ impl StartCommand {
)
.await;
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_builder = FlownodeBuilder::new(
Default::default(),
fe_plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build().await);
let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
});
let table_id_sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -427,8 +443,6 @@ impl StartCommand {
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
@@ -456,6 +470,12 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
// flow server need to be able to use frontend to write insert requests back
flownode
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
let _handle = flownode.clone().run_background();
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await

View File

@@ -128,6 +128,7 @@ impl TableInfoValue {
}
}
#[derive(Clone)]
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}

View File

@@ -9,9 +9,13 @@ workspace = true
[dependencies]
api.workspace = true
catalog.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
@@ -19,17 +23,28 @@ datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
async-trait.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
prost.workspace = true
query.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true

View File

@@ -14,5 +14,648 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
#![warn(unused_imports)]
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_runtime::JoinHandle;
use common_telemetry::{debug, info};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use itertools::Itertools;
use minstant::Anchor;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{oneshot, Mutex, RwLock};
use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::parse_expr::parse_fixed;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::expr::GlobalId;
use crate::repr::{self, DiffRow, Row};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
mod flownode_impl;
mod parse_expr;
mod server;
#[cfg(test)]
mod tests;
mod util;
mod worker;
pub(crate) mod node_context;
mod table_source;
use error::Error;
/// Row-count limit per request (8192).
///
/// NOTE(review): presumably the maximum number of rows batched into one request;
/// it is not referenced by the code visible in this module -- confirm usage.
pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
/// Fully qualified table name, stored as `[catalog, schema, table]`
pub type TableName = [String; 3];
/// Options for flow node
///
/// Because of `#[serde(default)]`, any field missing while deserializing falls back to
/// the value from `FlownodeOptions::default()`.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
/// rpc address
pub rpc_addr: String,
}
/// Flownode Builder
///
/// Collects everything needed to construct a `FlownodeManager` (see `build`).
pub struct FlownodeBuilder {
/// Flownode options, stored as given to `new`
opts: FlownodeOptions,
/// Plugins handed to the query engine factory in `build`
plugins: Plugins,
/// Optional kv backend, `None` until `with_kv_backend` is called
kv_backend: Option<KvBackendRef>,
/// Table metadata manager handed to the `FlownodeManager` in `build`
table_meta: TableMetadataManagerRef,
/// Catalog manager handed to the query engine factory in `build`
catalog_manager: Arc<KvBackendCatalogManager>,
}
impl FlownodeBuilder {
    /// Create a builder from the flownode options, plugins, table metadata manager and
    /// catalog manager.
    ///
    /// The kv backend starts unset; use [`Self::with_kv_backend`] to provide one.
    pub fn new(
        opts: FlownodeOptions,
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
        catalog_manager: Arc<KvBackendCatalogManager>,
    ) -> Self {
        Self {
            opts,
            plugins,
            kv_backend: None,
            table_meta,
            catalog_manager,
        }
    }

    /// Set the kv backend and return the updated builder.
    pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
        Self {
            kv_backend: Some(kv_backend),
            ..self
        }
    }

    /// Build the [`FlownodeManager`].
    ///
    /// A query engine is created from the catalog manager and plugins, then a dedicated
    /// OS thread is spawned which creates the manager together with its single worker,
    /// hands the manager back through a oneshot channel and keeps running the worker.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread exits before handing back the manager.
    ///
    /// TODO(discord9): error handling
    pub async fn build(self) -> FlownodeManager {
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // query engine in flownode only translate plan with resolved table source.
            self.catalog_manager.clone(),
            None,
            None,
            None,
            false,
            self.plugins.clone(),
        );
        let query_engine = query_engine_factory.query_engine();
        let (tx, rx) = oneshot::channel();
        let _handle = std::thread::spawn(move || {
            let node_id = Some(1);
            let (flow_node_manager, mut worker) =
                FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone());
            // the receiver may already be gone, in which case nobody needs the manager
            let _ = tx.send(flow_node_manager);
            info!("Flow Worker started in new thread");
            worker.run();
        });
        // The sender is only dropped without sending if the worker thread died while
        // constructing the manager, which is a bug rather than a recoverable condition.
        let man = rx
            .await
            .expect("flow worker thread exited before handing back the FlownodeManager");
        info!("Flow Node Manager started");
        man
    }
}
/// Spawn a dedicated thread that owns a single flow worker and return the
/// [`FlownodeManager`] holding the handle to that worker.
///
/// The manager and the worker are both created on the new thread; the manager is sent
/// back through a oneshot channel while the worker keeps running on that thread.
///
/// # Panics
///
/// Panics if the worker thread drops the channel without sending the manager. The
/// blocking receive used here is also documented by tokio to panic when called from
/// within an asynchronous execution context, so call this from synchronous code only.
pub fn start_flow_node_with_one_worker(
    query_engine: Arc<dyn QueryEngine>,
    table_meta: TableMetadataManagerRef,
) -> FlownodeManager {
    let (manager_tx, manager_rx) = oneshot::channel();
    let worker_main = move || {
        let (manager, mut worker) =
            FlownodeManager::new_with_worker(Some(1), query_engine, table_meta);
        let _ = manager_tx.send(manager);
        worker.run();
    };
    let _handle = std::thread::spawn(worker_main);
    manager_rx.blocking_recv().unwrap()
}
/// Arc-ed FlowNodeManager, cheaper to clone
pub type FlownodeManagerRef = Arc<FlownodeManager>;
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using current system timestamp for now
pub struct FlownodeManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableSource,
/// Frontend used to write flow results back as insert/delete requests,
/// `None` until `set_frontend_invoker` is called
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlownodeContext>,
/// Error collector of every flow, keyed by flow id;
/// registered in `create_flow` and read by `log_all_errors`
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
/// Source of the tick (timestamp) passed to workers and attached to incoming rows
tick_manager: FlowTickManager,
/// Id of this node as given at construction
node_id: Option<u32>,
}
/// Construction and wiring of a `FlownodeManager`
impl FlownodeManager {
    /// Install the frontend invoker used to write results back, replacing any previous one
    pub async fn set_frontend_invoker(
        self: &Arc<Self>,
        frontend: Box<dyn FrontendInvoker + Send + Sync>,
    ) {
        let mut invoker = self.frontend_invoker.write().await;
        *invoker = Some(frontend);
    }

    /// Create a manager with no worker and **without** a `frontend_invoker`
    pub fn new(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        let table_info_source = TableSource::new(
            table_meta.table_info_manager().clone(),
            table_meta.table_name_manager().clone(),
        );
        Self {
            worker_handles: Vec::new(),
            query_engine,
            table_info_source,
            frontend_invoker: RwLock::new(None),
            node_context: Mutex::new(FlownodeContext::default()),
            flow_err_collectors: Default::default(),
            tick_manager: FlowTickManager::new(),
            node_id,
        }
    }

    /// Create a manager together with one freshly created worker whose handle is
    /// already registered in the manager
    pub fn new_with_worker<'s>(
        node_id: Option<u32>,
        query_engine: Arc<dyn QueryEngine>,
        table_meta: TableMetadataManagerRef,
    ) -> (Self, Worker<'s>) {
        let mut manager = Self::new(node_id, query_engine, table_meta);
        let (worker_handle, worker) = create_worker();
        manager.add_worker_handle(worker_handle);
        (manager, worker)
    }

    /// Register a worker handle, putting the corresponding worker under this manager
    pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
        let guarded = Mutex::new(handle);
        self.worker_handles.push(guarded);
    }
}
/// A batch of consecutive diff rows sharing the same diff type,
/// each row paired with its timestamp
#[derive(Debug)]
pub enum DiffRequest {
/// Rows to insert (built from rows with diff `1`)
Insert(Vec<(Row, repr::Timestamp)>),
/// Rows to delete (built from rows with diff `-1`)
Delete(Vec<(Row, repr::Timestamp)>),
}
/// Group diff rows into runs of the same diff type while preserving their order.
///
/// A row with diff `1` joins the trailing `Insert` batch (or opens a new one) and a row
/// with diff `-1` joins the trailing `Delete` batch (or opens a new one). Rows with any
/// other diff value are ignored.
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
    let mut requests = Vec::new();
    for (row, ts, diff) in rows {
        match diff {
            1 => match requests.last_mut() {
                Some(DiffRequest::Insert(batch)) => batch.push((row, ts)),
                _ => requests.push(DiffRequest::Insert(vec![(row, ts)])),
            },
            -1 => match requests.last_mut() {
                Some(DiffRequest::Delete(batch)) => batch.push((row, ts)),
                _ => requests.push(DiffRequest::Delete(vec![(row, ts)])),
            },
            // diffs other than a single insert/delete are skipped
            _ => {}
        }
    }
    requests
}
/// This impl block contains methods to send writeback requests to frontend
impl FlownodeManager {
/// TODO(discord9): merge all same type of diff row into one requests
///
/// Return the number of requests it made
pub async fn send_writeback_requests(&self) -> Result<usize, Error> {
let all_reqs = self.generate_writeback_request().await;
if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
return Ok(0);
}
let mut req_cnt = 0;
for (table_name, reqs) in all_reqs {
if reqs.is_empty() {
continue;
}
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema) = if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(&table_name)
.await?
{
let table_info = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.unwrap();
let meta = table_info.table_info.meta;
let primary_keys = meta
.primary_key_indices
.into_iter()
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let schema = meta.schema.column_schemas;
(primary_keys, schema)
} else {
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
let (primary_keys, schema) = {
let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
.map(|x| x.1)
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| format!("Col_{i}"))
.collect_vec()
})
.unwrap_or_default();
let ts_col = ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
let wout_ts = schema
.column_types
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
})
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(ts_col);
(primary_keys, with_ts)
};
(primary_keys, schema)
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
debug!(
"Sending {} writeback requests to table {}, reqs={:?}",
reqs.len(),
table_name.join("."),
reqs
);
for req in reqs {
match req {
DiffRequest::Insert(insert) => {
let rows_proto: Vec<v1::Row> = insert
.into_iter()
.map(|(mut row, _ts)| {
row.extend(Some(Value::from(
common_time::Timestamp::new_millisecond(0),
)));
row.into()
})
.collect::<Vec<_>>();
let table_name = table_name.last().unwrap().clone();
let req = RowInsertRequest {
table_name,
rows: Some(v1::Rows {
schema: proto_schema.clone(),
rows: rows_proto,
}),
};
req_cnt += 1;
self.frontend_invoker
.read()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {
reason: "Expect a frontend invoker for flownode to write back",
})?
.row_inserts(RowInsertRequests { inserts: vec![req] }, ctx.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| ExternalSnafu {})?;
}
DiffRequest::Delete(remove) => {
info!("original remove rows={:?}", remove);
let rows_proto: Vec<v1::Row> = remove
.into_iter()
.map(|(mut row, _ts)| {
row.extend(Some(Value::from(
common_time::Timestamp::new_millisecond(0),
)));
row.into()
})
.collect::<Vec<_>>();
let table_name = table_name.last().unwrap().clone();
let req = RowDeleteRequest {
table_name,
rows: Some(v1::Rows {
schema: proto_schema.clone(),
rows: rows_proto,
}),
};
req_cnt += 1;
self.frontend_invoker
.read()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {
reason: "Expect a frontend invoker for flownode to write back",
})?
.row_deletes(RowDeleteRequests { deletes: vec![req] }, ctx.clone())
.await
.map_err(BoxedError::new)
.with_context(|_| ExternalSnafu {})?;
}
}
}
}
Ok(req_cnt)
}
/// Generate writeback request for all sink table
pub async fn generate_writeback_request(&self) -> BTreeMap<TableName, Vec<DiffRequest>> {
let mut output = BTreeMap::new();
for (name, sink_recv) in self
.node_context
.lock()
.await
.sink_receiver
.iter_mut()
.map(|(n, (_s, r))| (n, r))
{
let mut rows = Vec::new();
while let Ok(row) = sink_recv.try_recv() {
rows.push(row);
}
let reqs = diff_row_to_request(rows);
output.insert(name.clone(), reqs);
}
output
}
}
/// Flow Runtime related methods
impl FlownodeManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
common_runtime::spawn_bg(async move {
self.run().await;
})
}
/// log all flow errors
pub async fn log_all_errors(&self) {
for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
let all_errors = f_err.get_all().await;
if !all_errors.is_empty() {
let all_errors = all_errors
.into_iter()
.map(|i| format!("{:?}", i))
.join("\n");
common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
}
}
}
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
pub async fn run(&self) {
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
self.run_available().await;
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
worker.lock().await.run_available(now).await;
}
}
/// send write request to related source sender
pub async fn handle_write_request(
&self,
region_id: RegionId,
rows: Vec<DiffRow>,
) -> Result<(), Error> {
let table_id = region_id.table_id();
debug!("Send {} rows to table {}", rows.len(), table_id);
self.node_context.lock().await.send(table_id, rows)?;
Ok(())
}
}
/// Create&Remove flow
impl FlownodeManager {
/// remove a flow by it's id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {
let handle = handle.lock().await;
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
break;
}
}
Ok(())
}
/// Return task id if a new task is created, otherwise return None
///
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_when expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exist: bool,
expire_when: Option<String>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
if create_if_not_exist {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
return Ok(None);
}
}
}
let mut node_ctx = self.node_context.lock().await;
// assign global id to source and sink table
for source in source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
.await?;
}
node_ctx
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
.await?;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
let expire_when = expire_when
.and_then(|s| {
if s.is_empty() || s.split_whitespace().join("").is_empty() {
None
} else {
Some(s)
}
})
.map(|d| {
let d = d.as_ref();
parse_fixed(d)
.map(|(_, n)| n)
.map_err(|err| err.to_string())
})
.transpose()
.map_err(|err| UnexpectedSnafu { reason: err }.build())?;
let _ = comment;
let _ = flow_options;
// TODO(discord9): add more than one handles
let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;
let source_ids = source_table_ids
.iter()
.map(|id| node_ctx.table_repr.get_by_table_id(id).unwrap().1)
.collect_vec();
let source_senders = source_ids
.iter()
.map(|id| node_ctx.get_source_by_global_id(id).map(|s| s.subscribe()))
.collect::<Result<Vec<_>, _>>()?;
let err_collector = ErrCollector::default();
self.flow_err_collectors
.write()
.await
.insert(flow_id, err_collector.clone());
let handle = &self.worker_handles[0].lock().await;
handle
.create_flow(
flow_id,
flow_plan,
sink_id,
sink_sender,
&source_ids,
source_senders,
expire_when,
create_if_not_exist,
err_collector,
)
.await?;
info!("Successfully create flow with id={}", flow_id);
Ok(Some(flow_id))
}
}
/// FlowTickManager is a manager for flow tick, which tracks flow execution progress
///
/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
/// TSO coord mess
#[derive(Clone)]
pub struct FlowTickManager {
/// Anchor created in `new`, used by `tick` to convert the current instant into unix time
anchor: Anchor,
}
/// Debug output is just the type name, no field is printed
impl std::fmt::Debug for FlowTickManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // a field-less debug struct renders as its bare name
        f.write_str("FlowTickManager")
    }
}
impl FlowTickManager {
    /// Create a tick manager anchored at the moment of the call
    pub fn new() -> Self {
        FlowTickManager {
            anchor: Anchor::new(),
        }
    }

    /// Return the current timestamp in milliseconds
    pub fn tick(&self) -> repr::Timestamp {
        // nanoseconds since the unix epoch, truncated to whole milliseconds
        (minstant::Instant::now().as_unix_nanos(&self.anchor) / 1_000_000) as repr::Timestamp
    }
}

/// Same as [`FlowTickManager::new`]
impl Default for FlowTickManager {
    fn default() -> Self {
        Self::new()
    }
}

View File

@@ -16,12 +16,11 @@
use std::any::Any;
use common_error::ext::BoxedError;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
@@ -32,6 +31,11 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error"))]
External {
location: Location,
source: BoxedError,
},
/// TODO(discord9): add detailed location of column
#[snafu(display("Failed to eval stream"))]
Eval {
@@ -42,6 +46,13 @@ pub enum Error {
#[snafu(display("Table not found: {name}"))]
TableNotFound { name: String, location: Location },
#[snafu(display("Table not found: {msg}, meta error: {source}"))]
TableNotFoundMeta {
source: common_meta::error::Error,
msg: String,
location: Location,
},
#[snafu(display("Table already exist: {name}"))]
TableAlreadyExist { name: String, location: Location },
@@ -52,6 +63,24 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid query plan: {source}"))]
InvalidQueryPlan {
source: query::error::Error,
location: Location,
},
#[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
InvalidQueryProst {
inner: api::DecodeError,
location: Location,
},
#[snafu(display("Invalid query, can't transform to substrait: {source}"))]
InvalidQuerySubstrait {
source: substrait::error::Error,
location: Location,
},
#[snafu(display("Invalid query: {reason}"))]
InvalidQuery { reason: String, location: Location },
@@ -80,6 +109,9 @@ pub enum Error {
context: String,
location: Location,
},
#[snafu(display("Unexpected: {reason}"))]
Unexpected { reason: String, location: Location },
}
/// Result type for flow module
@@ -92,14 +124,20 @@ impl ErrorExt for Error {
StatusCode::Internal
}
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } => StatusCode::TableNotFound,
&Self::InvalidQuery { .. } | &Self::Plan { .. } | &Self::Datatypes { .. } => {
StatusCode::PlanQuery
Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
StatusCode::TableNotFound
}
Self::NoProtoType { .. } => StatusCode::Unexpected,
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }
| &Self::InvalidQuery { .. }
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected,
&Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
}
&Self::External { .. } => StatusCode::Unknown,
}
}

View File

@@ -0,0 +1,117 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Err::<(), _>(BoxedError::new(err))
.with_context(|_| ExternalSnafu)
.unwrap_err()
}
#[async_trait::async_trait]
impl Flownode for FlownodeManager {
    /// Dispatch a flow request: create a flow, drop a flow, or fail on anything else.
    ///
    /// A create request yields the id of the created flow in `affected_flows` (empty when
    /// no flow was created). A request without body, or with a body missing required
    /// parts, is rejected with an `Unexpected` error.
    async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
        let query_ctx = request
            .header
            .and_then(|header| header.query_context)
            .map(Into::into);
        match request.body {
            Some(flow_request::Body::Create(CreateRequest {
                flow_id: Some(task_id),
                source_table_ids,
                sink_table_name: Some(sink_table_name),
                create_if_not_exists,
                expire_when,
                comment,
                sql,
                flow_options,
            })) => {
                let source_table_ids = source_table_ids
                    .into_iter()
                    .map(|table_id| table_id.id)
                    .collect_vec();
                let sink_table_name = [
                    sink_table_name.catalog_name,
                    sink_table_name.schema_name,
                    sink_table_name.table_name,
                ];
                let created = self
                    .create_flow(
                        task_id.id as u64,
                        sink_table_name,
                        &source_table_ids,
                        create_if_not_exists,
                        Some(expire_when),
                        Some(comment),
                        sql,
                        flow_options,
                        query_ctx,
                    )
                    .await
                    .map_err(to_meta_err)?;
                let affected_flows = created
                    .map(|id| greptime_proto::v1::FlowId { id: id as u32 })
                    .into_iter()
                    .collect_vec();
                Ok(FlowResponse {
                    affected_flows,
                    ..Default::default()
                })
            }
            Some(flow_request::Body::Drop(DropRequest {
                flow_id: Some(flow_id),
            })) => {
                self.remove_flow(flow_id.id as u64)
                    .await
                    .map_err(to_meta_err)?;
                Ok(Default::default())
            }
            None => UnexpectedSnafu {
                err_msg: "Missing request body",
            }
            .fail(),
            // a body is present but lacks a required part (e.g. the flow id)
            _ => UnexpectedSnafu {
                err_msg: "Invalid request body.",
            }
            .fail(),
        }
    }

    /// Feed inserted rows to the flows: every row of every request is forwarded as an
    /// insertion (diff `1`) stamped with the tick taken for its request.
    async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
        for write_request in request.requests {
            let region_id = write_request.region_id;
            let proto_rows = write_request
                .rows
                .map(|rows| rows.rows)
                .unwrap_or_default();
            // TODO(discord9): reconsider time assignment mechanism
            let now = self.tick_manager.tick();
            let rows: Vec<DiffRow> = proto_rows
                .into_iter()
                .map(|row| (repr::Row::from(row), now, 1))
                .collect_vec();
            self.handle_write_request(region_id.into(), rows)
                .await
                .map_err(to_meta_err)?;
        }
        Ok(Default::default())
    }
}

View File

@@ -0,0 +1,300 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Node context, prone to change with every incoming request
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
/// Bookkeeping shared by the dataflows of a flow node: which flows read which source table,
/// where each flow writes, and the channels and schemas that connect them.
#[derive(Default)]
pub struct FlownodeContext {
    /// Source table -> flows reading from it; used to decide which flows to run when a
    /// source table is updated.
    pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
    /// Flow -> name of the sink table its results are written to.
    pub flow_to_sink: BTreeMap<FlowId, TableName>,
    /// Broadcast sender per source table; incoming write requests are pushed through the
    /// sender of the table they target.
    ///
    /// Insert requests arrive with a table id, hence the table id key.
    pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
    /// Both halves of an unbounded mpsc channel per sink table.
    ///
    /// Keyed by table name because the sink table is addressed by name when results are
    /// sent back. Only one receiver is expected per sink table.
    pub sink_receiver: BTreeMap<
        TableName,
        (
            mpsc::UnboundedSender<DiffRow>,
            mpsc::UnboundedReceiver<DiffRow>,
        ),
    >,
    /// Rows waiting to be sent for each source table, in case its broadcast channel is full.
    pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
    /// Schema of each table, queried from metasrv or inferred from a `TypedPlan`.
    pub schema: HashMap<GlobalId, RelationType>,
    /// All the tables that have been registered in the worker.
    pub table_repr: IdToNameMap,
    /// Optional query context attached to this node context.
    pub query_context: Option<Arc<QueryContext>>,
}
impl FlownodeContext {
    /// Queue `rows` for the source table `table_id`, then forward as many buffered rows as the
    /// broadcast channel currently has room for.
    ///
    /// Returns the number of rows actually sent (including rows that were already waiting in
    /// the buffer). Rows that do not fit stay in the buffer for a later call.
    ///
    /// # Errors
    ///
    /// Fails with a table-not-found error when no sender is registered for `table_id`, and
    /// with an eval error when the broadcast channel rejects a row.
    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
        let sender = self
            .source_sender
            .get(&table_id)
            .with_context(|| TableNotFoundSnafu {
                name: table_id.to_string(),
            })?;
        let send_buffer = self.send_buffer.entry(table_id).or_default();
        send_buffer.extend(rows);

        let mut row_cnt = 0;
        // Check the channel capacity *before* taking a row out of the buffer: popping first
        // and then bailing out would silently drop that row.
        while sender.len() < BROADCAST_CAP {
            let Some(row) = send_buffer.pop_front() else {
                break;
            };
            sender
                .send(row)
                .map_err(|err| {
                    InternalSnafu {
                        reason: format!(
                            "Failed to send row to table_id = {:?}, error = {:?}",
                            table_id, err
                        ),
                    }
                    .build()
                })
                .with_context(|_| EvalSnafu)?;
            row_cnt += 1;
        }
        Ok(row_cnt)
    }
}
impl FlownodeContext {
    /// Record which source tables feed `task_id` and which sink table it writes to.
    ///
    /// Also makes sure the broadcast sender of every source table and the channel of the
    /// sink table exist.
    pub fn register_task_src_sink(
        &mut self,
        task_id: FlowId,
        source_table_ids: &[TableId],
        sink_table_name: TableName,
    ) {
        for &source_table_id in source_table_ids {
            self.add_source_sender(source_table_id);
            let tasks = self.source_to_tasks.entry(source_table_id).or_default();
            tasks.insert(task_id);
        }

        self.add_sink_receiver(sink_table_name.clone());
        self.flow_to_sink.insert(task_id, sink_table_name);
    }

    /// Create the broadcast sender for `table_id` unless one already exists.
    pub fn add_source_sender(&mut self, table_id: TableId) {
        self.source_sender.entry(table_id).or_insert_with(|| {
            let (sender, _receiver) = broadcast::channel(BROADCAST_CAP);
            sender
        });
    }

    /// Create the unbounded channel for the sink table `table_name` unless one already exists.
    pub fn add_sink_receiver(&mut self, table_name: TableName) {
        self.sink_receiver
            .entry(table_name)
            .or_insert_with(|| mpsc::unbounded_channel::<DiffRow>());
    }

    /// Look up the broadcast sender of the source table registered under the global id `id`.
    ///
    /// Fails with a table-not-found error when the global id is unknown, has no table id
    /// attached, or has no sender registered.
    pub fn get_source_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Result<&broadcast::Sender<DiffRow>, Error> {
        let (_name, maybe_table_id) =
            self.table_repr
                .get_by_global_id(id)
                .with_context(|| TableNotFoundSnafu {
                    name: format!("Global Id = {:?}", id),
                })?;
        let table_id = maybe_table_id.with_context(|| TableNotFoundSnafu {
            name: format!("Table Id = {:?}", id),
        })?;

        self.source_sender
            .get(&table_id)
            .with_context(|| TableNotFoundSnafu {
                name: table_id.to_string(),
            })
    }

    /// Return a clone of the sender half of the sink channel registered under the global id `id`.
    ///
    /// Fails with a table-not-found error when the global id is unknown, has no table name
    /// attached, or has no sink channel registered.
    pub fn get_sink_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
        let (maybe_table_name, _table_id) =
            self.table_repr
                .get_by_global_id(id)
                .with_context(|| TableNotFoundSnafu {
                    name: format!("{:?}", id),
                })?;
        let table_name = maybe_table_name.with_context(|| TableNotFoundSnafu {
            name: format!("Global Id = {:?}", id),
        })?;

        match self.sink_receiver.get(&table_name) {
            Some((sender, _receiver)) => Ok(sender.clone()),
            None => TableNotFoundSnafu {
                name: table_name.join("."),
            }
            .fail(),
        }
    }
}
impl FlownodeContext {
    /// Retrieve the `GlobalId` and schema of a table that was previously registered through
    /// [`Self::assign_global_id_to_table`].
    ///
    /// Returns a table-not-found error if no table is registered under `name`, or if no
    /// schema is known for it yet.
    pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
        let id = self
            .table_repr
            .get_by_name(name)
            .map(|(_tid, gid)| gid)
            .with_context(|| TableNotFoundSnafu {
                name: name.join("."),
            })?;
        let schema = self
            .schema
            .get(&id)
            .cloned()
            .with_context(|| TableNotFoundSnafu {
                name: name.join("."),
            })?;
        Ok((id, schema))
    }

    /// Assign a global id to a table; if one is already assigned, return the existing id.
    ///
    /// Requires at least one of `table_name` or `table_id` to be `Some`.
    ///
    /// When `table_id` is given, the table name and schema are fetched through `srv_map`
    /// and the schema is stored under the new global id.
    ///
    /// NOTE: this does not render the table into the collection referred to by the
    /// `GlobalId`; it merely creates the mapping from table name/id to global id.
    pub async fn assign_global_id_to_table(
        &mut self,
        srv_map: &TableSource,
        mut table_name: Option<TableName>,
        table_id: Option<TableId>,
    ) -> Result<GlobalId, Error> {
        // If the table is already known by name or by id, reuse its global id.
        let existing = table_name
            .as_ref()
            .and_then(|table_name| self.table_repr.get_by_name(table_name))
            .map(|(_, gid)| gid)
            .or_else(|| {
                table_id
                    .and_then(|id| self.table_repr.get_by_table_id(&id))
                    .map(|(_, gid)| gid)
            });
        if let Some(gid) = existing {
            return Ok(gid);
        }

        let global_id = self.new_global_id();
        // Without a table id the database hasn't assigned one yet, or we don't need it.
        if let Some(table_id) = table_id {
            let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
            table_name = table_name.or(Some(known_table_name));
            self.schema.insert(global_id, schema);
        }
        self.table_repr.insert(table_name, table_id, global_id);
        Ok(global_id)
    }

    /// Assign a schema to a table that already has a global id.
    ///
    /// # Errors
    ///
    /// Returns a table-not-found error when `table_name` has not been registered.
    pub fn assign_table_schema(
        &mut self,
        table_name: &TableName,
        schema: RelationType,
    ) -> Result<(), Error> {
        let gid = self
            .table_repr
            .get_by_name(table_name)
            .map(|(_, gid)| gid)
            .with_context(|| TableNotFoundSnafu {
                name: table_name.join("."),
            })?;
        self.schema.insert(gid, schema);
        Ok(())
    }

    /// Get a new global id, derived from the number of global ids registered so far.
    pub fn new_global_id(&self) -> GlobalId {
        GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
    }
}
/// A three-way map between table name, table id and global id.
///
/// Name and id are both optional for a given global id; the reverse lookups go through
/// the global id.
#[derive(Default, Debug)]
pub struct IdToNameMap {
    /// table name -> global id
    name_to_global_id: HashMap<TableName, GlobalId>,
    /// table id -> global id
    id_to_global_id: HashMap<TableId, GlobalId>,
    /// global id -> (table name, table id), either of which may be absent
    global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}
impl IdToNameMap {
    /// Create an empty map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register `global_id` together with whichever of `name` and `id` is known.
    pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
        if let Some(table_name) = &name {
            self.name_to_global_id.insert(table_name.clone(), global_id);
        }
        if let Some(table_id) = id {
            self.id_to_global_id.insert(table_id, global_id);
        }
        self.global_id_to_name_id.insert(global_id, (name, id));
    }

    /// Look up by table name; yields the table id (if any) and the global id.
    pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
        let global_id = *self.name_to_global_id.get(name)?;
        // Every global id stored in the name map is also a key of `global_id_to_name_id`.
        let (_name, table_id) = self.global_id_to_name_id.get(&global_id).unwrap();
        Some((*table_id, global_id))
    }

    /// Look up by table id; yields the table name (if any) and the global id.
    pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
        let global_id = *self.id_to_global_id.get(id)?;
        // Every global id stored in the id map is also a key of `global_id_to_name_id`.
        let (table_name, _id) = self.global_id_to_name_id.get(&global_id).unwrap();
        Some((table_name.clone(), global_id))
    }

    /// Look up by global id; yields the table name and table id, each of which may be absent.
    pub fn get_by_global_id(
        &self,
        global_id: &GlobalId,
    ) -> Option<(Option<TableName>, Option<TableId>)> {
        let entry = self.global_id_to_name_id.get(global_id)?;
        Some(entry.clone())
    }
}

View File

@@ -0,0 +1,245 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! parse expr like "ts <= now() - interval '5 m'"
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{alphanumeric1, digit0, multispace0};
use nom::combinator::peek;
use nom::sequence::tuple;
use nom::IResult;
use crate::repr;
/// A duration made of several `<number> <unit>` pairs is summed and reported in milliseconds.
#[test]
fn test_parse_duration() {
    let expected_ms = (3600 + 5 * 60 + 42) * 1000;
    let (rest, ttl_ms) = parse_duration("1 h 5 m 42 second").unwrap();
    assert_eq!(rest, "");
    assert_eq!(ttl_ms, expected_ms);
}
/// The fixed `timestamp < now() - INTERVAL '…'` form yields the interval in milliseconds.
#[test]
fn test_parse_fixed() {
    let expected_ms = (5 * 60 + 42) * 1000;
    let (rest, ttl_ms) = parse_fixed("timestamp < now() - INTERVAL '5m 42s'").unwrap();
    assert_eq!(rest, "");
    assert_eq!(ttl_ms, expected_ms);
}
/// Parse the fixed form `timestamp < now() - interval '<duration>'` and return the
/// duration as produced by [`parse_duration`].
///
/// Keywords are matched case-insensitively and may be separated by any amount of whitespace.
pub fn parse_fixed(input: &str) -> IResult<&str, i64> {
    // Everything up to (but not including) the opening quote of the interval literal.
    let (rest, _prefix) = tuple((
        multispace0,
        tag_no_case("timestamp"),
        multispace0,
        tag("<"),
        multispace0,
        tag_no_case("now()"),
        multispace0,
        tag("-"),
        multispace0,
        tag_no_case("interval"),
        multispace0,
    ))(input)?;

    // The quoted duration itself.
    let (rest, (_open, ttl, _close)) = tuple((tag("'"), parse_duration, tag("'")))(rest)?;
    Ok((rest, ttl))
}
/// parse duration and return ttl, currently only support time part of psql interval type
pub fn parse_duration(input: &str) -> IResult<&str, i64> {
let mut intervals = vec![];
let mut remain = input;
while peek(parse_quality)(remain).is_ok() {
let (r, number) = parse_quality(remain)?;
let (r, unit) = parse_time_unit(r)?;
intervals.push((number, unit));
remain = r;
}
let mut total = 0;
for (number, unit) in intervals {
let number = match unit {
TimeUnit::Second => number,
TimeUnit::Minute => number * 60,
TimeUnit::Hour => number * 60 * 60,
};
total += number;
}
total *= 1000;
Ok((remain, total))
}
/// Expression tree built by the expression parser in this module.
enum Expr {
    /// A column reference, by name.
    Col(String),
    /// The `now()` call.
    Now,
    /// A duration value.
    Duration(repr::Duration),
    /// A binary operation `left op right`; `op` holds the operator text.
    Binary {
        left: Box<Expr>,
        op: String,
        right: Box<Expr>,
    },
}
/// Parse a whole expression, starting from the lowest binding power.
fn parse_expr(input: &str) -> IResult<&str, Expr> {
    const LOWEST_BINDING_POWER: u8 = 0;
    parse_expr_bp(input, LOWEST_BINDING_POWER)
}
/// A simple Pratt parser.
///
/// Parses one item, then keeps folding `<op> <rhs>` pairs into a left-hand side for as long
/// as the operator's left binding power is at least `min_bp`. Returns the remaining input
/// together with the expression parsed so far.
fn parse_expr_bp(input: &str, min_bp: u8) -> IResult<&str, Expr> {
    let (mut input, mut lhs): (&str, Expr) = parse_item(input)?;
    loop {
        // No operator follows (e.g. end of input): the expression ends here. Propagating
        // the error instead would make every expression fail once its last item is read.
        let Ok((r, op)) = parse_op(input) else {
            return Ok((input, lhs));
        };
        let (_, (l_bp, r_bp)) = infix_binding_power(op)?;
        if l_bp < min_bp {
            // The operator binds less tightly than our caller's; leave it for the caller.
            return Ok((input, lhs));
        }
        let (r, rhs) = parse_expr_bp(r, r_bp)?;
        input = r;
        lhs = Expr::Binary {
            left: Box::new(lhs),
            op: op.to_string(),
            right: Box::new(rhs),
        };
    }
}
/// Parse one infix operator: `+`/`-` first, then the comparison operators.
fn parse_op(input: &str) -> IResult<&str, &str> {
    let (rest, op) = alt((parse_add_sub, parse_cmp))(input)?;
    Ok((rest, op))
}
fn parse_item(input: &str) -> IResult<&str, Expr> {
if let Ok((r, name)) = parse_col_name(input) {
Ok((r, Expr::Col(name.to_string())))
} else if let Ok((r, _now)) = parse_now(input) {
Ok((r, Expr::Now))
} else if let Ok((_r, _num)) = parse_quality(input) {
todo!()
} else {
todo!()
}
}
fn infix_binding_power(op: &str) -> IResult<&str, (u8, u8)> {
let ret = match op {
"<" | ">" | "<=" | ">=" => (1, 2),
"+" | "-" => (3, 4),
_ => {
return Err(nom::Err::Error(nom::error::Error::new(
op,
nom::error::ErrorKind::Fail,
)))
}
};
Ok((op, ret))
}
/// Parse an alphanumeric column name, ignoring whitespace on both sides.
fn parse_col_name(input: &str) -> IResult<&str, &str> {
    let (rest, (_leading, name, _trailing)) =
        tuple((multispace0, alphanumeric1, multispace0))(input)?;
    Ok((rest, name))
}
/// Parse the literal `now()`, case-insensitively, and return the matched text.
fn parse_now(input: &str) -> IResult<&str, &str> {
    let (rest, matched) = tag_no_case("now()")(input)?;
    Ok((rest, matched))
}
/// Parse a `+` or `-` operator, ignoring whitespace on both sides.
fn parse_add_sub(input: &str) -> IResult<&str, &str> {
    let (rest, (_leading, op, _trailing)) =
        tuple((multispace0, alt((tag("+"), tag("-"))), multispace0))(input)?;
    Ok((rest, op))
}
/// Parse a comparison operator (`<=`, `>=`, `<`, `>`), ignoring whitespace on both sides.
fn parse_cmp(input: &str) -> IResult<&str, &str> {
    // The two-character operators are listed first so that `<=` is not read as `<`.
    let (rest, (_leading, op, _trailing)) = tuple((
        multispace0,
        alt((tag("<="), tag(">="), tag("<"), tag(">"))),
        multispace0,
    ))(input)?;
    Ok((rest, op))
}
/// Parse an integer with an optional `+`/`-` sign, ignoring whitespace on both sides.
///
/// Fails with an error of kind `Digit` when no digits are present or the digits do not
/// fit the integer type.
fn parse_quality(input: &str) -> IResult<&str, repr::Duration> {
    let (rest, (_leading, sign, digits, _trailing)) = tuple((
        multispace0,
        alt((tag("+"), tag("-"), tag(""))),
        digit0,
        multispace0,
    ))(input)?;

    // `digit0` also accepts an empty run; that case is rejected here by the integer parse.
    let magnitude = digits.parse::<repr::Duration>().map_err(|_| {
        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
    })?;

    let value = if sign == "-" { -magnitude } else { magnitude };
    Ok((rest, value))
}
/// Time-of-day units accepted in a duration.
#[derive(Debug, Clone)]
enum TimeUnit {
    /// Seconds.
    Second,
    /// Minutes.
    Minute,
    /// Hours.
    Hour,
}
/// Calendar units of a duration.
#[derive(Debug, Clone)]
enum DateUnit {
    /// Days.
    Day,
    /// Months.
    Month,
    /// Years.
    Year,
}
/// Parse a time unit (seconds, minutes or hours), ignoring whitespace on both sides.
///
/// Units are matched case-insensitively, in singular, plural or one-letter form.
/// Date units (day, month, year) are not supported yet.
fn parse_time_unit(input: &str) -> IResult<&str, TimeUnit> {
    // `alt` returns the first alternative that matches, so within each unit the longest
    // spelling must come first; otherwise `seconds` is read as `second` plus a stray `s`.
    fn to_second(input: &str) -> IResult<&str, TimeUnit> {
        alt((
            tag_no_case("seconds"),
            tag_no_case("second"),
            tag_no_case("s"),
        ))(input)
        .map(|(r, _)| (r, TimeUnit::Second))
    }
    fn to_minute(input: &str) -> IResult<&str, TimeUnit> {
        alt((
            tag_no_case("minutes"),
            tag_no_case("minute"),
            tag_no_case("m"),
        ))(input)
        .map(|(r, _)| (r, TimeUnit::Minute))
    }
    fn to_hour(input: &str) -> IResult<&str, TimeUnit> {
        alt((tag_no_case("hours"), tag_no_case("hour"), tag_no_case("h")))(input)
            .map(|(r, _)| (r, TimeUnit::Hour))
    }

    tuple((
        multispace0,
        alt((to_second, to_minute, to_hour)),
        multispace0,
    ))(input)
    .map(|(r, (_, unit, _))| (r, unit))
}

View File

@@ -0,0 +1,147 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of grpc service for flow node
use std::net::SocketAddr;
use common_meta::node_manager::Flownode;
use common_telemetry::tracing::info;
use futures::FutureExt;
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use snafu::{ensure, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{oneshot, Mutex};
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlownodeManagerRef;
/// Name reported by the flow node server.
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Thin wrapper around the flow node manager.
///
/// It exists so the gRPC service trait can be implemented on a local type, which the
/// orphan rule would not allow directly on `Arc<...>`.
#[derive(Clone)]
pub struct FlowService {
    /// The manager every request is forwarded to.
    pub manager: FlownodeManagerRef,
}
#[async_trait::async_trait]
impl flow_server::Flow for FlowService {
    /// Forward a create/drop flow request to the manager.
    ///
    /// Any error from the manager is reported as an internal gRPC status.
    async fn handle_create_remove(
        &self,
        request: Request<FlowRequest>,
    ) -> Result<Response<FlowResponse>, Status> {
        match self.manager.handle(request.into_inner()).await {
            Ok(response) => Ok(Response::new(response)),
            Err(e) => {
                let msg = format!("failed to handle request: {:?}", e);
                Err(Status::internal(msg))
            }
        }
    }

    /// Forward mirrored insert requests to the manager.
    ///
    /// Any error from the manager is reported as an internal gRPC status.
    async fn handle_mirror_request(
        &self,
        request: Request<InsertRequests>,
    ) -> Result<Response<FlowResponse>, Status> {
        // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
        let requests = request
            .into_inner()
            .requests
            .into_iter()
            .map(|insert| api::v1::region::InsertRequest {
                region_id: insert.region_id,
                rows: insert.rows,
            })
            .collect_vec();
        let request = api::v1::region::InsertRequests { requests };

        match self.manager.handle_inserts(request).await {
            Ok(response) => Ok(Response::new(response)),
            Err(e) => {
                let msg = format!("failed to handle request: {:?}", e);
                Err(Status::internal(msg))
            }
        }
    }
}
/// gRPC server of a flow node.
pub struct FlownodeServer {
    /// Sender used to signal shutdown to the running server; `None` while not started.
    pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
    /// The service that is registered with the gRPC server.
    pub flow_service: FlowService,
}
impl FlownodeServer {
    /// Build the tonic service wrapping a clone of this server's [`FlowService`].
    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
        let service = self.flow_service.clone();
        flow_server::FlowServer::new(service)
    }
}
#[async_trait::async_trait]
impl servers::server::Server for FlownodeServer {
async fn shutdown(&self) -> Result<(), servers::error::Error> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
}
info!("Shutdown flow node server");
Ok(())
}
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
let (tx, rx) = oneshot::channel::<()>();
let (incoming, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu { server: "flow" }
);
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming =
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("flow server is bound to {}", addr);
*shutdown_tx = Some(tx);
(incoming, addr)
};
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
let _handle = common_runtime::spawn_bg(async move {
let _result = builder
.serve_with_incoming_shutdown(incoming, rx.map(drop))
.await
.context(StartGrpcSnafu);
});
// TODO(discord9): better place for dataflow to run per second
let manager_ref = self.flow_service.manager.clone();
let _handle = manager_ref.clone().run_background();
Ok(addr)
}
fn name(&self) -> &str {
FLOW_NODE_SERVER_NAME
}
}

View File

@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! How to query table information from database
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::adapter::error::{Error, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::adapter::TableName;
use crate::repr::{self, ColumnType, RelationType};
/// Source of table metadata: the table name <-> table id mapping is queried from the
/// table info and table name managers.
pub struct TableSource {
    /// Used for `TableId -> TableName` (and table info) lookups.
    table_info_manager: TableInfoManager,
    /// Used for `TableName -> TableId` lookups.
    table_name_manager: TableNameManager,
}
impl TableSource {
    /// Create a table source backed by the given managers.
    pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
        TableSource {
            table_info_manager,
            table_name_manager,
        }
    }

    /// Resolve a protobuf table name to its table id.
    ///
    /// # Errors
    ///
    /// Fails when the lookup itself fails, and with a table-not-found error (instead of
    /// panicking) when no table is registered under `name`.
    pub async fn get_table_id_from_proto_name(
        &self,
        name: &greptime_proto::v1::TableName,
    ) -> Result<TableId, Error> {
        self.table_name_manager
            .get(TableNameKey::new(
                &name.catalog_name,
                &name.schema_name,
                &name.table_name,
            ))
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Table name = {:?}, couldn't found table id", name),
            })?
            .map(|id| id.table_id())
            .with_context(|| TableNotFoundSnafu {
                name: format!(
                    "{}.{}.{}",
                    name.catalog_name, name.schema_name, name.table_name
                ),
            })
    }

    /// Resolve a table name to its table id.
    ///
    /// If the table hasn't been created in the database yet, `Ok(None)` is returned.
    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
        let ret = self
            .table_name_manager
            .get(TableNameKey::new(&name[0], &name[1], &name[2]))
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Table name = {:?}, couldn't found table id", name),
            })?
            .map(|id| id.table_id());
        Ok(ret)
    }

    /// Query the `[catalog, schema, table]` name of the table with the given id.
    ///
    /// # Errors
    ///
    /// Fails when the lookup itself fails, and with a table-not-found error (instead of
    /// panicking) when no table info exists for `table_id`.
    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
        let table_info_value = self
            .get_table_info_value(table_id)
            .await?
            .with_context(|| TableNotFoundSnafu {
                name: format!("TableId = {:?}, couldn't found table name", table_id),
            })?;
        let name = table_info_value.table_name();
        Ok([name.catalog_name, name.schema_name, name.table_name])
    }

    /// Query the table info value of the table with the given id.
    ///
    /// Returns `Ok(None)` when no table info exists for `table_id`.
    pub async fn get_table_info_value(
        &self,
        table_id: &TableId,
    ) -> Result<Option<TableInfoValue>, Error> {
        Ok(self
            .table_info_manager
            .get(*table_id)
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("TableId = {:?}, couldn't found table name", table_id),
            })?
            .map(|v| v.into_inner()))
    }

    /// Query the name and the schema (as a [`RelationType`]) of the table with the given id.
    ///
    /// The primary key indices become the single key of the relation, and the schema's
    /// timestamp index becomes its time index.
    ///
    /// # Errors
    ///
    /// Fails with a table-not-found error when no table info exists for `table_id`.
    pub async fn get_table_name_schema(
        &self,
        table_id: &TableId,
    ) -> Result<(TableName, RelationType), Error> {
        let table_info_value = self
            .get_table_info_value(table_id)
            .await?
            .with_context(|| TableNotFoundSnafu {
                name: format!("TableId = {:?}, Can't found table info", table_id),
            })?;

        let table_name = table_info_value.table_name();
        let table_name = [
            table_name.catalog_name,
            table_name.schema_name,
            table_name.table_name,
        ];

        let raw_schema = table_info_value.table_info.meta.schema;
        let column_types = raw_schema
            .column_schemas
            .into_iter()
            .map(|col| ColumnType {
                nullable: col.is_nullable(),
                scalar_type: col.data_type,
            })
            .collect_vec();

        let key = table_info_value.table_info.meta.primary_key_indices;
        let keys = vec![repr::Key::from(key)];

        let time_index = raw_schema.timestamp_index;
        Ok((
            table_name,
            RelationType {
                column_types,
                keys,
                time_index,
            },
        ))
    }
}

View File

@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Mock test for adapter module
//! TODO(discord9): write mock test
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::ConcreteDataType;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
use super::*;
/// Build a `TableInfo` for tests with two columns: a nullable `number` (int32) and a
/// non-null `ts` (millisecond timestamp) used as the time index.
///
/// `number` is the only primary key column. Panics if any of the builders fails.
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
    table_id: TableId,
    table_name: &str,
    region_numbers: I,
) -> TableInfo {
    let number_column = ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true);
    let ts_column = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )
    .with_time_index(true);

    let schema = SchemaBuilder::try_from(vec![number_column, ts_column])
        .unwrap()
        .version(123)
        .build()
        .unwrap();

    let regions: Vec<u32> = region_numbers.into_iter().collect();
    let meta = TableMetaBuilder::default()
        .schema(Arc::new(schema))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .region_numbers(regions)
        .build()
        .unwrap();

    TableInfoBuilder::default()
        .table_id(table_id)
        .table_version(5)
        .name(table_name)
        .meta(meta)
        .build()
        .unwrap()
}
/// Placeholder for a mock harness of the flow node manager.
///
/// Intended to provide several default table infos and schemas; currently it does nothing.
fn mock_harness_flow_node_manager() {}

View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper::ColumnDataTypeWrapper;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_error::ext::BoxedError;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::error::{Error, ExternalSnafu};
/// convert `ColumnSchema` lists to it's corrsponding proto type
pub fn column_schemas_to_proto(
column_schemas: Vec<ColumnSchema>,
primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnSchema>, Error> {
let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
.iter()
.map(|c| {
ColumnDataTypeWrapper::try_from(c.data_type.clone())
.map(|w| w.to_parts())
.map_err(BoxedError::new)
.with_context(|_| ExternalSnafu)
})
.try_collect()?;
let ret = column_schemas
.iter()
.zip(column_datatypes)
.map(|(schema, datatype)| {
let semantic_type = if schema.is_time_index() {
SemanticType::Timestamp
} else if primary_keys.contains(&schema.name) {
SemanticType::Tag
} else {
SemanticType::Field
} as i32;
api::v1::ColumnSchema {
column_name: schema.name.clone(),
datatype: datatype.0 as i32,
semantic_type,
datatype_extension: datatype.1,
}
})
.collect();
Ok(ret)
}

View File

@@ -0,0 +1,509 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! For single-thread flow worker
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use hydroflow::scheduled::graph::Hydroflow;
use snafu::ResultExt;
use tokio::sync::{broadcast, mpsc, Mutex};
use crate::adapter::error::{Error, EvalSnafu};
use crate::adapter::FlowId;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow};
/// A queue of rows behind an async mutex, shareable through an `Arc`.
pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;
/// Create a worker (`!Send`) together with the handle (`Send + Sync`) used to talk to it.
///
/// The two are connected through an inter-thread call channel; the worker starts without
/// any flow.
pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
    let (client, server) = create_inter_thread_call();
    (
        WorkerHandle {
            itc_client: Mutex::new(client),
        },
        Worker {
            task_states: BTreeMap::new(),
            itc_server: Arc::new(Mutex::new(server)),
        },
    )
}
/// Everything belonging to one running dataflow: the `Hydroflow` graph, its
/// `DataflowState` and the collector for its errors.
pub(crate) struct ActiveDataflowState<'subgraph> {
    /// The dataflow graph.
    df: Hydroflow<'subgraph>,
    /// State used while rendering and running the dataflow.
    state: DataflowState,
    /// Collector for errors of this dataflow.
    err_collector: ErrCollector,
}
impl std::fmt::Debug for ActiveDataflowState<'_> {
    /// Format the state for debugging; the graph itself is shown as a placeholder string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ActiveDataflowState");
        out.field("df", &"<Hydroflow>");
        out.field("state", &self.state);
        out.field("err_collector", &self.err_collector);
        out.finish()
    }
}
impl Default for ActiveDataflowState<'_> {
    /// An empty dataflow: a new graph, default state and a default error collector.
    fn default() -> Self {
        let df = Hydroflow::new();
        let state = DataflowState::default();
        let err_collector = ErrCollector::default();
        Self {
            df,
            state,
            err_collector,
        }
    }
}
impl<'subgraph> ActiveDataflowState<'subgraph> {
    /// Create a new render context that carries the given global id.
    ///
    /// The context borrows the graph and the state mutably and shares the error collector
    /// (by clone); its input collection and local scope start out as defaults.
    pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph>
    where
        'subgraph: 'ctx,
    {
        let err_collector = self.err_collector.clone();
        Context {
            id: global_id,
            df: &mut self.df,
            compute_state: &mut self.state,
            err_collector,
            input_collection: Default::default(),
            local_scope: Default::default(),
        }
    }

    /// Set the current timestamp of the dataflow state.
    pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
        self.state.set_current_ts(ts);
    }

    /// Run all available subgraphs.
    ///
    /// Returns true if any subgraph actually executed.
    pub fn run_available(&mut self) -> bool {
        let graph = &mut self.df;
        self.state.run_available_with_schedule(graph)
    }
}
/// Handle used to send requests to a [`Worker`].
#[derive(Debug)]
pub struct WorkerHandle {
    /// Client side of the inter-thread call channel, guarded by an async mutex.
    itc_client: Mutex<InterThreadCallClient>,
}
impl WorkerHandle {
    /// Ask the worker to create a flow and wait for its answer.
    ///
    /// Returns the worker's result: the id of the created flow, or `None` when the worker
    /// did not create one.
    ///
    /// # Errors
    ///
    /// Fails when the call to the worker fails, when the worker reports an error, or when
    /// it answers with an unexpected response kind.
    #[allow(clippy::too_many_arguments)]
    pub async fn create_flow(
        &self,
        task_id: FlowId,
        plan: TypedPlan,
        sink_id: GlobalId,
        sink_sender: mpsc::UnboundedSender<DiffRow>,
        source_ids: &[GlobalId],
        src_recvs: Vec<broadcast::Receiver<DiffRow>>,
        expire_when: Option<repr::Duration>,
        create_if_not_exist: bool,
        err_collector: ErrCollector,
    ) -> Result<Option<FlowId>, Error> {
        let req = Request::Create {
            task_id,
            plan,
            sink_id,
            sink_sender,
            source_ids: source_ids.to_vec(),
            src_recvs,
            expire_when,
            create_if_not_exist,
            err_collector,
        };

        let ret = self.itc_client.lock().await.call_blocking(req).await?;
        if let Response::Create {
            result: task_create_result,
        } = ret
        {
            task_create_result
        } else {
            InternalSnafu {
                reason: format!(
                    "Flow Node/Worker itc failed, expect Response::Create, found {ret:?}"
                ),
            }
            .fail()
            .with_context(|_| EvalSnafu {})
        }
    }

    /// Ask the worker to remove a flow; returns the worker's answer on whether a flow
    /// was removed.
    ///
    /// # Errors
    ///
    /// Fails when the call to the worker fails or when it answers with an unexpected
    /// response kind.
    pub async fn remove_flow(&self, task_id: FlowId) -> Result<bool, Error> {
        let req = Request::Remove { task_id };
        let ret = self.itc_client.lock().await.call_blocking(req).await?;
        if let Response::Remove { result } = ret {
            Ok(result)
        } else {
            InternalSnafu {
                reason: format!("Flow Node/Worker failed, expect Response::Remove, found {ret:?}"),
            }
            .fail()
            .with_context(|_| EvalSnafu {})
        }
    }

    /// Trigger running the worker without waiting for an answer.
    ///
    /// The request carries `now`, the timestamp to set on all dataflows before running them.
    pub async fn run_available(&self, now: repr::Timestamp) {
        self.itc_client
            .lock()
            .await
            .call_non_blocking(Request::RunAvail { now })
            .await;
    }

    /// Ask the worker whether it holds the flow `task_id`.
    ///
    /// # Errors
    ///
    /// Fails when the call to the worker fails or when it answers with an unexpected
    /// response kind.
    pub async fn contains_flow(&self, task_id: FlowId) -> Result<bool, Error> {
        let req = Request::ContainTask { task_id };
        // Propagate a failed call like the other request methods do, instead of panicking.
        let ret = self.itc_client.lock().await.call_blocking(req).await?;
        if let Response::ContainTask {
            result: task_contain_result,
        } = ret
        {
            Ok(task_contain_result)
        } else {
            InternalSnafu {
                reason: format!(
                    "Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
                ),
            }
            .fail()
            .with_context(|_| EvalSnafu {})
        }
    }

    /// Send the shutdown request to the worker without waiting for an answer.
    pub async fn shutdown(&self) {
        self.itc_client
            .lock()
            .await
            .call_non_blocking(Request::Shutdown)
            .await;
    }
}
/// The worker that actually runs the dataflows and owns their active state.
#[derive(Debug)]
pub struct Worker<'subgraph> {
    /// Active state of every flow, keyed by flow id.
    pub(crate) task_states: BTreeMap<FlowId, ActiveDataflowState<'subgraph>>,
    /// Server side of the inter-thread call channel on which requests arrive.
    itc_server: Arc<Mutex<InterThreadCallServer>>,
}
impl<'s> Worker<'s> {
/// Build a new dataflow for `task_id` and register its state on this worker.
///
/// Each id in `source_ids` is paired (by position) with the receiver at the same
/// position in `src_recvs`, rendered as a source and registered under that id.
/// Then `plan` is rendered and its output is forwarded to `sink_sender`.
///
/// Returns `Ok(None)` without touching anything when `create_if_not_exist` is set
/// and a flow with `task_id` is already present; otherwise returns `Ok(Some(task_id))`
/// (an existing entry with the same id is replaced in that case).
///
/// # Errors
///
/// Fails if rendering a source or rendering the plan fails.
#[allow(clippy::too_many_arguments)]
pub fn create_flow(
    &mut self,
    task_id: FlowId,
    plan: TypedPlan,
    sink_id: GlobalId,
    sink_sender: mpsc::UnboundedSender<DiffRow>,
    source_ids: &[GlobalId],
    src_recvs: Vec<broadcast::Receiver<DiffRow>>,
    // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
    expire_when: Option<repr::Duration>,
    create_if_not_exist: bool,
    err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
    // accepted for interface stability, not applied yet (see the TODO on the parameter)
    let _ = expire_when;

    // nothing to do when the flow is already there and the caller only wants "create if absent"
    if create_if_not_exist && self.task_states.contains_key(&task_id) {
        return Ok(None);
    }

    let mut new_state = ActiveDataflowState::<'s> {
        err_collector,
        ..Default::default()
    };
    {
        // the rendering context borrows `new_state`; keep it in its own scope
        // so it is gone before the state is moved into the map
        let mut ctx = new_state.new_ctx(sink_id);
        for (source_id, src_recv) in source_ids.iter().copied().zip(src_recvs) {
            let bundle = ctx.render_source(src_recv)?;
            ctx.insert_global(source_id, bundle);
        }
        let rendered = ctx.render_plan(plan.plan)?;
        ctx.render_unbounded_sink(rendered, sink_sender);
    }
    self.task_states.insert(task_id, new_state);
    Ok(Some(task_id))
}
/// Drop the state of flow `task_id`; returns `true` if such a flow existed.
pub fn remove_flow(&mut self, task_id: FlowId) -> bool {
    let removed = self.task_states.remove(&task_id);
    removed.is_some()
}
/// Run the worker, blocking the current thread, until a shutdown request is received
/// or the request channel is closed.
///
/// Requests are handled one at a time in arrival order; a response is sent back
/// only for requests that produce one.
///
/// This uses the blocking variants of the lock and receive calls, so it is meant to
/// run on a dedicated (non-async) thread.
pub fn run(&mut self) {
    loop {
        // the lock guard is released at the end of this statement
        let received = self.itc_server.blocking_lock().blocking_recv();
        let (req_id, req) = match received {
            Some(call) => call,
            // All client handles are gone, so no request can ever arrive again:
            // stop the worker instead of panicking.
            None => break,
        };
        match self.handle_req(req_id, req) {
            Ok(Some((id, resp))) => {
                self.itc_server.blocking_lock().resp(id, resp);
            }
            Ok(None) => continue,
            // shutdown signal
            Err(()) => break,
        }
    }
}
/// Advance every flow on this worker to timestamp `now` (usually acquired from the
/// tick manager, i.e. system time) and run whatever is available in it.
///
/// TODO(discord9): better tick management
pub fn run_tick(&mut self, now: repr::Timestamp) {
    for task_state in self.task_states.values_mut() {
        task_state.set_current_ts(now);
        task_state.run_available();
    }
}
/// Dispatch a single request.
///
/// Returns `Ok(Some((req_id, response)))` for requests that are answered,
/// `Ok(None)` for `Request::RunAvail` (which has no response), and `Err(())`
/// when the request is the shutdown signal.
fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
    let resp = match req {
        Request::Create {
            task_id,
            plan,
            sink_id,
            sink_sender,
            source_ids,
            src_recvs,
            expire_when,
            create_if_not_exist,
            err_collector,
        } => {
            let result = self.create_flow(
                task_id,
                plan,
                sink_id,
                sink_sender,
                &source_ids,
                src_recvs,
                expire_when,
                create_if_not_exist,
                err_collector,
            );
            Response::Create { result }
        }
        Request::Remove { task_id } => Response::Remove {
            result: self.remove_flow(task_id),
        },
        Request::RunAvail { now } => {
            // fire-and-forget request: run and send nothing back
            self.run_tick(now);
            return Ok(None);
        }
        Request::ContainTask { task_id } => Response::ContainTask {
            result: self.task_states.contains_key(&task_id),
        },
        Request::Shutdown => return Err(()),
    };
    Ok(Some((req_id, resp)))
}
}
/// Requests sent from the worker handle to the worker over the inter-thread call channel.
#[derive(Debug)]
enum Request {
/// Create a flow; the fields mirror the arguments of `Worker::create_flow`.
/// Answered with `Response::Create`.
Create {
task_id: FlowId,
plan: TypedPlan,
sink_id: GlobalId,
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
err_collector: ErrCollector,
},
/// Remove the flow with the given id. Answered with `Response::Remove`.
Remove {
task_id: FlowId,
},
/// Trigger the worker to run, useful after input buffer is full.
/// No response is sent for this request.
RunAvail {
now: repr::Timestamp,
},
/// Ask whether a flow with the given id exists. Answered with `Response::ContainTask`.
ContainTask {
task_id: FlowId,
},
/// Stop the worker's run loop. No response is sent for this request.
Shutdown,
}
/// Responses sent from the worker back to the worker handle.
#[derive(Debug)]
enum Response {
/// Outcome of `Request::Create`, i.e. the return value of `Worker::create_flow`
Create {
result: Result<Option<FlowId>, Error>,
// TODO(discord9): add flow err_collector
},
/// Outcome of `Request::Remove`: `true` if a flow was removed
Remove {
result: bool,
},
/// Outcome of `Request::ContainTask`: `true` if the flow exists on the worker
ContainTask {
result: bool,
},
}
/// Create the connected client/server pair used for calls between the handle and the worker.
///
/// Two unbounded channels are set up: one carries `(call_id, Request)` from client to
/// server, the other carries `(call_id, Response)` back. The client's call id counter
/// starts at zero.
fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
    let (arg_sender, arg_recv) = mpsc::unbounded_channel();
    let (ret_sender, ret_recv) = mpsc::unbounded_channel();
    (
        InterThreadCallClient {
            call_id: Arc::new(Mutex::new(0)),
            arg_sender,
            ret_recv,
        },
        InterThreadCallServer {
            arg_recv,
            ret_sender,
        },
    )
}
/// Client end of the inter-thread call: sends requests and receives responses.
#[derive(Debug)]
struct InterThreadCallClient {
/// Counter used to give every outgoing request its own id
call_id: Arc<Mutex<usize>>,
/// Carries `(call_id, Request)` to the server
arg_sender: mpsc::UnboundedSender<(usize, Request)>,
/// Carries `(call_id, Response)` back from the server
ret_recv: mpsc::UnboundedReceiver<(usize, Response)>,
}
impl InterThreadCallClient {
    /// Increment the call id counter and return the new value.
    async fn next_call_id(&self) -> usize {
        let mut call_id = self.call_id.lock().await;
        *call_id += 1;
        *call_id
    }

    /// call without expecting responses or blocking
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent because the server end has been dropped.
    async fn call_non_blocking(&self, req: Request) {
        let call_id = self.next_call_id().await;
        self.arg_sender.send((call_id, req)).unwrap();
    }

    /// Send `req` and wait for the matching response.
    ///
    /// # Errors
    ///
    /// Returns an internal error (instead of panicking) when the request cannot be
    /// sent or no response can be received because the other end of the channel is
    /// gone, and when the response carries a different call id than the request.
    async fn call_blocking(&mut self, req: Request) -> Result<Response, Error> {
        let call_id = self.next_call_id().await;

        if self.arg_sender.send((call_id, req)).is_err() {
            return InternalSnafu {
                reason: "failed to send request, worker's request channel is closed",
            }
            .fail()
            .with_context(|_| EvalSnafu {});
        }

        // TODO(discord9): better inter thread call impl
        let (ret_call_id, ret) = match self.ret_recv.recv().await {
            Some(resp) => resp,
            None => {
                return InternalSnafu {
                    reason: "failed to receive response, worker's response channel is closed",
                }
                .fail()
                .with_context(|_| EvalSnafu {});
            }
        };

        if ret_call_id != call_id {
            return InternalSnafu {
                reason: "call id mismatch, worker/worker handler should be in sync",
            }
            .fail()
            .with_context(|_| EvalSnafu {});
        }
        Ok(ret)
    }
}
/// Server end of the inter-thread call: receives requests and sends responses back.
#[derive(Debug)]
struct InterThreadCallServer {
/// Incoming `(call_id, Request)` pairs from the client
pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>,
/// Outgoing `(call_id, Response)` pairs to the client
pub ret_sender: mpsc::UnboundedSender<(usize, Response)>,
}
impl InterThreadCallServer {
    /// Asynchronously wait for the next `(call_id, request)` pair;
    /// yields whatever the underlying receiver yields.
    pub async fn recv(&mut self) -> Option<(usize, Request)> {
        self.arg_recv.recv().await
    }

    /// Blocking counterpart of [`Self::recv`], for use outside of async code.
    pub fn blocking_recv(&mut self) -> Option<(usize, Request)> {
        self.arg_recv.blocking_recv()
    }

    /// Send response back to the client, tagged with the id of the call it answers.
    ///
    /// Panics if the response cannot be sent.
    pub fn resp(&self, call_id: usize, resp: Response) {
        let reply = (call_id, resp);
        self.ret_sender.send(reply).unwrap();
    }
}
#[cfg(test)]
mod test {
use tokio::sync::oneshot;
use super::*;
use crate::adapter::FlowTickManager;
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};
/// End-to-end check of the handle/worker pair: create a flow whose plan is a plain
/// `Get` of its single source, push one row in, run the worker once and expect the
/// same row on the sink.
#[tokio::test]
pub async fn test_simple_get_with_worker_and_handle() {
let flow_tick = FlowTickManager::new();
let (tx, rx) = oneshot::channel();
// the worker runs its blocking loop on its own thread and hands the handle back
// to the test through a oneshot channel
let worker_thread_handle = std::thread::spawn(move || {
let (handle, mut worker) = create_worker();
tx.send(handle).unwrap();
worker.run();
});
let handle = rx.await.unwrap();
let src_ids = vec![GlobalId::User(1)];
// `tx` feeds the source, `sink_rx` observes the output of the flow
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
let (task_id, plan) = (
1,
TypedPlan {
plan: Plan::Get {
id: Id::Global(GlobalId::User(1)),
},
typ: RelationType::new(vec![]),
},
);
handle
.create_flow(
task_id,
plan,
GlobalId::User(1),
sink_tx,
&src_ids,
vec![rx],
None,
true,
ErrCollector::default(),
)
.await
.unwrap();
tx.send((Row::empty(), 0, 0)).unwrap();
// trigger one run of the worker at the current tick, then read the result
handle.run_available(flow_tick.tick()).await;
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
// stop the worker loop so the thread can be joined
handle.shutdown().await;
worker_thread_handle.join().unwrap();
}
}

View File

@@ -17,3 +17,7 @@
mod render;
mod state;
mod types;
pub(crate) use render::Context;
pub(crate) use state::DataflowState;
pub(crate) use types::ErrCollector;

View File

@@ -45,6 +45,7 @@ use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
mod map;
mod reduce;
mod src_sink;
/// The Context for build a Operator with id of `GlobalId`
pub struct Context<'referred, 'df> {
@@ -52,13 +53,15 @@ pub struct Context<'referred, 'df> {
pub df: &'referred mut Hydroflow<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
/// TODO(discord9): remove extra clone by counting usage and remove it on last usage?
pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
/// used by `Get`/`Let` Plan for getting/setting local variables
///
/// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead
local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
// Collect all errors in this operator's evaluation
err_collector: ErrCollector,
pub err_collector: ErrCollector,
}
impl<'referred, 'df> Drop for Context<'referred, 'df> {
@@ -235,7 +238,7 @@ mod test {
for now in time_range {
state.set_current_ts(now);
state.run_available_with_schedule(df);
assert!(state.get_err_collector().inner.borrow().is_empty());
assert!(state.get_err_collector().is_empty());
if let Some(expected) = expected.get(&now) {
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
} else {

View File

@@ -153,7 +153,7 @@ fn eval_mfp_core(
) -> Vec<KeyValDiffRow> {
let mut all_updates = Vec::new();
for (mut row, _sys_time, diff) in input.into_iter() {
// this updates is expected to be only zero to two rows
// these updates are expected to contain only zero, one or two rows
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
// TODO(discord9): refactor error handling
// Expect error in a single row to not interrupt the whole evaluation

View File

@@ -80,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> {
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
let data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
reduce_subgraph(
&reduce_arrange,
@@ -378,9 +382,8 @@ fn reduce_accum_subgraph(
let mut all_updates = Vec::with_capacity(key_to_vals.len());
let mut all_outputs = Vec::with_capacity(key_to_vals.len());
// lock the arrange for write for the rest of function body
// so to prevent wide race condition since we are going to update the arrangement by write after read
// so as to prevent weird race conditions, since we are going to update the arrangement by write after read
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key_to_vals {
@@ -395,6 +398,7 @@ fn reduce_accum_subgraph(
}
};
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
let accums = accums.inner;
// deser accums from offsets

View File

@@ -0,0 +1,161 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Source and Sink for the dataflow
use std::collections::{BTreeMap, VecDeque};
use common_telemetry::{debug, info};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
/// Render a source which comes from a broadcast channel into the dataflow.
///
/// On every run the source immediately sends updates whose timestamp is not greater
/// than `now` and buffers the rest in an arrangement, to be sent on a later run.
///
/// # Errors
///
/// Returns a plan error if a future-only clone of the freshly created arrangement
/// handler cannot be obtained.
pub fn render_source(
&mut self,
mut src_recv: broadcast::Receiver<DiffRow>,
) -> Result<CollectionBundle, Error> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source");
let arrange_handler = self.compute_state.new_arrange(None);
let arrange_handler_inner =
arrange_handler
.clone_future_only()
.with_context(|| PlanSnafu {
reason: "No write is expected at this point",
})?;
let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
let sub = self
.df
.add_subgraph_source("source", send_port, move |_ctx, send| {
let now = *now.borrow();
// rows buffered on earlier runs that have become due (timestamp <= now)
let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
err_collector.run(|| arrange_handler_inner.write().compact_to(now));
// buffered entries are stored as ((row, empty row), ts, diff); keep only the row part
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
// TODO(discord9): handling tokio broadcast error
// NOTE(review): any `try_recv` error ends the loop for this run, not only an empty channel
while let Ok((r, t, d)) = src_recv.try_recv() {
if t <= now {
to_send.push((r, t, d));
} else {
// not due yet: buffer it in the arrangement with an empty value row
to_arrange.push(((r, Row::empty()), t, d));
}
}
let all = prev_avail.chain(to_send).collect_vec();
if !all.is_empty() || !to_arrange.is_empty() {
debug!(
"All send: {} rows, not yet send: {} rows",
all.len(),
to_arrange.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at next tick
inner_schd.schedule_at(now + 1);
});
schd.set_cur_subgraph(sub);
let arranged = Arranged::new(arrange_handler);
// record the source subgraph as the writer of this arrangement
arranged.writer.borrow_mut().replace(sub);
let arranged = BTreeMap::from([(vec![], arranged)]);
Ok(CollectionBundle {
collection: Collection::from_port(recv_port),
arranged,
})
}
/// Render a sink that forwards every row of `bundle`'s collection to an unbounded
/// mpsc channel.
///
/// Once the receiving side of `sender` is closed, the remaining rows of the current
/// batch are discarded. Send errors are ignored.
pub fn render_unbounded_sink(
    &mut self,
    bundle: CollectionBundle,
    sender: mpsc::UnboundedSender<DiffRow>,
) {
    // only the collection is needed; the arrangements of the bundle are not used
    let collection = bundle.collection;
    let _sink = self.df.add_subgraph_sink(
        "UnboundedSink",
        collection.into_inner(),
        move |_ctx, recv| {
            for row in recv.take_inner().into_iter().flatten() {
                // if the sender is closed, stop sending
                if sender.is_closed() {
                    break;
                }
                // TODO(discord9): handling tokio error
                let _ = sender.send(row);
            }
        },
    );
}
/// Render a sink which sends updates to a broadcast channel, with an internal buffer
/// in case the broadcast channel is full.
///
/// On every run the incoming rows are appended to the buffer, then buffered rows are
/// forwarded in order for as long as the channel holds fewer than `BROADCAST_CAP`
/// queued values. Rows that do not fit stay in the buffer, and the sink is scheduled
/// to run again at the next tick so they can be drained.
pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
    let CollectionBundle {
        collection,
        arranged: _,
    } = bundle;
    let mut buf = VecDeque::with_capacity(1000);

    let schd = self.compute_state.get_scheduler();
    let inner_schd = schd.clone();
    let now = self.compute_state.current_time_ref();

    let sink = self
        .df
        .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
            let data = recv.take_inner();
            buf.extend(data.into_iter().flat_map(|i| i.into_iter()));

            // Check for room BEFORE taking a row out of the buffer, so that a row is
            // never popped and then dropped because the channel turned out to be full.
            while sender.len() < BROADCAST_CAP {
                match buf.pop_front() {
                    Some(row) => {
                        // TODO(discord9): handling tokio broadcast error
                        let _ = sender.send(row);
                    }
                    None => break,
                }
            }

            // if buffer is not empty, schedule the next run at next tick
            // so the buffer can be drained as soon as possible
            // (this also covers the case where the channel was already full on entry)
            if !buf.is_empty() {
                inner_schd.schedule_at(*now.borrow() + 1);
            }
        });

    schd.set_cur_subgraph(sink);
}
}

View File

@@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
#[derive(Default)]
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
/// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule

View File

@@ -21,7 +21,8 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use tokio::sync::RwLock;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};
use crate::compute::render::Context;
use crate::expr::{EvalError, ScalarExpr};
@@ -146,14 +147,22 @@ impl CollectionBundle {
///
/// Using a `VecDeque` to preserve the order of errors
/// when running dataflow continuously and need errors in order
#[derive(Default, Clone)]
#[derive(Debug, Default, Clone)]
pub struct ErrCollector {
pub inner: Rc<RefCell<VecDeque<EvalError>>>,
pub inner: Arc<Mutex<VecDeque<EvalError>>>,
}
impl ErrCollector {
pub async fn get_all(&self) -> Vec<EvalError> {
self.inner.lock().await.drain(..).collect_vec()
}
pub fn is_empty(&self) -> bool {
self.inner.blocking_lock().is_empty()
}
pub fn push_err(&self, err: EvalError) {
self.inner.borrow_mut().push_back(err)
self.inner.blocking_lock().push_back(err)
}
pub fn run<F, R>(&self, f: F) -> Option<R>

View File

@@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::{Location, Snafu};
/// Compile-time check that `EvalError` is `Send + Sync`: this function only compiles
/// if the bound on `check` is satisfied, it does nothing at runtime.
fn is_send_sync() {
fn check<T: Send + Sync>() {}
check::<EvalError>();
}
/// EvalError is about errors happen on columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors to help identify related column

View File

@@ -17,6 +17,7 @@
use std::collections::HashMap;
use std::sync::OnceLock;
use common_telemetry::debug;
use common_time::DateTime;
use datafusion_expr::Operator;
use datafusion_substrait::logical_plan::consumer::name_to_op;
@@ -206,8 +207,9 @@ impl UnaryFunc {
from: arg_ty,
to: to.clone(),
}
})?;
Ok(res)
});
debug!("Cast to type: {to:?}, result: {:?}", res);
res
}
}
}

View File

@@ -16,7 +16,7 @@
use serde::{Deserialize, Serialize};
/// Global id's scope is in Current Worker, and is cross-dataflow
/// Global id's scope is in Current Flow node, and is cross-dataflow
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum GlobalId {
/// System namespace.

View File

@@ -206,12 +206,16 @@ impl AggregateFunc {
.fail();
}
};
let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype);
let input_type = if matches!(generic_fn, GenericFn::Count) {
ConcreteDataType::null_datatype()
} else {
arg_type.unwrap_or_else(ConcreteDataType::null_datatype)
};
rule.get(&(generic_fn, input_type.clone()))
.cloned()
.with_context(|| InvalidQuerySnafu {
reason: format!(
"No specialization found for binary function {:?} with input type {:?}",
"No specialization found for aggregate function {:?} with input type {:?}",
generic_fn, input_type
),
})

View File

@@ -30,3 +30,8 @@ mod plan;
mod repr;
mod transform;
mod utils;
pub use adapter::{
start_flow_node_with_one_worker, FlownodeBuilder, FlownodeManager, FlownodeManagerRef,
FlownodeOptions,
};

View File

@@ -18,12 +18,15 @@
mod join;
mod reduce;
use std::collections::BTreeSet;
use datatypes::arrow::ipc::Map;
use serde::{Deserialize, Serialize};
use crate::adapter::error::Error;
use crate::expr::{
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr,
AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
TypedExpr,
};
use crate::plan::join::JoinPlan;
pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
@@ -71,6 +74,7 @@ impl TypedPlan {
let mfp = MapFilterProject::new(input_arity)
.map(exprs)?
.project(input_arity..input_arity + output_arity)?;
let out_typ = self.typ.apply_mfp(&mfp, &expr_typs);
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {
Plan::Mfp {
@@ -85,8 +89,7 @@ impl TypedPlan {
mfp,
},
};
let typ = RelationType::new(expr_typs);
Ok(TypedPlan { typ, plan })
Ok(TypedPlan { typ: out_typ, plan })
}
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
@@ -182,3 +185,45 @@ pub enum Plan {
consolidate_output: bool,
},
}
impl Plan {
    /// Collect the ids of all global collections referenced by `Get` nodes anywhere
    /// in this plan.
    ///
    /// The plan tree is walked recursively through `Let`, `Mfp`, `Reduce`, `Join` and
    /// `Union`; local ids and any other plan variant contribute nothing.
    pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
        fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
            match plan {
                Plan::Get { id: Id::Global(g) } => {
                    used.insert(*g);
                }
                // local variables are not collections of their own
                Plan::Get { id: Id::Local(_) } => {}
                Plan::Let { value, body, .. } => {
                    recur_find_use(value, used);
                    recur_find_use(body, used);
                }
                Plan::Mfp { input, .. } => recur_find_use(input, used),
                Plan::Reduce { input, .. } => recur_find_use(input, used),
                Plan::Join { inputs, .. } => {
                    for input in inputs {
                        recur_find_use(input, used);
                    }
                }
                Plan::Union { inputs, .. } => {
                    for input in inputs {
                        recur_find_use(input, used);
                    }
                }
                _ => {}
            }
        }

        let mut used = BTreeSet::new();
        recur_find_use(self, &mut used);
        used
    }
}

View File

@@ -27,7 +27,7 @@ use datatypes::types::cast;
use datatypes::types::cast::CastOption;
use datatypes::value::Value;
use itertools::Itertools;
pub(crate) use relation::{ColumnType, RelationDesc, RelationType};
pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
/// Row with key-value pair, timestamp and diff
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// broadcast channel capacity
pub const BROADCAST_CAP: usize = 1024;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
@@ -104,6 +107,11 @@ impl Row {
Self { inner: vec![] }
}
/// Returns `true` if the row holds no values, i.e. its inner vector is empty.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Create a row from a vector of values
pub fn new(row: Vec<Value>) -> Self {
Self { inner: row }

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::adapter::error::{InvalidQuerySnafu, Result};
use crate::expr::MapFilterProject;
/// a set of column indices that are "keys" for the collection.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
@@ -75,6 +79,12 @@ impl Key {
}
}
impl Default for Key {
fn default() -> Self {
Self::new()
}
}
/// The type of a relation.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationType {
@@ -96,6 +106,49 @@ pub struct RelationType {
}
impl RelationType {
/// Compute the relation type that results from applying `mfp` to a relation of
/// this type.
///
/// `expr_typs` are the types of the columns appended by the map expressions; the
/// output columns are picked from `self.column_types` followed by `expr_typs`
/// according to `mfp.projection`. A key is kept only if every one of its columns
/// is still present in the projection (with indices remapped to the new positions),
/// and the same goes for the time index.
///
/// Panics if a projection index is out of range of the combined column list.
pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Self {
    // columns visible to the projection: the input columns, then the mapped expressions
    let all_types = self
        .column_types
        .iter()
        .chain(expr_typs.iter())
        .cloned()
        .collect_vec();
    let column_types = mfp
        .projection
        .iter()
        .map(|i| all_types[*i].clone())
        .collect_vec();

    // old column index -> position in the projected output
    let old_to_new_col: BTreeMap<_, _> = mfp
        .projection
        .iter()
        .copied()
        .enumerate()
        .map(|(new, old)| (old, new))
        .collect();

    // since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform
    let keys = self
        .keys
        .iter()
        .filter_map(|key| {
            // `None` as soon as one column of the key is projected away
            let remapped = key
                .column_indices
                .iter()
                .map(|old| old_to_new_col.get(old).cloned())
                .collect::<Option<Vec<_>>>()?;
            if remapped.is_empty() {
                None
            } else {
                Some(Key::from(remapped))
            }
        })
        .collect_vec();

    let time_index = self
        .time_index
        .and_then(|old| old_to_new_col.get(&old).cloned());

    Self {
        column_types,
        keys,
        time_index,
    }
}
/// Constructs a `RelationType` representing the relation with no columns and
/// no keys.
pub fn empty() -> Self {

View File

@@ -14,11 +14,24 @@
//! Transform Substrait into execution plan
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType as CDT;
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::QueryEngine;
use session::context::QueryContext;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu};
use crate::adapter::error::{
Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
NotImplementedSnafu, TableNotFoundSnafu,
};
use crate::adapter::FlownodeContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;
/// a simple macro to generate a not implemented error
macro_rules! not_impl_err {
@@ -44,7 +57,7 @@ mod literal;
mod plan;
use literal::{from_substrait_literal, from_substrait_type};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration;
@@ -79,38 +92,32 @@ impl FunctionExtensions {
}
}
/// A context that holds the information of the dataflow
pub struct DataflowContext {
/// `id` refer to any source table in the dataflow, and `name` is the name of the table
/// which is a `Vec<String>` in substrait
id_to_name: HashMap<GlobalId, Vec<String>>,
/// see `id_to_name`
name_to_id: HashMap<Vec<String>, GlobalId>,
/// the schema of the table
schema: HashMap<GlobalId, RelationType>,
}
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
/// then to a substrait plan, and finally to a flow plan.
pub async fn sql_to_flow_plan(
ctx: &mut FlownodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
let query_ctx = ctx.query_context.clone().unwrap();
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
let plan = engine
.planner()
.plan(stmt, query_ctx)
.await
.context(InvalidQueryPlanSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
impl DataflowContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
let id = self
.name_to_id
.get(name)
.copied()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
let bytes = DFLogicalSubstraitConvertor {}
.encode(&plan)
.context(InvalidQuerySubstraitSnafu)?;
let sub_plan = substrait::substrait_proto::proto::Plan::decode(bytes)
.map_err(|inner| InvalidQueryProstSnafu { inner }.build())?;
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?;
Ok(flow_plan)
}
#[cfg(test)]
@@ -129,17 +136,24 @@ mod test {
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::repr::ColumnType;
pub fn create_test_ctx() -> DataflowContext {
pub fn create_test_ctx() -> FlownodeContext {
let gid = GlobalId::User(0);
let name = vec!["numbers".to_string()];
let name = [
"greptime".to_string(),
"public".to_string(),
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
DataflowContext {
id_to_name: HashMap::from([(gid, name.clone())]),
name_to_id: HashMap::from([(name.clone(), gid)]),
let mut tri_map = IdToNameMap::new();
tri_map.insert(Some(name.clone()), Some(0), gid);
FlownodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
}

View File

@@ -54,11 +54,11 @@ use crate::expr::{
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{FlownodeContext, FunctionExtensions};
impl TypedExpr {
fn from_substrait_agg_grouping(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
groupings: &[Grouping],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -84,7 +84,7 @@ impl TypedExpr {
impl AggregateExpr {
fn from_substrait_agg_measures(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
measures: &[Measure],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -218,7 +218,7 @@ impl KeyValPlan {
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
pub fn from_substrait_agg_rel(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
agg: &proto::AggregateRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
@@ -228,7 +228,7 @@ impl TypedPlan {
return not_impl_err!("Aggregate without an input is not supported");
};
let group_expr =
let group_exprs =
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?;
let mut aggr_exprs =
@@ -236,14 +236,14 @@ impl TypedPlan {
let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
&mut aggr_exprs,
&group_expr,
&group_exprs,
input.typ.column_types.len(),
)?;
let output_type = {
let mut output_types = Vec::new();
// first append group_expr as key, then aggr_expr as value
for expr in &group_expr {
for expr in &group_exprs {
output_types.push(expr.typ.clone());
}
@@ -252,7 +252,8 @@ impl TypedPlan {
aggr.func.signature().output.clone(),
));
}
RelationType::new(output_types)
// TODO(discord9): try best to get time
RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
};
// copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
@@ -365,8 +366,8 @@ mod test {
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), true),
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::uint32_datatype(), true), // col sum(number)
ColumnType::new(CDT::uint32_datatype(), false), // col number
]),
plan: Plan::Mfp {
input: Box::new(Plan::Reduce {

View File

@@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{FlownodeContext, FunctionExtensions};
impl TypedPlan {
/// Convert Substrait Plan into Flow's TypedPlan
pub fn from_substrait_plan(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
plan: &SubPlan,
) -> Result<TypedPlan, Error> {
// Register function extension
@@ -62,7 +62,7 @@ impl TypedPlan {
/// Convert Substrait Rel into Flow's TypedPlan
/// TODO: SELECT DISTINCT(does it get compile with something else?)
pub fn from_substrait_rel(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
@@ -114,7 +114,28 @@ impl TypedPlan {
}
Some(RelType::Read(read)) => {
if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
let table_reference = nt.names.clone();
let query_ctx = ctx.query_context.clone().unwrap();
let table_reference = match nt.names.len() {
1 => [
query_ctx.current_catalog().to_string(),
query_ctx.current_schema().to_string(),
nt.names[0].clone(),
],
2 => [
query_ctx.current_catalog().to_string(),
nt.names[0].clone(),
nt.names[1].clone(),
],
3 => [
nt.names[0].clone(),
nt.names[1].clone(),
nt.names[2].clone(),
],
_ => InvalidQuerySnafu {
reason: "Expect table to have name",
}
.fail()?,
};
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),

View File

@@ -31,16 +31,19 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
pub struct StandaloneDatanodeManager(pub RegionServer);
pub struct StandaloneDatanodeManager {
pub region_server: RegionServer,
pub flow_server: FlownodeRef,
}
#[async_trait]
impl NodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
RegionInvoker::arc(self.0.clone())
RegionInvoker::arc(self.region_server.clone())
}
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
self.flow_server.clone()
}
}

View File

@@ -38,6 +38,7 @@ common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true

View File

@@ -35,6 +35,7 @@ use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
@@ -129,6 +130,7 @@ impl GreptimeDbStandaloneBuilder {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager = KvBackendCatalogManager::new(
@@ -139,7 +141,19 @@ impl GreptimeDbStandaloneBuilder {
)
.await;
let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build().await);
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
});
let table_id_sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -193,6 +207,11 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
flownode
.set_frontend_invoker(Box::new(instance.clone()))
.await;
let _node_handle = flownode.run_background();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();