From a17a7f4e47bfe1b7159f687765da41fe819b3bd2 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 8 May 2024 17:43:08 +0800 Subject: [PATCH] feat: working poc demo...ish --- src/flow/src/adapter.rs | 85 +++++++++++++++++++-------- src/flow/src/compute/render/map.rs | 2 + src/flow/src/compute/render/reduce.rs | 15 ++++- src/flow/src/repr.rs | 5 ++ src/frontend/src/instance.rs | 1 + 5 files changed, 80 insertions(+), 28 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index afc204e39e..13e6135818 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -31,6 +31,7 @@ use common_meta::kv_backend::KvBackendRef; use common_runtime::JoinHandle; use common_telemetry::info; use datatypes::schema::ColumnSchema; +use datatypes::value::Value; use greptime_proto::v1; use hydroflow::scheduled::graph::Hydroflow; use itertools::Itertools; @@ -41,7 +42,7 @@ use serde::{Deserialize, Serialize}; use session::context::QueryContext; use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; +use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; use tokio::task::LocalSet; @@ -202,7 +203,7 @@ impl FlownodeManager { // TODO(discord9): only run when new inputs arrive or scheduled to self.run_available().await; // TODO(discord9): error handling - let _ = self.send_writeback_requests().await; + self.send_writeback_requests().await.unwrap(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } @@ -297,14 +298,17 @@ impl TableInfoSource { .map(|id| id.unwrap().table_id()) } - pub async fn get_table_id_from_name(&self, name: &TableName) -> Result { - self.table_name_manager + /// If the table havn't been created in database, the tableId returned would be null + pub async fn get_table_id_from_name(&self, name: &TableName) -> Result, Error> { + let ret = self + .table_name_manager .get(TableNameKey::new(&name[0], &name[1], &name[2])) .await .with_context(|_| TableNotFoundMetaSnafu { msg: format!("Table name = {:?}, couldn't found table id", name), - }) - .map(|id| id.unwrap().table_id()) + })? + .map(|id| id.table_id()); + Ok(ret) } /// query metasrv about the table name and table id @@ -378,26 +382,26 @@ impl TableInfoSource { #[derive(Debug)] pub enum DiffRequest { - Insert(Vec), - Delete(Vec), + Insert(Vec<(Row, repr::Timestamp)>), + Delete(Vec<(Row, repr::Timestamp)>), } /// iterate through the diff row and from from continuous diff row with same diff type pub fn diff_row_to_request(rows: Vec) -> Vec { let mut reqs = Vec::new(); - for (row, _t, diff) in rows { + for (row, ts, diff) in rows { let last = reqs.last_mut(); match (last, diff) { (Some(DiffRequest::Insert(rows)), 1) => { - rows.push(row); + rows.push((row, ts)); } - (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![row])), + (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])), (Some(DiffRequest::Delete(rows)), -1) => { - rows.push(row); + rows.push((row, ts)); } - (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![row])), - (None, 1) => reqs.push(DiffRequest::Insert(vec![row])), - (None, -1) => reqs.push(DiffRequest::Delete(vec![row])), + (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])), + (None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])), + (None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])), _ => {} } } @@ -468,7 +472,7 @@ impl FlownodeManager { */ let primary_keys = vec![]; - let schema = { + let (schema_wout_ts, with_ts) = { let node_ctx = self.node_context.lock().await; let gid: GlobalId = node_ctx .table_repr @@ -483,32 +487,53 @@ impl FlownodeManager { })? .clone(); - schema + let ts_col = ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + + let wout_ts = schema .column_types .into_iter() .enumerate() .map(|(idx, typ)| { ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) }) - .collect_vec() + .collect_vec(); + let mut with_ts = wout_ts.clone(); + with_ts.push(ts_col); + (wout_ts, with_ts) }; - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + let proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?; + + let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?; + info!( "Sending {} writeback requests to table {}", reqs.len(), table_name.join(".") ); + for req in reqs { match req { DiffRequest::Insert(insert) => { - let rows_proto: Vec = - insert.into_iter().map(|row| row.into()).collect::>(); + let rows_proto: Vec = insert + .into_iter() + .map(|(mut row, _ts)| { + row.extend(Some(Value::from( + common_time::Timestamp::new_millisecond(0), + ))); + row.into() + }) + .collect::>(); let table_name = table_name.last().unwrap().clone(); let req = RowInsertRequest { table_name, rows: Some(v1::Rows { - schema: proto_schema.clone(), + schema: proto_schema_with_ts.clone(), rows: rows_proto, }), }; @@ -526,13 +551,21 @@ impl FlownodeManager { .with_context(|_| ExternalSnafu {})?; } DiffRequest::Delete(remove) => { - let rows_proto: Vec = - remove.into_iter().map(|row| row.into()).collect::>(); + info!("original remove rows={:?}", remove); + let rows_proto: Vec = remove + .into_iter() + .map(|(mut row, _ts)| { + row.extend(Some(Value::from( + common_time::Timestamp::new_millisecond(0), + ))); + row.into() + }) + .collect::>(); let table_name = table_name.last().unwrap().clone(); let req = RowDeleteRequest { table_name, rows: Some(v1::Rows { - schema: proto_schema.clone(), + schema: proto_schema_with_ts.clone(), rows: rows_proto, }), }; @@ -632,7 +665,7 @@ impl FlownodeManager { node_ctx.query_context = Some(QueryContext::with("greptime", "public")); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?; - + info!("Flow Plan is {:?}", flow_plan); node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; // TODO(discord9): parse `expire_when` diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 4264693391..8be2b570ed 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -145,6 +145,8 @@ fn mfp_subgraph( /// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator, /// return the output updates **And** possibly any number of errors that occurred during the evaluation +/// +/// TODO(discord9): deal with primary key overwrite issue fn eval_mfp_core( input: impl IntoIterator, mfp_plan: &MfpPlan, diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index a024021ada..f0003fe258 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::ops::Range; +use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::value::{ListValue, Value}; use hydroflow::scheduled::graph_ext::GraphExt; @@ -378,7 +379,7 @@ fn reduce_accum_subgraph( let mut all_updates = Vec::with_capacity(key_to_vals.len()); let mut all_outputs = Vec::with_capacity(key_to_vals.len()); - + let null_vals = (0..full_aggrs.len()).map(|_| Value::Null); // lock the arrange for write for the rest of function body // so to prevent wide race condition since we are going to update the arrangement by write after read // TODO(discord9): consider key-based lock @@ -394,7 +395,17 @@ fn reduce_accum_subgraph( None => continue, } }; - let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); + let (accums, old_ts, _) = arrange.get(now, &key).unwrap_or_default(); + if !accums.is_empty() { + info!("Get old_ts as: {old_ts}"); + // first explictly remove old key(because arrange use pk as only pk, but + // database use pk+ts as true pk for dedup, so we need to manualy remove old values first) + let mut del_key_val = key.clone(); + // adding null as values so the output length are correct + del_key_val.extend(null_vals.clone()); + + all_outputs.push((del_key_val, now, -1)); + } let accums = accums.inner; // deser accums from offsets diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 94f2c15497..1269b1f076 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -107,6 +107,11 @@ impl Row { Self { inner: vec![] } } + /// Returns true if the Row contains no elements. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + /// Create a row from a vector of values pub fn new(row: Vec) -> Self { Self { inner: row } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 096c3de247..4cd23e64f7 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -242,6 +242,7 @@ impl FrontendInvoker for Instance { requests: RowInsertRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { + info!("Received row inserts: {:?}", requests); self.inserter .handle_row_inserts(requests, ctx, &self.statement_executor) .await