feat(flow): render source/sink (#3903)

* feat(flow): render src/sink

* chore: add empty impl

* chore: typos

* refactor: address review comments (WIP)

* refactor: re-export df_substrait & use to_sub_plan

* fix: add implicit location to error enum

* fix: error handling unwrap query_ctx
discord9
2024-05-13 19:58:02 +08:00
committed by GitHub
parent 9d12496aaf
commit be1eb4efb7
27 changed files with 933 additions and 142 deletions

Cargo.lock
View File

@@ -2477,6 +2477,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling"
version = "0.14.4"
@@ -3788,20 +3798,28 @@ name = "flow"
version = "0.7.2"
dependencies = [
"api",
"async-trait",
"catalog",
"common-base",
"common-catalog",
"common-decimal",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-runtime",
"common-telemetry",
"common-time",
"datafusion-common 37.0.0",
"datafusion-expr 37.0.0",
"datafusion-substrait",
"datatypes",
"enum_dispatch",
"futures",
"greptime-proto",
"hydroflow",
"itertools 0.10.5",
"minstant",
"nom",
"num-traits",
"prost 0.12.4",
"query",
@@ -3811,6 +3829,7 @@ dependencies = [
"session",
"smallvec",
"snafu 0.8.2",
"store-api",
"strum 0.25.0",
"substrait 0.7.2",
"table",
@@ -5690,6 +5709,16 @@ dependencies = [
"adler",
]
[[package]]
name = "minstant"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db"
dependencies = [
"ctor",
"web-time 1.1.0",
]
[[package]]
name = "mio"
version = "0.8.11"
@@ -11334,7 +11363,7 @@ dependencies = [
"tracing-core",
"tracing-log 0.2.0",
"tracing-subscriber",
"web-time",
"web-time 0.2.4",
]
[[package]]
@@ -12092,6 +12121,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webbrowser"
version = "0.8.15"

View File

@@ -206,6 +206,7 @@ common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend" }
index = { path = "src/index" }
log-store = { path = "src/log-store" }

View File

@@ -69,14 +69,21 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
fn encode(&self, plan: &Self::Plan) -> Result<Bytes, Self::Error> {
let mut buf = BytesMut::new();
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let context = SessionContext::new_with_state(session_state);
let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?;
let substrait_plan = self.to_sub_plan(plan)?;
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())
}
}
impl DFLogicalSubstraitConvertor {
pub fn to_sub_plan(&self, plan: &LogicalPlan) -> Result<Box<Plan>, Error> {
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let context = SessionContext::new_with_state(session_state);
to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)
}
}
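A hypothetical caller of the split API above; `DFLogicalSubstraitConvertor`, `SubstraitPlan`, and `to_sub_plan` come from the diff, while the wrapper function itself is illustrative:

```rust
use datafusion_expr::LogicalPlan;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

fn convert(plan: &LogicalPlan) -> Result<(), Box<dyn std::error::Error>> {
    let convertor = DFLogicalSubstraitConvertor {};
    // get the in-memory substrait plan (what the flow module consumes)...
    let _sub_plan = convertor.to_sub_plan(plan)?;
    // ...or the encoded bytes; `encode` now delegates to `to_sub_plan`
    let _bytes = convertor.encode(plan)?;
    Ok(())
}
```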

View File

@@ -23,11 +23,14 @@ use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::catalog::CatalogProviderList;
use datafusion::execution::context::SessionState;
/// Re-export the Substrait module of datafusion,
/// note that this is a different version from the `substrait_proto` crate
pub use datafusion_substrait::substrait as substrait_proto_df;
pub use datafusion_substrait::{logical_plan as df_logical_plan, variation_const};
use session::context::QueryContextRef;
pub use substrait_proto;
pub use crate::df_substrait::DFLogicalSubstraitConvertor;
#[async_trait]
pub trait SubstraitPlan {
type Error: std::error::Error;

View File

@@ -9,27 +9,41 @@ workspace = true
[dependencies]
api.workspace = true
catalog.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures = "0.3"
# This fork simply keeps our dependency in our org and pins the version;
# it is the same as the upstream repo
async-trait.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
prost.workspace = true
query.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true

View File

@@ -16,3 +16,6 @@
//! and communicating with other parts of the database
pub(crate) mod error;
pub(crate) mod node_context;
pub(crate) use node_context::FlownodeContext;

View File

@@ -16,12 +16,11 @@
use std::any::Any;
use common_error::ext::BoxedError;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
@@ -32,6 +31,16 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error"))]
External {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Internal error"))]
Internal { location: Location, reason: String },
/// TODO(discord9): add detailed location of column
#[snafu(display("Failed to eval stream"))]
Eval {
@@ -47,6 +56,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table not found: {msg}, meta error: {source}"))]
TableNotFoundMeta {
source: common_meta::error::Error,
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Table already exist: {name}"))]
TableAlreadyExist {
name: String,
@@ -62,6 +79,27 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid query plan: {source}"))]
InvalidQueryPlan {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
InvalidQueryProst {
inner: api::DecodeError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid query, can't transform to substrait: {source}"))]
InvalidQuerySubstrait {
source: substrait::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid query: {reason}"))]
InvalidQuery {
reason: String,
@@ -112,6 +150,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected: {reason}"))]
Unexpected {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
/// Result type for flow module
@@ -124,14 +169,21 @@ impl ErrorExt for Error {
StatusCode::Internal
}
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } => StatusCode::TableNotFound,
&Self::InvalidQuery { .. } | &Self::Plan { .. } | &Self::Datatypes { .. } => {
StatusCode::PlanQuery
Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
StatusCode::TableNotFound
}
Self::NoProtoType { .. } => StatusCode::Unexpected,
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }
| &Self::InvalidQuery { .. }
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected,
&Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported
}
&Self::External { .. } => StatusCode::Unknown,
Self::Internal { .. } => StatusCode::Internal,
}
}
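The `#[snafu(implicit)]` attribute used throughout the variants above is what the commit's "add implicit location" bullet refers to: with snafu 0.8, the `Location` is captured automatically at the `.context(...)` call site instead of being passed by the caller. A minimal standalone sketch (the error type and file name are made up):

```rust
use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum DemoError {
    #[snafu(display("External error"))]
    External {
        source: std::io::Error,
        // filled in automatically at the context call site
        #[snafu(implicit)]
        location: Location,
    },
}

fn read_config() -> Result<String, DemoError> {
    // no `location` argument needed here
    std::fs::read_to_string("flow.toml").context(ExternalSnafu)
}
```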

View File

@@ -0,0 +1,315 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Node context, prone to change with every incoming request
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
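/// The full name of a table, in the form `[catalog, schema, table]`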
pub type TableName = [String; 3];
pub struct TableSource {}
impl TableSource {
pub async fn get_table_name_schema(
&self,
_table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
todo!()
}
}
/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlownodeContext {
/// mapping from source table to tasks, useful for scheduling which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub flow_to_sink: BTreeMap<FlowId, TableName>,
/// broadcast sender for each source table; any incoming write request will be sent to the corresponding source table's sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
/// receiver for the sink table; there should only be one receiver, and it receives all the data for the sink table
///
/// and sends it back to the client. Since we are mocking the sink table as a client, we use the table name as the key;
/// note that there should only be one sink receiver, and an unbounded mpsc channel serves as the sink channel here
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
/// buffered rows for each source table, kept in case the broadcast channel is full
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
/// the schema of the table, queried from metasrv or inferred from the TypedPlan
pub schema: HashMap<GlobalId, RelationType>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlownodeContext {
// return the number of rows actually sent (including what's in the buffer)
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
let send_buffer = self.send_buffer.entry(table_id).or_default();
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!(
"Failed to send row to table_id = {:?}, error = {:?}",
table_id, err
),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
Ok(row_cnt)
}
}
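A hypothetical unit-test-style use of `send`: rows go out through the broadcast channel, while anything beyond `BROADCAST_CAP` would wait in `send_buffer` for a later call (the table id here is made up):

```rust
fn demo_send() -> Result<(), Error> {
    let mut ctx = FlownodeContext::default();
    let table_id = 1024;
    ctx.add_source_sender(table_id);
    // keep a receiver alive: a tokio broadcast send errors without subscribers
    let mut rx = ctx.source_sender.get(&table_id).unwrap().subscribe();
    let sent = ctx.send(table_id, vec![(Row::empty(), 0, 1)])?;
    assert_eq!(sent, 1);
    assert!(rx.try_recv().is_ok());
    Ok(())
}
```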
impl FlownodeContext {
/// map source tables to the task, and the task to its sink table, in the worker context
///
/// also add their corresponding broadcast sender/receiver
pub fn register_task_src_sink(
&mut self,
task_id: FlowId,
source_table_ids: &[TableId],
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
self.source_to_tasks
.entry(*source_table_id)
.or_default()
.insert(task_id);
}
self.add_sink_receiver(sink_table_name.clone());
self.flow_to_sink.insert(task_id, sink_table_name);
}
pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
}
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}
pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
let table_id = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?
.1
.with_context(|| TableNotFoundSnafu {
name: format!("Table Id = {:?}", id),
})?;
self.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})
}
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.0
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?;
self.sink_receiver
.get(&table_name)
.map(|(s, _r)| s.clone())
.with_context(|| TableNotFoundSnafu {
name: table_name.join("."),
})
}
}
impl FlownodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
let id = self
.table_repr
.get_by_name(name)
.map(|(_tid, gid)| gid)
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
/// Assign a global id to a table; if already assigned, return the existing global id
///
/// requires at least one of `table_name` or `table_id` to be `Some`
///
/// and will try to fetch the schema from the table info manager (if the table exists now)
///
/// NOTE: this will not actually render the table into the collection referred to by the GlobalId,
/// it merely creates a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableSource,
mut table_name: Option<TableName>,
table_id: Option<TableId>,
) -> Result<GlobalId, Error> {
// if we can already find it by table name/id, don't assign a new one
if let Some(gid) = table_name
.as_ref()
.and_then(|table_name| self.table_repr.get_by_name(table_name))
.map(|(_, gid)| gid)
.or_else(|| {
table_id
.and_then(|id| self.table_repr.get_by_table_id(&id))
.map(|(_, gid)| gid)
})
{
Ok(gid)
} else {
let global_id = self.new_global_id();
if let Some(table_id) = table_id {
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have a table id, it means the database hasn't assigned one yet or we don't need it
self.table_repr.insert(table_name, table_id, global_id);
Ok(global_id)
}
}
/// Assign a schema to a table
///
/// TODO(discord9): error handling
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationType,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.unwrap();
self.schema.insert(gid, schema);
Ok(())
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
}
}
/// A tri-directional map that maps table name, table id, and global id
#[derive(Default, Debug)]
pub struct IdToNameMap {
name_to_global_id: HashMap<TableName, GlobalId>,
id_to_global_id: HashMap<TableId, GlobalId>,
global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}
impl IdToNameMap {
pub fn new() -> Self {
Default::default()
}
pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
name.clone()
.and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
id.and_then(|id| self.id_to_global_id.insert(id, global_id));
self.global_id_to_name_id.insert(global_id, (name, id));
}
pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
self.name_to_global_id.get(name).map(|global_id| {
let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
(*id, *global_id)
})
}
pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
self.id_to_global_id.get(id).map(|global_id| {
let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
(name.clone(), *global_id)
})
}
pub fn get_by_global_id(
&self,
global_id: &GlobalId,
) -> Option<(Option<TableName>, Option<TableId>)> {
self.global_id_to_name_id.get(global_id).cloned()
}
}
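A quick sketch of the tri-directional lookup: one `insert`, three ways to resolve (the id and name values are illustrative):

```rust
fn demo_tri_map() {
    let mut map = IdToNameMap::new();
    let name: TableName = ["greptime".into(), "public".into(), "numbers".into()];
    map.insert(Some(name.clone()), Some(2), GlobalId::User(0));

    // resolve by name, by table id, and by global id
    assert_eq!(map.get_by_name(&name), Some((Some(2), GlobalId::User(0))));
    assert_eq!(
        map.get_by_table_id(&2),
        Some((Some(name.clone()), GlobalId::User(0)))
    );
    assert_eq!(
        map.get_by_global_id(&GlobalId::User(0)),
        Some((Some(name), Some(2)))
    );
}
```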

View File

@@ -17,3 +17,7 @@
mod render;
mod state;
mod types;
pub(crate) use render::Context;
pub(crate) use state::DataflowState;
pub(crate) use types::ErrCollector;

View File

@@ -45,6 +45,7 @@ use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
mod map;
mod reduce;
mod src_sink;
/// The context for building an operator identified by a `GlobalId`
pub struct Context<'referred, 'df> {
@@ -52,13 +53,15 @@ pub struct Context<'referred, 'df> {
pub df: &'referred mut Hydroflow<'df>,
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
///
/// TODO(discord9): remove extra clone by counting usage and remove it on last usage?
pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
/// used by `Get`/`Let` Plan for getting/setting local variables
///
/// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead
local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
// Collect all errors in this operator's evaluation
err_collector: ErrCollector,
pub err_collector: ErrCollector,
}
impl<'referred, 'df> Drop for Context<'referred, 'df> {
@@ -235,7 +238,7 @@ mod test {
for now in time_range {
state.set_current_ts(now);
state.run_available_with_schedule(df);
assert!(state.get_err_collector().inner.borrow().is_empty());
assert!(state.get_err_collector().is_empty());
if let Some(expected) = expected.get(&now) {
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
} else {

View File

@@ -153,7 +153,7 @@ fn eval_mfp_core(
) -> Vec<KeyValDiffRow> {
let mut all_updates = Vec::new();
for (mut row, _sys_time, diff) in input.into_iter() {
// this updates is expected to be only zero to two rows
// these updates are expected to be only zero, one or two rows
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
// TODO(discord9): refactor error handling
// Expect error in a single row to not interrupt the whole evaluation

View File

@@ -80,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> {
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
let data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
reduce_subgraph(
&reduce_arrange,
@@ -378,9 +382,8 @@ fn reduce_accum_subgraph(
let mut all_updates = Vec::with_capacity(key_to_vals.len());
let mut all_outputs = Vec::with_capacity(key_to_vals.len());
// lock the arrange for write for the rest of function body
// so to prevent wide race condition since we are going to update the arrangement by write after read
// so as to prevent weird race conditions, since we are going to update the arrangement by write after read
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key_to_vals {
@@ -395,6 +398,7 @@ fn reduce_accum_subgraph(
}
};
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
let accums = accums.inner;
// deser accums from offsets

View File

@@ -0,0 +1,161 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Source and Sink for the dataflow
use std::collections::{BTreeMap, VecDeque};
use common_telemetry::{debug, info};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::{broadcast, mpsc};
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
/// Render a source which comes from a broadcast channel into the dataflow
/// will immediately send updates not greater than `now` and buffer the rest in the arrangement
pub fn render_source(
&mut self,
mut src_recv: broadcast::Receiver<DiffRow>,
) -> Result<CollectionBundle, Error> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source");
let arrange_handler = self.compute_state.new_arrange(None);
let arrange_handler_inner =
arrange_handler
.clone_future_only()
.with_context(|| PlanSnafu {
reason: "No write is expected at this point",
})?;
let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
let sub = self
.df
.add_subgraph_source("source", send_port, move |_ctx, send| {
let now = *now.borrow();
let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
err_collector.run(|| arrange_handler_inner.write().compact_to(now));
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
// TODO(discord9): handling tokio broadcast error
while let Ok((r, t, d)) = src_recv.try_recv() {
if t <= now {
to_send.push((r, t, d));
} else {
to_arrange.push(((r, Row::empty()), t, d));
}
}
let all = prev_avail.chain(to_send).collect_vec();
if !all.is_empty() || !to_arrange.is_empty() {
debug!(
"All send: {} rows, not yet send: {} rows",
all.len(),
to_arrange.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at next tick
inner_schd.schedule_at(now + 1);
});
schd.set_cur_subgraph(sub);
let arranged = Arranged::new(arrange_handler);
arranged.writer.borrow_mut().replace(sub);
let arranged = BTreeMap::from([(vec![], arranged)]);
Ok(CollectionBundle {
collection: Collection::from_port(recv_port),
arranged,
})
}
pub fn render_unbounded_sink(
&mut self,
bundle: CollectionBundle,
sender: mpsc::UnboundedSender<DiffRow>,
) {
let CollectionBundle {
collection,
arranged: _,
} = bundle;
let _sink = self.df.add_subgraph_sink(
"UnboundedSink",
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
for row in data.into_iter().flat_map(|i| i.into_iter()) {
// if the sender is closed, stop sending
if sender.is_closed() {
break;
}
// TODO(discord9): handling tokio error
let _ = sender.send(row);
}
},
);
}
/// Render a sink which sends updates to a broadcast channel, with an internal buffer in case the broadcast channel is full
pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
let CollectionBundle {
collection,
arranged: _,
} = bundle;
let mut buf = VecDeque::with_capacity(1000);
let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();
let sink = self
.df
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
let data = recv.take_inner();
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
if sender.len() >= BROADCAST_CAP {
return;
} else {
while let Some(row) = buf.pop_front() {
// if the sender is full, stop sending
if sender.len() >= BROADCAST_CAP {
break;
}
// TODO(discord9): handling tokio broadcast error
let _ = sender.send(row);
}
}
// if buffer is not empty, schedule the next run at next tick
// so the buffer can be drained as soon as possible
if !buf.is_empty() {
inner_schd.schedule_at(*now.borrow() + 1);
}
});
schd.set_cur_subgraph(sink);
}
}
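For reference, a standalone sketch (no hydroflow) of the split that `render_source` performs each tick: drain the broadcast receiver without blocking, emit rows at or before `now`, and park later rows for the arrangement. All names below are illustrative:

```rust
use tokio::sync::broadcast;

type Ts = i64;
type DemoRow = (Vec<u64>, Ts, i64); // (row, timestamp, diff)

fn drain_source(
    rx: &mut broadcast::Receiver<DemoRow>,
    now: Ts,
) -> (Vec<DemoRow>, Vec<DemoRow>) {
    let (mut to_send, mut to_arrange) = (Vec::new(), Vec::new());
    // non-blocking drain, mirroring the `try_recv` loop in the subgraph
    while let Ok((r, t, d)) = rx.try_recv() {
        if t <= now {
            to_send.push((r, t, d));
        } else {
            to_arrange.push((r, t, d));
        }
    }
    (to_send, to_arrange)
}
```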

View File

@@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manages the input/output/schedule of one `Hydroflow`
#[derive(Default)]
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraphs here
/// TODO(discord9): consider dedup? Perhaps not necessary, since hydroflow itself also dedups when scheduling

View File

@@ -21,7 +21,8 @@ use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use tokio::sync::RwLock;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};
use crate::compute::render::Context;
use crate::expr::{EvalError, ScalarExpr};
@@ -146,14 +147,22 @@ impl CollectionBundle {
///
/// Using a `VecDeque` to preserve the order of errors
/// when running the dataflow continuously and errors are needed in order
#[derive(Default, Clone)]
#[derive(Debug, Default, Clone)]
pub struct ErrCollector {
pub inner: Rc<RefCell<VecDeque<EvalError>>>,
pub inner: Arc<Mutex<VecDeque<EvalError>>>,
}
impl ErrCollector {
pub async fn get_all(&self) -> Vec<EvalError> {
self.inner.lock().await.drain(..).collect_vec()
}
pub fn is_empty(&self) -> bool {
self.inner.blocking_lock().is_empty()
}
pub fn push_err(&self, err: EvalError) {
self.inner.borrow_mut().push_back(err)
self.inner.blocking_lock().push_back(err)
}
pub fn run<F, R>(&self, f: F) -> Option<R>
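With the `Arc<Mutex<_>>` inner, the collector is `Send + Sync` and can be cloned into other threads or tasks. A hypothetical sketch, reusing the `InternalSnafu` selector that appears earlier in the diff; note that `blocking_lock` must be called off the async runtime:

```rust
use crate::expr::error::InternalSnafu;

fn demo_err_collector() {
    let errs = ErrCollector::default();
    let errs2 = errs.clone();
    // the Arc<Mutex<_>> inner lets the collector cross thread boundaries
    std::thread::spawn(move || {
        errs2.push_err(InternalSnafu { reason: "demo".to_string() }.build());
    })
    .join()
    .unwrap();
    assert!(!errs.is_empty());
}
```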

View File

@@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::{Location, Snafu};
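// Compile-time assertion that `EvalError` is `Send + Sync` (it now lives behind `Arc<Mutex<_>>` in `ErrCollector`)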
fn is_send_sync() {
fn check<T: Send + Sync>() {}
check::<EvalError>();
}
/// EvalError is about errors that happen during columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator (instead of code) to errors to help identify the related column

View File

@@ -17,9 +17,9 @@
use std::collections::HashMap;
use std::sync::OnceLock;
use common_telemetry::debug;
use common_time::DateTime;
use datafusion_expr::Operator;
use datafusion_substrait::logical_plan::consumer::name_to_op;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
use datatypes::types::cast::CastOption;
@@ -28,6 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use substrait::df_logical_plan::consumer::name_to_op;
use crate::adapter::error::{Error, InvalidQuerySnafu, PlanSnafu};
use crate::expr::error::{
@@ -206,8 +207,9 @@ impl UnaryFunc {
from: arg_ty,
to: to.clone(),
}
})?;
Ok(res)
});
debug!("Cast to type: {to:?}, result: {:?}", res);
res
}
}
}

View File

@@ -16,7 +16,7 @@
use serde::{Deserialize, Serialize};
/// Global id's scope is in Current Worker, and is cross-dataflow
/// A global id's scope is the current flow node, and it is shared across dataflows
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum GlobalId {
/// System namespace.

View File

@@ -206,12 +206,16 @@ impl AggregateFunc {
.fail();
}
};
let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype);
let input_type = if matches!(generic_fn, GenericFn::Count) {
ConcreteDataType::null_datatype()
} else {
arg_type.unwrap_or_else(ConcreteDataType::null_datatype)
};
rule.get(&(generic_fn, input_type.clone()))
.cloned()
.with_context(|| InvalidQuerySnafu {
reason: format!(
"No specialization found for binary function {:?} with input type {:?}",
"No specialization found for aggregate function {:?} with input type {:?}",
generic_fn, input_type
),
})

View File

@@ -18,12 +18,15 @@
mod join;
mod reduce;
use std::collections::BTreeSet;
use datatypes::arrow::ipc::Map;
use serde::{Deserialize, Serialize};
use crate::adapter::error::Error;
use crate::expr::{
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr,
AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
TypedExpr,
};
use crate::plan::join::JoinPlan;
pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
@@ -71,6 +74,7 @@ impl TypedPlan {
let mfp = MapFilterProject::new(input_arity)
.map(exprs)?
.project(input_arity..input_arity + output_arity)?;
let out_typ = self.typ.apply_mfp(&mfp, &expr_typs)?;
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {
Plan::Mfp {
@@ -85,8 +89,7 @@ impl TypedPlan {
mfp,
},
};
let typ = RelationType::new(expr_typs);
Ok(TypedPlan { typ, plan })
Ok(TypedPlan { typ: out_typ, plan })
}
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
@@ -182,3 +185,45 @@ pub enum Plan {
consolidate_output: bool,
},
}
impl Plan {
/// Find all the used collection in the plan
pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
match plan {
Plan::Get { id } => {
match id {
Id::Local(_) => (),
Id::Global(g) => {
used.insert(*g);
}
};
}
Plan::Let { value, body, .. } => {
recur_find_use(value, used);
recur_find_use(body, used);
}
Plan::Mfp { input, .. } => {
recur_find_use(input, used);
}
Plan::Reduce { input, .. } => {
recur_find_use(input, used);
}
Plan::Join { inputs, .. } => {
for input in inputs {
recur_find_use(input, used);
}
}
Plan::Union { inputs, .. } => {
for input in inputs {
recur_find_use(input, used);
}
}
_ => {}
}
}
let mut ret = Default::default();
recur_find_use(self, &mut ret);
ret
}
}
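A hypothetical use of `find_used_collection`, e.g. to learn which source tables a plan reads before wiring up senders; the one-column `Get` plan is illustrative:

```rust
fn demo_used() {
    let plan = Plan::Mfp {
        input: Box::new(Plan::Get {
            id: Id::Global(GlobalId::User(0)),
        }),
        mfp: MapFilterProject::new(1),
    };
    let used = plan.find_used_collection();
    assert!(used.contains(&GlobalId::User(0)));
}
```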

View File

@@ -27,7 +27,7 @@ use datatypes::types::cast;
use datatypes::types::cast::CastOption;
use datatypes::value::Value;
use itertools::Itertools;
pub(crate) use relation::{ColumnType, RelationDesc, RelationType};
pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
/// Row with key-value pair, timestamp and diff
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// broadcast channel capacity
pub const BROADCAST_CAP: usize = 1024;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
@@ -104,6 +107,11 @@ impl Row {
Self { inner: vec![] }
}
/// Returns true if the Row contains no elements.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Create a row from a vector of values
pub fn new(row: Vec<Value>) -> Self {
Self { inner: row }

View File

@@ -12,14 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::prelude::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use std::collections::{BTreeMap, HashMap};
use crate::adapter::error::{InvalidQuerySnafu, Result};
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use crate::adapter::error::{InvalidQuerySnafu, Result, UnexpectedSnafu};
use crate::expr::MapFilterProject;
/// a set of column indices that are "keys" for the collection.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct Key {
/// indicates which columns form the key
pub column_indices: Vec<usize>,
@@ -28,9 +32,7 @@ pub struct Key {
impl Key {
/// create a new Key
pub fn new() -> Self {
Self {
column_indices: Vec::new(),
}
Default::default()
}
/// create a new Key from a vector of column indices
@@ -96,6 +98,71 @@ pub struct RelationType {
}
impl RelationType {
/// Try to apply an mfp on the current types; this returns a new RelationType
/// with the new types, and also tries to preserve the key & time index information
/// if the old key & time index columns are preserved by the given mfp
///
/// e.g. with an old relation of 3 columns and an mfp whose
///
/// project = `[2, 1]`,
///
/// given the old key = `[1]` and old time index = `[2]`,
///
/// the new key = `[1]` and the new time index = `[0]`
///
/// note that this function removes empty keys, i.e. key=`[]` will be removed
pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Result<Self> {
let all_types = self
.column_types
.iter()
.chain(expr_typs.iter())
.cloned()
.collect_vec();
let mfp_out_types = mfp
.projection
.iter()
.map(|i| {
all_types.get(*i).cloned().with_context(|| UnexpectedSnafu {
reason: format!(
"MFP index out of bound, len is {}, but the index is {}",
all_types.len(),
*i
),
})
})
.try_collect()?;
let old_to_new_col = BTreeMap::from_iter(
mfp.projection
.clone()
.into_iter()
.enumerate()
.map(|(new, old)| (old, new)),
);
// since it's just an mfp, we also try to preserve key & time index information, if they survive the mfp transform
let keys = self
.keys
.iter()
.filter_map(|key| {
key.column_indices
.iter()
.map(|old| old_to_new_col.get(old).cloned())
.collect::<Option<Vec<_>>>()
// remove empty keys
.and_then(|v| if v.is_empty() { None } else { Some(v) })
.map(Key::from)
})
.collect_vec();
let time_index = self
.time_index
.and_then(|old| old_to_new_col.get(&old).cloned());
Ok(Self {
column_types: mfp_out_types,
keys,
time_index,
})
}
/// Constructs a `RelationType` representing the relation with no columns and
/// no keys.
pub fn empty() -> Self {
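The doc-comment example above, worked through as plain index arithmetic in a standalone sketch:

```rust
use std::collections::BTreeMap;

fn main() {
    // project = [2, 1]: old column 2 -> new 0, old column 1 -> new 1
    let projection = vec![2usize, 1];
    let old_to_new: BTreeMap<usize, usize> = projection
        .iter()
        .enumerate()
        .map(|(new, old)| (*old, new))
        .collect();
    // old key = [1] survives as new key [1]
    let new_key: Option<Vec<usize>> =
        [1usize].iter().map(|old| old_to_new.get(old).copied()).collect();
    assert_eq!(new_key, Some(vec![1]));
    // old time index = 2 becomes new time index 0
    assert_eq!(old_to_new.get(&2).copied(), Some(0));
}
```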

View File

@@ -14,12 +14,35 @@
//! Transform Substrait into execution plan
use std::collections::HashMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType as CDT;
use literal::{from_substrait_literal, from_substrait_type};
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::QueryEngine;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
/// renaming it to `substrait_proto`
use substrait::{
substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor, SubstraitPlan,
};
use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu};
use crate::adapter::error::{
Error, ExternalSnafu, InvalidQueryPlanSnafu, InvalidQueryProstSnafu,
InvalidQuerySubstraitSnafu, NotImplementedSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::FlownodeContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;
/// a simple macro to generate a `NotImplemented` error
macro_rules! not_impl_err {
($($arg:tt)*) => {
@@ -43,11 +66,6 @@ mod expr;
mod literal;
mod plan;
use literal::{from_substrait_literal, from_substrait_type};
use snafu::OptionExt;
use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration;
/// In Substrait, a function can be defined by a u32 anchor, and the anchor can be mapped to a name
///
/// So in a substrait plan, a reference to a function can be a single u32 anchor instead of a full name string
@@ -79,38 +97,34 @@ impl FunctionExtensions {
}
}
/// A context that holds the information of the dataflow
pub struct DataflowContext {
/// `id` refer to any source table in the dataflow, and `name` is the name of the table
/// which is a `Vec<String>` in substrait
id_to_name: HashMap<GlobalId, Vec<String>>,
/// see `id_to_name`
name_to_id: HashMap<Vec<String>, GlobalId>,
/// the schema of the table
schema: HashMap<GlobalId, RelationType>,
}
/// To reuse existing code for parsing SQL, the SQL is first parsed into a datafusion logical plan,
/// then converted to a substrait plan, and finally to a flow plan.
pub async fn sql_to_flow_plan(
ctx: &mut FlownodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
let query_ctx = ctx.query_context.clone().ok_or_else(|| {
UnexpectedSnafu {
reason: "Query context is missing",
}
.build()
})?;
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
let plan = engine
.planner()
.plan(stmt, query_ctx)
.await
.context(InvalidQueryPlanSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
let sub_plan = DFLogicalSubstraitConvertor {}
.to_sub_plan(&plan)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
impl DataflowContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
let id = self
.name_to_id
.get(name)
.copied()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?;
Ok(flow_plan)
}
#[cfg(test)]
@@ -124,22 +138,29 @@ mod test {
use query::plan::LogicalPlan;
use query::QueryEngine;
use session::context::QueryContext;
use substrait::substrait_proto::proto;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use substrait_proto::proto;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::repr::ColumnType;
pub fn create_test_ctx() -> DataflowContext {
pub fn create_test_ctx() -> FlownodeContext {
let gid = GlobalId::User(0);
let name = vec!["numbers".to_string()];
let name = [
"greptime".to_string(),
"public".to_string(),
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
DataflowContext {
id_to_name: HashMap::from([(gid, name.clone())]),
name_to_id: HashMap::from([(name.clone(), gid)]),
let mut tri_map = IdToNameMap::new();
tri_map.insert(Some(name.clone()), Some(0), gid);
FlownodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
}
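A hypothetical end-to-end call of the pipeline above, assuming a query engine configured like the tests' (which register `NumbersTable`):

```rust
async fn demo_plan(engine: Arc<dyn QueryEngine>) -> Result<TypedPlan, Error> {
    let mut ctx = create_test_ctx();
    // SQL -> datafusion logical plan -> substrait plan -> flow TypedPlan
    sql_to_flow_plan(&mut ctx, &engine, "SELECT number FROM numbers").await
}
```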

View File

@@ -16,11 +16,6 @@ use std::collections::HashMap;
use common_decimal::Decimal128;
use common_time::{Date, Timestamp};
use datafusion_substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
};
use datatypes::arrow::compute::kernels::window;
use datatypes::arrow::ipc::Binary;
use datatypes::data_type::ConcreteDataType as CDT;
@@ -28,21 +23,26 @@ use datatypes::value::Value;
use hydroflow::futures::future::Map;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use substrait::substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait::substrait_proto::proto::aggregate_rel::{Grouping, Measure};
use substrait::substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait::substrait_proto::proto::expression::literal::LiteralType;
use substrait::substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
use substrait::substrait_proto::proto::expression::{
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
};
use substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
use substrait_proto::proto::expression::{
IfThen, Literal, MaskExpression, RexType, ScalarFunction,
};
use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use substrait::substrait_proto::proto::function_argument::ArgType;
use substrait::substrait_proto::proto::r#type::Kind;
use substrait::substrait_proto::proto::read_rel::ReadType;
use substrait::substrait_proto::proto::rel::RelType;
use substrait::substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel};
use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use substrait_proto::proto::function_argument::ArgType;
use substrait_proto::proto::r#type::Kind;
use substrait_proto::proto::read_rel::ReadType;
use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel};
use crate::adapter::error::{
DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu,
@@ -54,11 +54,11 @@ use crate::expr::{
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
impl TypedExpr {
fn from_substrait_agg_grouping(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
groupings: &[Grouping],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -84,7 +84,7 @@ impl TypedExpr {
impl AggregateExpr {
fn from_substrait_agg_measures(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
measures: &[Measure],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -218,7 +218,7 @@ impl KeyValPlan {
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
pub fn from_substrait_agg_rel(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
agg: &proto::AggregateRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
@@ -228,7 +228,7 @@ impl TypedPlan {
return not_impl_err!("Aggregate without an input is not supported");
};
let group_expr =
let group_exprs =
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?;
let mut aggr_exprs =
@@ -236,14 +236,14 @@ impl TypedPlan {
let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
&mut aggr_exprs,
&group_expr,
&group_exprs,
input.typ.column_types.len(),
)?;
let output_type = {
let mut output_types = Vec::new();
// first append group_expr as key, then aggr_expr as value
for expr in &group_expr {
for expr in &group_exprs {
output_types.push(expr.typ.clone());
}
@@ -252,7 +252,8 @@ impl TypedPlan {
aggr.func.signature().output.clone(),
));
}
RelationType::new(output_types)
// TODO(discord9): try best to get time
RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
};
// copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
@@ -365,8 +366,8 @@ mod test {
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), true),
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::uint32_datatype(), true), // col sum(number)
ColumnType::new(CDT::uint32_datatype(), false), // col number
]),
plan: Plan::Mfp {
input: Box::new(Plan::Reduce {

View File

@@ -17,11 +17,11 @@
use datatypes::data_type::ConcreteDataType as CDT;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use substrait::substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait::substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
use substrait::substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction};
use substrait::substrait_proto::proto::function_argument::ArgType;
use substrait::substrait_proto::proto::Expression;
use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction};
use substrait_proto::proto::function_argument::ArgType;
use substrait_proto::proto::Expression;
use crate::adapter::error::{
DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu,
@@ -31,8 +31,7 @@ use crate::expr::{
};
use crate::repr::{ColumnType, RelationType};
use crate::transform::literal::{from_substrait_literal, from_substrait_type};
use crate::transform::FunctionExtensions;
use crate::transform::{substrait_proto, FunctionExtensions};
// TODO: find a proper place for this
/// refer to `arrow_schema::datatype` for type names
fn typename_to_cdt(name: &str) -> CDT {

View File

@@ -14,18 +14,19 @@
use common_decimal::Decimal128;
use common_time::{Date, Timestamp};
use datafusion_substrait::variation_const::{
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
};
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use substrait::substrait_proto::proto::expression::literal::LiteralType;
use substrait::substrait_proto::proto::expression::Literal;
use substrait::substrait_proto::proto::r#type::Kind;
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::Literal;
use substrait_proto::proto::r#type::Kind;
use crate::adapter::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::transform::substrait_proto;
/// Convert a Substrait literal into a Value and its ConcreteDataType (So that we can know type even if the value is null)
pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Error> {
@@ -109,9 +110,7 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
}
/// convert a Substrait type into a ConcreteDataType
pub fn from_substrait_type(
null_type: &substrait::substrait_proto::proto::Type,
) -> Result<CDT, Error> {
pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<CDT, Error> {
if let Some(kind) = &null_type.kind {
match kind {
Kind::Bool(_) => Ok(CDT::boolean_datatype()),

View File

@@ -14,21 +14,23 @@
use itertools::Itertools;
use snafu::OptionExt;
use substrait::substrait_proto::proto::expression::MaskExpression;
use substrait::substrait_proto::proto::read_rel::ReadType;
use substrait::substrait_proto::proto::rel::RelType;
use substrait::substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
use substrait_proto::proto::expression::MaskExpression;
use substrait_proto::proto::read_rel::ReadType;
use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel};
use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::adapter::error::{
Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu,
};
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
impl TypedPlan {
/// Convert Substrait Plan into Flow's TypedPlan
pub fn from_substrait_plan(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
plan: &SubPlan,
) -> Result<TypedPlan, Error> {
// Register function extension
@@ -62,7 +64,7 @@ impl TypedPlan {
/// Convert Substrait Rel into Flow's TypedPlan
/// TODO: SELECT DISTINCT (does it get compiled into something else?)
pub fn from_substrait_rel(
ctx: &mut DataflowContext,
ctx: &mut FlownodeContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
@@ -114,7 +116,30 @@ impl TypedPlan {
}
Some(RelType::Read(read)) => {
if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
let table_reference = nt.names.clone();
let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
reason: "Query context not found",
})?;
let table_reference = match nt.names.len() {
1 => [
query_ctx.current_catalog().to_string(),
query_ctx.current_schema().to_string(),
nt.names[0].clone(),
],
2 => [
query_ctx.current_catalog().to_string(),
nt.names[0].clone(),
nt.names[1].clone(),
],
3 => [
nt.names[0].clone(),
nt.names[1].clone(),
nt.names[2].clone(),
],
_ => InvalidQuerySnafu {
reason: "Expect table to have name",
}
.fail()?,
};
let table = ctx.table(&table_reference)?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
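Finally, a standalone sketch of the name-resolution rule above: 1, 2 or 3 name parts are completed with the query context's current catalog and schema (all values illustrative):

```rust
fn resolve(names: &[String], catalog: &str, schema: &str) -> Option<[String; 3]> {
    match names {
        // bare table name: fill in both catalog and schema
        [t] => Some([catalog.to_string(), schema.to_string(), t.clone()]),
        // schema.table: fill in the catalog only
        [s, t] => Some([catalog.to_string(), s.clone(), t.clone()]),
        // fully qualified catalog.schema.table
        [c, s, t] => Some([c.clone(), s.clone(), t.clone()]),
        _ => None, // invalid query: expected 1 to 3 name parts
    }
}

fn main() {
    let full = resolve(&["numbers".to_string()], "greptime", "public");
    assert_eq!(
        full,
        Some(["greptime".into(), "public".into(), "numbers".into()])
    );
}
```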