chore: refactor& remove TODOs

This commit is contained in:
discord9
2024-05-09 16:51:19 +08:00
parent 684850f451
commit e840bb469d
19 changed files with 700 additions and 562 deletions

View File

@@ -4,10 +4,6 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[[bin]]
name = "footprint"
path = "bin/mem_footprint.rs"
[lints]
workspace = true

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {}

View File

@@ -14,10 +14,12 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
use std::borrow::{Borrow, BorrowMut};
#![warn(unused_imports)]
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use api::v1::flow::flow_server::Flow;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::memory::MemoryCatalogManager;
@@ -29,7 +31,7 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::{debug, info};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
@@ -50,7 +52,9 @@ use tokio::task::LocalSet;
use crate::adapter::error::{
EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap};
use crate::adapter::parse_expr::{parse_duration, parse_fixed};
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::{Context, DataflowState, ErrCollector};
@@ -61,14 +65,17 @@ use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
mod flownode_impl;
mod parse_expr;
mod server;
mod standalone;
#[cfg(test)]
mod tests;
mod util;
mod worker;
mod node_context;
mod table_source;
use error::Error;
pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
@@ -180,35 +187,17 @@ pub struct FlownodeManager {
/// The query engine that will be used to parse the query and convert it to a dataflow plan
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableInfoSource,
table_info_source: TableSource,
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlowNodeContext>,
node_context: Mutex<FlownodeContext>,
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
tick_manager: FlowTickManager,
node_id: Option<u32>,
}
/// Building FlownodeManager
impl FlownodeManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager task");
common_runtime::spawn_bg(async move {
self.run().await;
})
}
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
pub async fn run(&self) {
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
self.run_available().await;
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
/// set frontend invoker
pub async fn set_frontend_invoker(
self: &Arc<Self>,
@@ -223,11 +212,11 @@ impl FlownodeManager {
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> Self {
let srv_map = TableInfoSource::new(
let srv_map = TableSource::new(
table_meta.table_info_manager().clone(),
table_meta.table_name_manager().clone(),
);
let node_context = FlowNodeContext::default();
let node_context = FlownodeContext::default();
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlownodeManager {
@@ -236,6 +225,7 @@ impl FlownodeManager {
table_info_source: srv_map,
frontend_invoker: RwLock::new(None),
node_context: Mutex::new(node_context),
flow_err_collectors: Default::default(),
tick_manager,
node_id,
}
@@ -259,135 +249,13 @@ impl FlownodeManager {
}
}
/// Just check if NodeManager's other fields are `Send` so later we can refactor so A Flow Node Manager
/// can manage multiple flow worker(thread) then we can run multiple flow worker in a single flow node manager
#[test]
fn check_is_send() {
fn is_send<T: Send + Sync>() {}
is_send::<FlownodeManager>();
}
/// mapping of table name <-> table id should be query from tableinfo manager
pub struct TableInfoSource {
/// for query `TableId -> TableName` mapping
table_info_manager: TableInfoManager,
table_name_manager: TableNameManager,
}
impl TableInfoSource {
pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
TableInfoSource {
table_info_manager,
table_name_manager,
}
}
pub async fn get_table_id_from_proto_name(
&self,
name: &greptime_proto::v1::TableName,
) -> Result<TableId, Error> {
self.table_name_manager
.get(TableNameKey::new(
&name.catalog_name,
&name.schema_name,
&name.table_name,
))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Table name = {:?}, couldn't found table id", name),
})
.map(|id| id.unwrap().table_id())
}
/// If the table havn't been created in database, the tableId returned would be null
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
let ret = self
.table_name_manager
.get(TableNameKey::new(&name[0], &name[1], &name[2]))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Table name = {:?}, couldn't found table id", name),
})?
.map(|id| id.table_id());
Ok(ret)
}
/// query metasrv about the table name and table id
pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
self.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
})
.map(|name| name.unwrap().table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
/// query metasrv about the table name and table id
pub async fn get_table_info_value(
&self,
table_id: &TableId,
) -> Result<Option<TableInfoValue>, Error> {
Ok(self
.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
})?
.map(|v| v.into_inner()))
}
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
name: format!("TableId = {:?}, Can't found table info", table_id),
})?;
let table_name = table_info_value.table_name();
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let raw_schema = table_info_value.table_info.meta.schema;
let column_types = raw_schema
.column_schemas
.into_iter()
.map(|col| ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
})
.collect_vec();
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok((
table_name,
RelationType {
column_types,
keys,
time_index,
},
))
}
}
#[derive(Debug)]
pub enum DiffRequest {
Insert(Vec<(Row, repr::Timestamp)>),
Delete(Vec<(Row, repr::Timestamp)>),
}
/// iterate through the diff row and from from continuous diff row with same diff type
/// iterate through the diff row and form continuous diff row with same diff type
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
let mut reqs = Vec::new();
for (row, ts, diff) in rows {
@@ -409,30 +277,8 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
reqs
}
/// This impl block contains methods to send writeback requests to frontend
impl FlownodeManager {
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
worker.lock().await.run_available(now).await;
}
}
/// send write request to related source sender
pub async fn handle_write_request(
&self,
region_id: RegionId,
rows: Vec<DiffRow>,
) -> Result<(), Error> {
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
Ok(())
}
/// TODO(discord9): merge all same type of diff row into one requests
///
/// Return the number of requests it made
@@ -448,70 +294,70 @@ impl FlownodeManager {
}
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
/*let table_id = self
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
let (primary_keys, schema) = if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(&table_name)
.await?;
let table_info = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.unwrap();
let primary_keys = table_info
.table_info
.meta
.primary_key_indices
.into_iter()
.map(|i| {
table_info.table_info.meta.schema.column_schemas[i]
.name
.clone()
})
.collect_vec();
let schema = table_info.table_info.meta.schema.column_schemas;
*/
let primary_keys = vec![];
let (schema_wout_ts, with_ts) = {
let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
.map(|x| x.1)
{
let table_info = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
let ts_col = ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
let wout_ts = schema
.column_types
let meta = table_info.table_info.meta;
let primary_keys = meta
.primary_key_indices
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
})
.map(|i| meta.schema.column_schemas[i].name.clone())
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(ts_col);
(wout_ts, with_ts)
let schema = meta.schema.column_schemas;
(primary_keys, schema)
} else {
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
let primary_keys = vec![];
let with_ts = {
let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
.map(|x| x.1)
.unwrap();
let schema = node_ctx
.schema
.get(&gid)
.with_context(|| TableNotFoundSnafu {
name: format!("Table name = {:?}", table_name),
})?
.clone();
let ts_col = ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true);
let wout_ts = schema
.column_types
.into_iter()
.enumerate()
.map(|(idx, typ)| {
ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
})
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(ts_col);
with_ts
};
(primary_keys, with_ts)
};
let _proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?;
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?;
info!(
debug!(
"Sending {} writeback requests to table {}",
reqs.len(),
table_name.join(".")
@@ -533,7 +379,7 @@ impl FlownodeManager {
let req = RowInsertRequest {
table_name,
rows: Some(v1::Rows {
schema: proto_schema_with_ts.clone(),
schema: proto_schema.clone(),
rows: rows_proto,
}),
};
@@ -565,7 +411,7 @@ impl FlownodeManager {
let req = RowDeleteRequest {
table_name,
rows: Some(v1::Rows {
schema: proto_schema_with_ts.clone(),
schema: proto_schema.clone(),
rows: rows_proto,
}),
};
@@ -609,7 +455,57 @@ impl FlownodeManager {
}
output
}
}
/// Flow Runtime related methods
impl FlownodeManager {
/// run in common_runtime background runtime
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
common_runtime::spawn_bg(async move {
self.run().await;
})
}
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
pub async fn run(&self) {
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
self.run_available().await;
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
worker.lock().await.run_available(now).await;
}
}
/// send write request to related source sender
pub async fn handle_write_request(
&self,
region_id: RegionId,
rows: Vec<DiffRow>,
) -> Result<(), Error> {
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
Ok(())
}
}
/// Create&Remove flow
impl FlownodeManager {
/// remove a flow by it's id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {
@@ -662,14 +558,12 @@ impl FlownodeManager {
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
// TODO(discord9): pass the actual `QueryContext` in here
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
info!("Flow Plan is {:?}", flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
// TODO(discord9): parse `expire_when`
let expire_when = expire_when
.and_then(|s| {
if s.is_empty() || s.split_whitespace().join("").is_empty() {
@@ -701,7 +595,11 @@ impl FlownodeManager {
.iter()
.map(|id| node_ctx.get_source_by_global_id(id).map(|s| s.subscribe()))
.collect::<Result<Vec<_>, _>>()?;
let err_collector = ErrCollector::default();
self.flow_err_collectors
.write()
.await
.insert(flow_id, err_collector.clone());
let handle = &self.worker_handles[0].lock().await;
handle
.create_flow(
@@ -713,6 +611,7 @@ impl FlownodeManager {
source_senders,
expire_when,
create_if_not_exist,
err_collector,
)
.await?;
info!("Successfully create flow with id={}", flow_id);
@@ -720,278 +619,10 @@ impl FlownodeManager {
}
}
/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlowNodeContext {
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub flow_to_sink: BTreeMap<FlowId, TableName>,
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
/// store source in buffer for each source table, in case broadcast channel is full
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
/// the schema of the table, query from metasrv or infered from TypedPlan
pub schema: HashMap<GlobalId, RelationType>,
/// All the tables that have been registered in the worker
pub table_repr: TriMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlowNodeContext {
// return number of rows it actuall send(including what's in the buffer)
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
let send_buffer = self.send_buffer.entry(table_id).or_default();
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!(
"Failed to send row to table_id = {:?}, error = {:?}",
table_id, err
),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
Ok(row_cnt)
}
}
impl FlowNodeContext {
/// mapping source table to task, and sink table to task in worker context
///
/// also add their corrseponding broadcast sender/receiver
fn register_task_src_sink(
&mut self,
task_id: FlowId,
source_table_ids: &[TableId],
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
self.source_to_tasks
.entry(*source_table_id)
.or_default()
.insert(task_id);
}
self.add_sink_receiver(sink_table_name.clone());
self.flow_to_sink.insert(task_id, sink_table_name);
}
pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
}
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}
pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
let table_id = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?
.1
.with_context(|| TableNotFoundSnafu {
name: format!("Table Id = {:?}", id),
})?;
self.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})
}
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.0
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?;
self.sink_receiver
.get(&table_name)
.map(|(s, _r)| s.clone())
.with_context(|| TableNotFoundSnafu {
name: table_name.join("."),
})
}
}
impl FlowNodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
let id = self
.table_repr
.get_by_name(name)
.map(|(_tid, gid)| gid)
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
/// Assign a global id to a table, if already assigned, return the existing global id
///
/// require at least one of `table_name` or `table_id` to be `Some`
///
/// and will try to fetch the schema from table info manager(if table exist now)
///
/// NOTE: this will not actually render the table into collection refered as GlobalId
/// merely creating a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableInfoSource,
mut table_name: Option<TableName>,
table_id: Option<TableId>,
) -> Result<GlobalId, Error> {
// if we can find by table name/id. not assign it
if let Some(gid) = table_name
.as_ref()
.and_then(|table_name| self.table_repr.get_by_name(table_name))
.map(|(_, gid)| gid)
.or_else(|| {
table_id
.and_then(|id| self.table_repr.get_by_table_id(&id))
.map(|(_, gid)| gid)
})
{
Ok(gid)
} else {
let global_id = self.new_global_id();
if let Some(table_id) = table_id {
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have table id, it means database havn't assign one yet or we don't need it
self.table_repr.insert(table_name, table_id, global_id);
Ok(global_id)
}
}
/// Assign a schema to a table
///
/// TODO(discord9): error handling
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationType,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema);
Ok(())
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
}
}
/// A tri-directional map that maps table name, table id, and global id
#[derive(Default, Debug)]
pub struct TriMap {
name_to_global_id: HashMap<TableName, GlobalId>,
id_to_global_id: HashMap<TableId, GlobalId>,
global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}
impl TriMap {
pub fn new() -> Self {
Default::default()
}
pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
name.clone()
.and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
id.and_then(|id| self.id_to_global_id.insert(id, global_id));
self.global_id_to_name_id.insert(global_id, (name, id));
}
pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
self.name_to_global_id.get(name).map(|global_id| {
let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
(*id, *global_id)
})
}
pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
self.id_to_global_id.get(id).map(|global_id| {
let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
(name.clone(), *global_id)
})
}
pub fn get_by_global_id(
&self,
global_id: &GlobalId,
) -> Option<(Option<TableName>, Option<TableId>)> {
self.global_id_to_name_id.get(global_id).cloned()
}
}
/// FlowTickManager is a manager for flow tick
/// FlowTickManager is a manager for flow tick, which trakc flow execution progress
///
/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
/// TSO coord mess
#[derive(Clone)]
pub struct FlowTickManager {
anchor: Anchor,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! impl `FlowNode` trait for FlowNodeManager so standalone can easily call them
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
@@ -26,6 +26,7 @@ use crate::adapter::FlownodeManager;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Err::<(), _>(BoxedError::new(err))
.with_context(|_| ExternalSnafu)
.unwrap_err()

View File

@@ -0,0 +1,332 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Node context, prone to change with every incoming requests
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use hydroflow::scheduled::graph::Hydroflow;
use itertools::Itertools;
use minstant::Anchor;
use prost::bytes::buf;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
use tokio::task::LocalSet;
use crate::adapter::error::{
Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::parse_expr::{parse_duration, parse_fixed};
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP};
use crate::transform::sql_to_flow_plan;
/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlownodeContext {
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub flow_to_sink: BTreeMap<FlowId, TableName>,
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
/// store source in buffer for each source table, in case broadcast channel is full
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
/// the schema of the table, query from metasrv or infered from TypedPlan
pub schema: HashMap<GlobalId, RelationType>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlownodeContext {
// return number of rows it actuall send(including what's in the buffer)
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
let send_buffer = self.send_buffer.entry(table_id).or_default();
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!(
"Failed to send row to table_id = {:?}, error = {:?}",
table_id, err
),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
Ok(row_cnt)
}
}
impl FlownodeContext {
/// mapping source table to task, and sink table to task in worker context
///
/// also add their corrseponding broadcast sender/receiver
pub fn register_task_src_sink(
&mut self,
task_id: FlowId,
source_table_ids: &[TableId],
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
self.source_to_tasks
.entry(*source_table_id)
.or_default()
.insert(task_id);
}
self.add_sink_receiver(sink_table_name.clone());
self.flow_to_sink.insert(task_id, sink_table_name);
}
pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
}
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}
pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
let table_id = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?
.1
.with_context(|| TableNotFoundSnafu {
name: format!("Table Id = {:?}", id),
})?;
self.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})
}
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.0
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?;
self.sink_receiver
.get(&table_name)
.map(|(s, _r)| s.clone())
.with_context(|| TableNotFoundSnafu {
name: table_name.join("."),
})
}
}
impl FlownodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
let id = self
.table_repr
.get_by_name(name)
.map(|(_tid, gid)| gid)
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
/// Assign a global id to a table, if already assigned, return the existing global id
///
/// require at least one of `table_name` or `table_id` to be `Some`
///
/// and will try to fetch the schema from table info manager(if table exist now)
///
/// NOTE: this will not actually render the table into collection refered as GlobalId
/// merely creating a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableSource,
mut table_name: Option<TableName>,
table_id: Option<TableId>,
) -> Result<GlobalId, Error> {
// if we can find by table name/id. not assign it
if let Some(gid) = table_name
.as_ref()
.and_then(|table_name| self.table_repr.get_by_name(table_name))
.map(|(_, gid)| gid)
.or_else(|| {
table_id
.and_then(|id| self.table_repr.get_by_table_id(&id))
.map(|(_, gid)| gid)
})
{
Ok(gid)
} else {
let global_id = self.new_global_id();
if let Some(table_id) = table_id {
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have table id, it means database havn't assign one yet or we don't need it
self.table_repr.insert(table_name, table_id, global_id);
Ok(global_id)
}
}
/// Assign a schema to a table
///
/// TODO(discord9): error handling
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationType,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema);
Ok(())
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
}
}
/// A tri-directional map that maps table name, table id, and global id
#[derive(Default, Debug)]
pub struct IdToNameMap {
name_to_global_id: HashMap<TableName, GlobalId>,
id_to_global_id: HashMap<TableId, GlobalId>,
global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}
impl IdToNameMap {
pub fn new() -> Self {
Default::default()
}
pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
name.clone()
.and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
id.and_then(|id| self.id_to_global_id.insert(id, global_id));
self.global_id_to_name_id.insert(global_id, (name, id));
}
pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
self.name_to_global_id.get(name).map(|global_id| {
let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
(*id, *global_id)
})
}
pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
self.id_to_global_id.get(id).map(|global_id| {
let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
(name.clone(), *global_id)
})
}
pub fn get_by_global_id(
&self,
global_id: &GlobalId,
) -> Option<(Option<TableName>, Option<TableId>)> {
self.global_id_to_name_id.get(global_id).cloned()
}
}

View File

@@ -37,6 +37,7 @@ use crate::adapter::{FlownodeManager, FlownodeManagerRef};
use crate::repr::{self, DiffRow};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
#[derive(Clone)]
pub struct FlowService {
pub manager: FlownodeManagerRef,
@@ -86,19 +87,19 @@ impl flow_server::Flow for FlowService {
}
}
pub struct FlowNodeServer {
pub struct FlownodeServer {
pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
pub flow_service: FlowService,
}
impl FlowNodeServer {
impl FlownodeServer {
pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
flow_server::FlowServer::new(self.flow_service.clone())
}
}
#[async_trait::async_trait]
impl servers::server::Server for FlowNodeServer {
impl servers::server::Server for FlownodeServer {
async fn shutdown(&self) -> Result<(), servers::error::Error> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {

View File

@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! How to query table information from database
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
use hydroflow::scheduled::graph::Hydroflow;
use itertools::Itertools;
use minstant::Anchor;
use prost::bytes::buf;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
use tokio::task::LocalSet;
use crate::adapter::error::{
Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap};
use crate::adapter::parse_expr::{parse_duration, parse_fixed};
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::adapter::TableName;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP};
use crate::transform::sql_to_flow_plan;
/// mapping of table name <-> table id should be query from tableinfo manager
pub struct TableSource {
/// for query `TableId -> TableName` mapping
table_info_manager: TableInfoManager,
table_name_manager: TableNameManager,
}
impl TableSource {
pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
TableSource {
table_info_manager,
table_name_manager,
}
}
pub async fn get_table_id_from_proto_name(
&self,
name: &greptime_proto::v1::TableName,
) -> Result<TableId, Error> {
self.table_name_manager
.get(TableNameKey::new(
&name.catalog_name,
&name.schema_name,
&name.table_name,
))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Table name = {:?}, couldn't found table id", name),
})
.map(|id| id.unwrap().table_id())
}
/// If the table havn't been created in database, the tableId returned would be null
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
let ret = self
.table_name_manager
.get(TableNameKey::new(&name[0], &name[1], &name[2]))
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Table name = {:?}, couldn't found table id", name),
})?
.map(|id| id.table_id());
Ok(ret)
}
/// query metasrv about the table name and table id
pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
self.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
})
.map(|name| name.unwrap().table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
/// query metasrv about the table name and table id
pub async fn get_table_info_value(
&self,
table_id: &TableId,
) -> Result<Option<TableInfoValue>, Error> {
Ok(self
.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
})?
.map(|v| v.into_inner()))
}
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
name: format!("TableId = {:?}, Can't found table info", table_id),
})?;
let table_name = table_info_value.table_name();
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let raw_schema = table_info_value.table_info.meta.schema;
let column_types = raw_schema
.column_schemas
.into_iter()
.map(|col| ColumnType {
nullable: col.is_nullable(),
scalar_type: col.data_type,
})
.collect_vec();
let key = table_info_value.table_info.meta.primary_key_indices;
let keys = vec![repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok((
table_name,
RelationType {
column_types,
keys,
time_index,
},
))
}
}

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Somewhere integration-ish(more like mock) test for adapter module
//! Mock test for adapter module
//! TODO(discord9): write mock test
use common_meta::key::table_info::TableInfoKey;
use common_meta::kv_backend::memory::MemoryKvBackend;

View File

@@ -14,11 +14,14 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_error::ext::BoxedError;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
use itertools::Itertools;
use snafu::ResultExt;
use crate::adapter::error::Error;
use crate::adapter::error::{Error, ExternalSnafu};
/// TODO(discord9): error handling
/// convert `ColumnSchema` lists to it's corrsponding proto type
pub fn column_schemas_to_proto(
column_schemas: Vec<ColumnSchema>,
primary_keys: &[String],
@@ -28,9 +31,10 @@ pub fn column_schemas_to_proto(
.map(|c| {
ColumnDataTypeWrapper::try_from(c.data_type.clone())
.map(|w| w.to_parts())
.unwrap()
.map_err(BoxedError::new)
.with_context(|_| ExternalSnafu)
})
.collect::<Vec<_>>();
.try_collect()?;
let ret = column_schemas
.iter()

View File

@@ -107,12 +107,6 @@ pub struct WorkerHandle {
itc_client: Mutex<InterThreadCallClient>,
}
#[test]
fn check_if_send_sync() {
fn check<T: Send + Sync>() {}
check::<WorkerHandle>();
}
impl WorkerHandle {
/// create task, return task id
///
@@ -127,6 +121,7 @@ impl WorkerHandle {
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let req = Request::Create {
task_id,
@@ -137,6 +132,7 @@ impl WorkerHandle {
src_recvs,
expire_when,
create_if_not_exist,
err_collector,
};
let ret = self.itc_client.lock().await.call_blocking(req).await?;
@@ -235,9 +231,10 @@ impl<'s> Worker<'s> {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
// TODO(discord9): set expire duration for all arrangment and compare to sys timestamp instead
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let _ = expire_when;
if create_if_not_exist {
@@ -247,7 +244,10 @@ impl<'s> Worker<'s> {
}
}
let mut cur_task_state = ActiveDataflowState::<'s>::default();
let mut cur_task_state = ActiveDataflowState::<'s> {
err_collector,
..Default::default()
};
{
let mut ctx = cur_task_state.new_ctx(sink_id);
@@ -305,6 +305,7 @@ impl<'s> Worker<'s> {
src_recvs,
expire_when,
create_if_not_exist,
err_collector,
} => {
let task_create_result = self.create_flow(
task_id,
@@ -315,6 +316,7 @@ impl<'s> Worker<'s> {
src_recvs,
expire_when,
create_if_not_exist,
err_collector,
);
Some((
req_id,
@@ -352,6 +354,7 @@ enum Request {
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
err_collector: ErrCollector,
},
Remove {
task_id: FlowId,
@@ -494,6 +497,7 @@ mod test {
vec![rx],
None,
true,
ErrCollector::default(),
)
.await
.unwrap();

View File

@@ -145,8 +145,6 @@ fn mfp_subgraph(
/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator,
/// return the output updates **And** possibly any number of errors that occurred during the evaluation
///
/// TODO(discord9): deal with primary key overwrite issue
fn eval_mfp_core(
input: impl IntoIterator<Item = DiffRow>,
mfp_plan: &MfpPlan,

View File

@@ -21,6 +21,7 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};
use crate::compute::render::Context;
@@ -152,9 +153,14 @@ pub struct ErrCollector {
}
impl ErrCollector {
pub async fn get_all(&self) -> Vec<EvalError> {
self.inner.lock().await.drain(..).collect_vec()
}
pub fn is_empty(&self) -> bool {
self.inner.blocking_lock().is_empty()
}
pub fn push_err(&self, err: EvalError) {
self.inner.blocking_lock().push_back(err)
}

View File

@@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::{Location, Snafu};
fn is_send_sync() {
fn check<T: Send + Sync>() {}
check::<EvalError>();
}
/// EvalError is about errors happen on columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column

View File

@@ -51,7 +51,7 @@ pub type DiffRow = (Row, Timestamp, Diff);
/// Row with key-value pair, timestamp and diff
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// broadcast channel capacity, set to a arbitrary value
/// broadcast channel capacity
pub const BROADCAST_CAP: usize = 1024;
/// Convert a value that is or can be converted to Datetime to internal timestamp

View File

@@ -29,7 +29,7 @@ use crate::adapter::error::{
Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
NotImplementedSnafu, TableNotFoundSnafu,
};
use crate::adapter::FlowNodeContext;
use crate::adapter::FlownodeContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;
@@ -94,10 +94,8 @@ impl FunctionExtensions {
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
/// then to a substrait plan, and finally to a flow plan.
///
/// TODO(discord9): check if use empty `QueryContext` influence anything
pub async fn sql_to_flow_plan(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
@@ -138,10 +136,10 @@ mod test {
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::adapter::TriMap;
use crate::adapter::IdToNameMap;
use crate::repr::ColumnType;
pub fn create_test_ctx() -> FlowNodeContext {
pub fn create_test_ctx() -> FlownodeContext {
let gid = GlobalId::User(0);
let name = [
"greptime".to_string(),
@@ -149,9 +147,9 @@ mod test {
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
let mut tri_map = TriMap::new();
let mut tri_map = IdToNameMap::new();
tri_map.insert(Some(name.clone()), Some(0), gid);
FlowNodeContext {
FlownodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),

View File

@@ -54,11 +54,11 @@ use crate::expr::{
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::{FlowNodeContext, FunctionExtensions};
use crate::transform::{FlownodeContext, FunctionExtensions};
impl TypedExpr {
fn from_substrait_agg_grouping(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
groupings: &[Grouping],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -84,7 +84,7 @@ impl TypedExpr {
impl AggregateExpr {
fn from_substrait_agg_measures(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
measures: &[Measure],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -218,7 +218,7 @@ impl KeyValPlan {
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
pub fn from_substrait_agg_rel(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
agg: &proto::AggregateRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {

View File

@@ -100,7 +100,7 @@ impl TypedExpr {
},
)
.unzip();
info!("Function: {:?}", f);
match arg_len {
// because variadic function can also have 1 arguments, we need to check if it's a variadic function first
1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {

View File

@@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::transform::{FlowNodeContext, FunctionExtensions};
use crate::transform::{FlownodeContext, FunctionExtensions};
impl TypedPlan {
/// Convert Substrait Plan into Flow's TypedPlan
pub fn from_substrait_plan(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
plan: &SubPlan,
) -> Result<TypedPlan, Error> {
// Register function extension
@@ -62,7 +62,7 @@ impl TypedPlan {
/// Convert Substrait Rel into Flow's TypedPlan
/// TODO: SELECT DISTINCT(does it get compile with something else?)
pub fn from_substrait_rel(
ctx: &mut FlowNodeContext,
ctx: &mut FlownodeContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {

View File

@@ -210,7 +210,7 @@ impl GreptimeDbStandaloneBuilder {
flownode
.set_frontend_invoker(Box::new(instance.clone()))
.await;
flownode.run_background().await.unwrap();
let _node_handle = flownode.run_background();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();