feat: dataflow adapter

This commit is contained in:
discord9
2024-04-26 15:20:24 +08:00
parent 57e3912aca
commit 1067d3453d
7 changed files with 394 additions and 67 deletions

34
Cargo.lock generated
View File

@@ -2459,6 +2459,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling"
version = "0.14.4"
@@ -3521,6 +3531,7 @@ dependencies = [
"enum_dispatch",
"hydroflow",
"itertools 0.10.5",
"minstant",
"num-traits",
"prost 0.12.4",
"query",
@@ -3530,6 +3541,7 @@ dependencies = [
"session",
"smallvec",
"snafu",
"store-api",
"strum 0.25.0",
"substrait 0.7.2",
"table",
@@ -5398,6 +5410,16 @@ dependencies = [
"adler",
]
[[package]]
name = "minstant"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db"
dependencies = [
"ctor",
"web-time 1.1.0",
]
[[package]]
name = "mio"
version = "0.8.11"
@@ -11012,7 +11034,7 @@ dependencies = [
"tracing-core",
"tracing-log 0.2.0",
"tracing-subscriber",
"web-time",
"web-time 0.2.4",
]
[[package]]
@@ -11770,6 +11792,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webbrowser"
version = "0.8.15"

View File

@@ -24,6 +24,7 @@ enum_dispatch = "0.3"
common-meta.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
minstant = "0.1.7"
num-traits = "0.2"
prost.workspace = true
query.workspace = true
@@ -32,6 +33,7 @@ servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true
table.workspace = true

View File

@@ -14,25 +14,29 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::table_name::TableNameManager;
use hydroflow::scheduled::graph::Hydroflow;
use minstant::Anchor;
use prost::bytes::buf;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast;
use tokio::task::LocalSet;
use crate::adapter::error::TableNotFoundSnafu;
use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{DiffRow, RelationType};
use crate::repr::{self, DiffRow, RelationType, Row, BOARDCAST_CAP};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
@@ -43,38 +47,45 @@ use error::Error;
pub type TaskId = u64;
pub type TableName = Vec<String>;
/// broadcast channel capacity, set to an arbitrary value
pub const BOARDCAST_CAP: usize = 1024;
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using current system timestamp for now
pub struct FlowNodeManager<'subgraph> {
/// The state of all tasks in the flow node
/// This is also the only field that's not `Send` in the struct
pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
pub local_set: LocalSet,
// TODO: catalog/tableinfo manager for query schema and translate sql to plan
query_engine: Arc<dyn QueryEngine>,
srv_map: TableIdNameMapper,
/// contains mapping from table name to global id, and table schema
worker_context: FlowWorkerContext,
tick_manager: FlowTickManager,
}
/// Compile-time guard: fails to build if `FlowWorkerContext` ever stops being `Send`.
#[test]
fn check() {
    // Instantiating the helper is only possible for types that satisfy the `Send` bound.
    fn assert_send<T: Send>() {}
    assert_send::<FlowWorkerContext>();
}
/// mapping of table name <-> table id, which should be queried from the table info manager
struct TableNameIdMapping {
pub struct TableIdNameMapper {
/// for query `TableId -> TableName` mapping
table_info_manager: TableInfoManager,
/// for query `TableName -> TableId` mapping
table_name_manager: TableNameManager,
// a in memory cache, will be invalid if necessary
}
impl TableNameIdMapping {
pub async fn get_table_id(&self, table_name: TableName) -> Result<TableId, Error> {
todo!()
}
pub async fn get_table_name(&self, table_id: TableId) -> Result<TableName, Error> {
todo!()
impl TableIdNameMapper {
/// query metasrv about the table name and table id
pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
self.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
})
.map(|name| name.unwrap().table_name())
.map(|name| vec![name.catalog_name, name.schema_name, name.table_name])
}
}
@@ -110,9 +121,106 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
local_scope: Default::default(),
}
}
pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
self.state.set_current_ts(ts);
}
/// Run all available subgraph
///
/// return true if any subgraph actually executed
pub fn run_available(&mut self) -> bool {
self.state.run_available_with_schedule(&mut self.df)
}
}
/// A batch of rows that all carry the same kind of change.
pub enum DiffRequest {
    /// Rows to insert (diff of `1`).
    Insert(Vec<Row>),
    /// Rows to delete (diff of `-1`).
    Delete(Vec<Row>),
}

/// Iterate through the diff rows and form one request per run of consecutive rows
/// that share the same diff type.
///
/// Row order is preserved: `[+a, +b, -c, +d]` becomes
/// `[Insert([a, b]), Delete([c]), Insert([d])]`. The timestamp of each row is ignored.
/// Rows whose diff is neither `1` nor `-1` are skipped.
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
    let mut reqs: Vec<DiffRequest> = Vec::new();
    for (row, _t, diff) in rows {
        let is_insert = match diff {
            1 => true,
            -1 => false,
            // only unit insertions/deletions are turned into requests
            _ => continue,
        };
        match reqs.last_mut() {
            // same kind as the current run: extend it
            Some(DiffRequest::Insert(batch)) if is_insert => batch.push(row),
            Some(DiffRequest::Delete(batch)) if !is_insert => batch.push(row),
            // very first row, or the kind changed: start a new run
            _ if is_insert => reqs.push(DiffRequest::Insert(vec![row])),
            _ => reqs.push(DiffRequest::Delete(vec![row])),
        }
    }
    reqs
}
impl<'s> FlowNodeManager<'s> {
/// Run the dataflow's grpc service & execution inside the given `LocalSet`, blocking
/// the calling thread on `rt` until the async body finishes.
///
/// The idiomatic way to run the dataflow is to spawn a new thread, create a flow node
/// manager on it, and then call this method.
///
/// NOTE: the main loop is not implemented yet; the async body currently ends in `todo!`.
pub fn run_dataflow(self, rt: tokio::runtime::Runtime, local_set: LocalSet) {
local_set.block_on(&rt, async move {
// TODO(discord9): might place grpc service on another thread?
let zelf = self;
todo!("main loop");
});
}
/// Run all available subgraphs of every dataflow task in this flow node.
///
/// All tasks observe the same timestamp, sampled once from the tick manager at the
/// start of the call. Each task is run repeatedly until a round executes nothing;
/// after every round that did some work, the rows waiting on the task's sink
/// receiver are moved into the sink buffer of the sink table.
///
/// # Panics
///
/// Panics if a task has no entry in `task_to_sink`, or if its sink table has no
/// entry in `sink_receiver`.
///
/// TODO(discord9): add flag for subgraph that have input since last run
pub fn run_available(&mut self) {
    let now = self.tick_manager.tick();
    for (task_id, task_state) in self.task_states.iter_mut() {
        task_state.set_current_ts(now);
        // Every round's result is checked here. Discarding the result of a first,
        // separate run would leave its output undrained whenever that run already
        // did all the work.
        while task_state.run_available() {
            let sink_table_name = self
                .worker_context
                .task_to_sink
                .get(task_id)
                .expect("every running task must be mapped to a sink table");
            let sink_recv = self
                .worker_context
                .sink_receiver
                .get_mut(sink_table_name)
                .expect("every sink table must have a registered receiver");
            // `take_sink_request_per_table` takes the whole buffer map, so the entry
            // for this table may be gone; create it on demand instead of panicking.
            let sink_buf = self
                .worker_context
                .sink_buffer
                .entry(sink_table_name.clone())
                .or_default();
            // TODO(discord9): handle lagging error
            while let Ok(row) = sink_recv.1.try_recv() {
                sink_buf.push_back(row);
            }
        }
    }
}
/// Empty the sink buffers and return their content, one `(table name, rows)` pair per
/// buffered sink table. The result is meant to be turned into write requests for the
/// frontend.
///
/// The buffer map itself is replaced by an empty one.
pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
    let drained = std::mem::take(&mut self.worker_context.sink_buffer);
    let mut requests = Vec::with_capacity(drained.len());
    for (table_name, buffered_rows) in drained {
        requests.push((table_name, Vec::from(buffered_rows)));
    }
    requests
}
/// Forward the rows of a write request to the source sender of the written table.
///
/// The target table is the one `region_id` belongs to; any error from
/// `FlowWorkerContext::send` is returned unchanged.
pub async fn handle_write_request(
    &mut self,
    region_id: RegionId,
    rows: Vec<DiffRow>,
) -> Result<(), Error> {
    self.worker_context.send(region_id.table_id(), rows)
}
/// Return task id if a new task is created, otherwise return None
///
/// steps to create task:
@@ -125,7 +233,7 @@ impl<'s> FlowNodeManager<'s> {
&mut self,
task_id: TaskId,
sink_table_id: TableId,
source_table_ids: SmallVec<[TableId; 2]>,
source_table_ids: &[TableId],
create_if_not_exist: bool,
expire_when: Option<String>,
comment: Option<String>,
@@ -138,6 +246,15 @@ impl<'s> FlowNodeManager<'s> {
return Ok(None);
}
}
// assign global id to source and sink table
for source in source_table_ids
.iter()
.chain(std::iter::once(&sink_table_id))
{
self.worker_context
.assign_global_id_to_table(&self.srv_map, *source)
.await;
}
// construct a active dataflow state with it
let flow_plan =
@@ -145,8 +262,7 @@ impl<'s> FlowNodeManager<'s> {
// TODO(discord9): parse `expire_when`
let _sink_gid =
self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
Ok(Some(task_id))
}
@@ -158,21 +274,30 @@ impl<'s> FlowNodeManager<'s> {
task_id: TaskId,
plan: TypedPlan,
sink_table_id: TableId,
source_table_ids: SmallVec<[TableId; 2]>,
) -> Result<GlobalId, Error> {
source_table_ids: &[TableId],
) -> Result<(), Error> {
let mut cur_task_state = ActiveDataflowState::<'s>::default();
// 1. render sources
let source_global_ids = source_table_ids
.iter()
.map(|id| self.worker_context.assign_global_id_to_table(*id))
.collect::<Vec<_>>();
let sink_global_id = self.worker_context.assign_global_id_to_table(sink_table_id);
{
let sink_global_id = self
.worker_context
.table_repr
.get_by_table_id(&sink_table_id)
.with_context(|| TableNotFoundSnafu {
name: sink_table_id.to_string(),
})?
.1;
let mut ctx = cur_task_state.new_ctx(sink_global_id);
// rendering source now that we have the context
for source in source_global_ids {
for source in source_table_ids {
let source = self
.worker_context
.table_repr
.get_by_table_id(source)
.with_context(|| TableNotFoundSnafu {
name: source.to_string(),
})?
.1;
let source_sender = self.worker_context.get_source_by_global_id(&source)?;
let recv = source_sender.subscribe();
let bundle = ctx.render_source(recv)?;
@@ -184,15 +309,18 @@ impl<'s> FlowNodeManager<'s> {
ctx.render_sink(rendered_dataflow, sink_sender);
}
// what is wrong with lifetime? ctx is short live than cur_task_state
self.task_states.insert(task_id, cur_task_state);
Ok(sink_global_id)
Ok(())
}
}
/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlowWorkerContext {
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<TaskId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub task_to_sink: BTreeMap<TaskId, TableName>,
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
@@ -200,27 +328,89 @@ pub struct FlowWorkerContext {
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver:
BTreeMap<TableName, (broadcast::Sender<DiffRow>, broadcast::Receiver<DiffRow>)>,
/// `id` refer to any source table in the dataflow, and `name` is the name of the table
/// which is a `Vec<String>` in substrait
pub id_to_name: HashMap<GlobalId, Vec<String>>,
/// see `id_to_name`
pub name_to_id: HashMap<Vec<String>, GlobalId>,
/// store sink buffer for each sink table, used for sending data back to the frontend
pub sink_buffer: BTreeMap<TableName, VecDeque<DiffRow>>,
/// the schema of the table
pub schema: HashMap<GlobalId, RelationType>,
/// All the tables that have been registered in the worker
pub table_repr: TriMap,
}
impl FlowWorkerContext {
    /// Push `rows`, one by one and in order, into the broadcast sender registered for
    /// the source table `table_id`.
    ///
    /// # Errors
    ///
    /// - `TableNotFound` if no source sender is registered for `table_id`.
    /// - An `Eval` error wrapping an internal error as soon as one row fails to be
    ///   sent; rows after the failing one are not sent.
    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<(), Error> {
        let Some(sender) = self.source_sender.get(&table_id) else {
            return TableNotFoundSnafu {
                name: table_id.to_string(),
            }
            .fail();
        };

        rows.into_iter().try_for_each(|row| {
            sender
                .send(row)
                .map(|_receiver_count| ())
                .map_err(|err| {
                    InternalSnafu {
                        reason: format!(
                            "Failed to send row to table_id = {:?}, error = {:?}",
                            table_id, err
                        ),
                    }
                    .build()
                })
                .with_context(|_| EvalSnafu)
        })
    }
}
impl FlowWorkerContext {
/// Record in the worker context which source tables feed `task_id` and which sink
/// table it writes to.
///
/// Also makes sure the corresponding broadcast channels exist: a source sender for
/// every source table and a sink receiver for the sink table.
///
/// # Panics
///
/// Panics if `sink_table_id` is not present in `table_repr`.
fn register_task_src_sink(
    &mut self,
    task_id: TaskId,
    source_table_ids: &[TableId],
    sink_table_id: TableId,
) {
    for source_table_id in source_table_ids {
        self.add_source_sender(*source_table_id);
        self.source_to_tasks
            .entry(*source_table_id)
            .or_default()
            .insert(task_id);
    }

    let sink_table_name = self
        .table_repr
        .get_by_table_id(&sink_table_id)
        .expect("sink table must be present in `table_repr` before its task is registered")
        .0;
    // Without this the sink table has no channel, and looking up its sender or
    // receiver by name fails.
    self.add_sink_receiver(sink_table_name.clone());
    self.task_to_sink.insert(task_id, sink_table_name);
}
pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.insert(table_id, broadcast::channel(BOARDCAST_CAP).0);
.entry(table_id)
.or_insert_with(|| broadcast::channel(BOARDCAST_CAP).0);
}
/// Create the broadcast channel of the sink table `table_name` unless one is already
/// registered; an existing channel is left untouched.
pub fn add_sink_receiver(&mut self, table_name: TableName) {
    if let std::collections::btree_map::Entry::Vacant(slot) =
        self.sink_receiver.entry(table_name)
    {
        slot.insert(broadcast::channel(BOARDCAST_CAP));
    }
}
pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
let table_id = self.get_table_name_id(id)?.1;
let table_id = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.1;
self.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
@@ -232,7 +422,13 @@ impl FlowWorkerContext {
&self,
id: &GlobalId,
) -> Result<broadcast::Sender<DiffRow>, Error> {
let table_name = self.get_table_name_id(id)?.0;
let table_name = self
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.0;
self.sink_receiver
.get(&table_name)
.map(|(s, r)| s.clone())
@@ -248,9 +444,9 @@ impl FlowWorkerContext {
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
let id = self
.name_to_id
.get(name)
.copied()
.table_repr
.get_by_name(name)
.map(|(_tid, gid)| gid)
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
@@ -264,25 +460,87 @@ impl FlowWorkerContext {
Ok((id, schema))
}
pub fn table_from_table_id(&self, id: &GlobalId) -> Result<(GlobalId, RelationType), Error> {
todo!()
}
/// Assign a global id to a table, if already assigned, return the existing global id
///
/// NOTE: this will not actually render the table into collection refered as GlobalId
/// merely creating a mapping from table id to global id
pub fn assign_global_id_to_table(&self, table_id: TableId) -> GlobalId {
todo!()
}
/// get table name by global id
pub fn get_table_name_id(&self, id: &GlobalId) -> Result<(TableName, TableId), Error> {
todo!()
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableIdNameMapper,
table_id: TableId,
) -> GlobalId {
if let Some((_name, gid)) = self.table_repr.get_by_table_id(&table_id) {
gid
} else {
let global_id = self.new_global_id();
let table_name = srv_map.get_table_name(&table_id).await.unwrap();
self.table_repr.insert(table_name, table_id, global_id);
global_id
}
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
todo!()
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
}
}
/// A three-way lookup table between a table's name, its table id and its global id.
///
/// The global id is the pivot: name and table id each map to a global id, and the
/// global id maps back to the `(name, table id)` pair.
#[derive(Default)]
pub struct TriMap {
    /// table name -> global id
    name_to_global_id: HashMap<TableName, GlobalId>,
    /// table id -> global id
    id_to_global_id: HashMap<TableId, GlobalId>,
    /// global id -> (table name, table id)
    global_id_to_name_id: BTreeMap<GlobalId, (TableName, TableId)>,
}

impl TriMap {
    /// Create an empty map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Register `name`, `id` and `global_id` as referring to the same table.
    ///
    /// An existing entry under any of the three keys is overwritten in the map that
    /// is keyed by it.
    pub fn insert(&mut self, name: TableName, id: TableId, global_id: GlobalId) {
        self.id_to_global_id.insert(id, global_id);
        self.name_to_global_id.insert(name.clone(), global_id);
        self.global_id_to_name_id.insert(global_id, (name, id));
    }

    /// Look up the table id and global id registered for a table name.
    pub fn get_by_name(&self, name: &TableName) -> Option<(TableId, GlobalId)> {
        let global_id = *self.name_to_global_id.get(name)?;
        let (_name, table_id) = self.global_id_to_name_id.get(&global_id)?;
        Some((*table_id, global_id))
    }

    /// Look up the table name and global id registered for a table id.
    pub fn get_by_table_id(&self, id: &TableId) -> Option<(TableName, GlobalId)> {
        let global_id = *self.id_to_global_id.get(id)?;
        let (table_name, _id) = self.global_id_to_name_id.get(&global_id)?;
        Some((table_name.clone(), global_id))
    }

    /// Look up the table name and table id registered for a global id.
    pub fn get_by_global_id(&self, global_id: &GlobalId) -> Option<(TableName, TableId)> {
        let (table_name, table_id) = self.global_id_to_name_id.get(global_id)?;
        Some((table_name.clone(), *table_id))
    }
}
/// Source of the timestamps used to drive the flow.
pub struct FlowTickManager {
    /// anchor used to convert a `minstant` instant into unix time
    anchor: Anchor,
}

impl FlowTickManager {
    /// Create a tick manager with a freshly created anchor.
    pub fn new() -> Self {
        Self {
            anchor: Anchor::new(),
        }
    }

    /// Return the current timestamp in milliseconds
    pub fn tick(&self) -> repr::Timestamp {
        let unix_nanos = minstant::Instant::now().as_unix_nanos(&self.anchor);
        // nanoseconds -> milliseconds
        let unix_millis = unix_nanos / 1_000_000;
        unix_millis as repr::Timestamp
    }
}

View File

@@ -42,6 +42,13 @@ pub enum Error {
#[snafu(display("Table not found: {name}"))]
TableNotFound { name: String, location: Location },
#[snafu(display("Table not found: {msg}, meta error: {source}"))]
TableNotFoundMeta {
source: common_meta::error::Error,
msg: String,
location: Location,
},
#[snafu(display("Table already exist: {name}"))]
TableAlreadyExist { name: String, location: Location },
@@ -110,7 +117,9 @@ impl ErrorExt for Error {
StatusCode::Internal
}
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } => StatusCode::TableNotFound,
Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
StatusCode::TableNotFound
}
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }

View File

@@ -25,7 +25,7 @@ use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::repr::{DiffRow, Row};
use crate::repr::{DiffRow, Row, BOARDCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
@@ -84,14 +84,36 @@ impl<'referred, 'df> Context<'referred, 'df> {
arranged: _,
} = bundle;
let mut buf = VecDeque::with_capacity(1000);
self.df
let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();
let sink = self
.df
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
let data = recv.take_inner();
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
while let Some(row) = buf.pop_front() {
// TODO(discord9): handling tokio broadcast error
let _ = sender.send(row);
if sender.len() >= BOARDCAST_CAP {
return;
} else {
while let Some(row) = buf.pop_front() {
// if the sender is full, stop sending
if sender.len() >= BOARDCAST_CAP {
break;
}
// TODO(discord9): handling tokio broadcast error
let _ = sender.send(row);
}
}
// if buffer is not empty, schedule the next run at next tick
// so the buffer can be drained as soon as possible
if !buf.is_empty() {
inner_schd.schedule_at(*now.borrow() + 1);
}
});
schd.set_cur_subgraph(sink);
}
}

View File

@@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
/// Row with key-value pair, timestamp and diff
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// broadcast channel capacity, set to an arbitrary value
pub const BOARDCAST_CAP: usize = 1024;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`

View File

@@ -137,17 +137,18 @@ mod test {
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
use crate::adapter::TriMap;
use crate::repr::ColumnType;
pub fn create_test_ctx() -> FlowWorkerContext {
let gid = GlobalId::User(0);
let name = vec!["numbers".to_string()];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
let mut tri_map = TriMap::new();
tri_map.insert(name.clone(), 0, gid);
FlowWorkerContext {
id_to_name: HashMap::from([(gid, name.clone())]),
name_to_id: HashMap::from([(name.clone(), gid)]),
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
..Default::default()
}
}