feat: working poc demo...ish
@@ -31,6 +31,7 @@ use common_meta::kv_backend::KvBackendRef;
 use common_runtime::JoinHandle;
 use common_telemetry::info;
 use datatypes::schema::ColumnSchema;
+use datatypes::value::Value;
 use greptime_proto::v1;
 use hydroflow::scheduled::graph::Hydroflow;
 use itertools::Itertools;
@@ -41,7 +42,7 @@ use serde::{Deserialize, Serialize};
 use session::context::QueryContext;
 use smallvec::SmallVec;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::RegionId;
+use store_api::storage::{ConcreteDataType, RegionId};
 use table::metadata::TableId;
 use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
 use tokio::task::LocalSet;
@@ -202,7 +203,7 @@ impl FlownodeManager {
             // TODO(discord9): only run when new inputs arrive or scheduled to
             self.run_available().await;
             // TODO(discord9): error handling
-            let _ = self.send_writeback_requests().await;
+            self.send_writeback_requests().await.unwrap();
             tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         }
     }
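The PoC loop above unwraps the writeback result and sleeps a fixed second between ticks; the error-handling TODO is still open. Below is a minimal sketch of the same loop shape with the error logged instead of unwrapped; the free function here is a stand-in, not the real FlownodeManager API.

```rust
use std::time::Duration;

// Stand-in for FlownodeManager::send_writeback_requests; for illustration only.
async fn send_writeback_requests() -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // Fixed 1-second tick, as in the PoC loop; errors are logged rather than unwrapped.
    // Bounded to three iterations so the example terminates.
    for _ in 0..3 {
        if let Err(e) = send_writeback_requests().await {
            eprintln!("writeback failed: {e}");
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```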
@@ -297,14 +298,17 @@ impl TableInfoSource {
             .map(|id| id.unwrap().table_id())
     }
 
-    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
-        self.table_name_manager
+    /// If the table hasn't been created in the database yet, the returned table id will be `None`
+    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
+        let ret = self
+            .table_name_manager
             .get(TableNameKey::new(&name[0], &name[1], &name[2]))
             .await
             .with_context(|_| TableNotFoundMetaSnafu {
                 msg: format!("Table name = {:?}, couldn't found table id", name),
-            })
-            .map(|id| id.unwrap().table_id())
+            })?
+            .map(|id| id.table_id());
+        Ok(ret)
     }
 
     /// query metasrv about the table name and table id
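The signature change above turns a missing sink table from an error into a normal `Ok(None)` case. A self-contained sketch of that contract, with a plain `u32` and an in-code lookup as stand-ins for `TableId` and the metasrv-backed `TableNameManager`:

```rust
// The lookup yields Ok(None) when the table has not been created yet,
// instead of failing with a "table not found" error.
fn get_table_id_from_name(name: &str) -> Result<Option<u32>, String> {
    match name {
        "greptime.public.numbers" => Ok(Some(1)),
        _ => Ok(None),
    }
}

fn main() {
    // A missing sink table is now a normal case the caller can branch on, not an error.
    match get_table_id_from_name("greptime.public.not_yet_created") {
        Ok(Some(id)) => println!("table id = {id}"),
        Ok(None) => println!("table not created yet"),
        Err(e) => println!("lookup failed: {e}"),
    }
}
```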
@@ -378,26 +382,26 @@ impl TableInfoSource {
 
 #[derive(Debug)]
 pub enum DiffRequest {
-    Insert(Vec<Row>),
-    Delete(Vec<Row>),
+    Insert(Vec<(Row, repr::Timestamp)>),
+    Delete(Vec<(Row, repr::Timestamp)>),
 }
 
 /// iterate through the diff rows and form requests from consecutive diff rows with the same diff type
 pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
     let mut reqs = Vec::new();
-    for (row, _t, diff) in rows {
+    for (row, ts, diff) in rows {
         let last = reqs.last_mut();
         match (last, diff) {
             (Some(DiffRequest::Insert(rows)), 1) => {
-                rows.push(row);
+                rows.push((row, ts));
             }
-            (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![row])),
+            (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
             (Some(DiffRequest::Delete(rows)), -1) => {
-                rows.push(row);
+                rows.push((row, ts));
             }
-            (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![row])),
-            (None, 1) => reqs.push(DiffRequest::Insert(vec![row])),
-            (None, -1) => reqs.push(DiffRequest::Delete(vec![row])),
+            (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
+            (None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
+            (None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
             _ => {}
         }
     }
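`diff_row_to_request` batches consecutive rows with the same diff sign into one request, now keeping each row's timestamp alongside it. A self-contained sketch of that grouping, with simplified stand-ins for `Row`, `repr::Timestamp`, and `DiffRow` (the real types live in the flow crate):

```rust
type Row = Vec<i64>;
type Timestamp = i64;
type DiffRow = (Row, Timestamp, i64); // (row, ts, diff); diff is +1 insert / -1 delete

#[derive(Debug)]
enum DiffRequest {
    Insert(Vec<(Row, Timestamp)>),
    Delete(Vec<(Row, Timestamp)>),
}

fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
    let mut reqs: Vec<DiffRequest> = Vec::new();
    for (row, ts, diff) in rows {
        match (reqs.last_mut(), diff) {
            // same kind as the current run: extend it
            (Some(DiffRequest::Insert(batch)), 1) => batch.push((row, ts)),
            (Some(DiffRequest::Delete(batch)), -1) => batch.push((row, ts)),
            // sign changed (or first row): start a new run
            (_, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
            (_, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
            _ => {} // ignore diffs other than +1/-1
        }
    }
    reqs
}

fn main() {
    let rows = vec![
        (vec![1], 0, 1),
        (vec![2], 0, 1),
        (vec![1], 1, -1),
        (vec![3], 1, 1),
    ];
    // prints: [Insert([([1], 0), ([2], 0)]), Delete([([1], 1)]), Insert([([3], 1)])]
    println!("{:?}", diff_row_to_request(rows));
}
```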
@@ -468,7 +472,7 @@ impl FlownodeManager {
         */
         let primary_keys = vec![];
 
-        let schema = {
+        let (schema_wout_ts, with_ts) = {
             let node_ctx = self.node_context.lock().await;
             let gid: GlobalId = node_ctx
                 .table_repr
@@ -483,32 +487,53 @@ impl FlownodeManager {
                 })?
                 .clone();
 
-            schema
+            let ts_col = ColumnSchema::new(
+                "ts",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                true,
+            )
+            .with_time_index(true);
+
+            let wout_ts = schema
                 .column_types
                 .into_iter()
                 .enumerate()
                 .map(|(idx, typ)| {
                     ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
                 })
-                .collect_vec()
+                .collect_vec();
+            let mut with_ts = wout_ts.clone();
+            with_ts.push(ts_col);
+            (wout_ts, with_ts)
         };
 
-        let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
+        let proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?;
 
+        let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?;
 
         info!(
             "Sending {} writeback requests to table {}",
             reqs.len(),
             table_name.join(".")
         );
 
         for req in reqs {
             match req {
                 DiffRequest::Insert(insert) => {
-                    let rows_proto: Vec<v1::Row> =
-                        insert.into_iter().map(|row| row.into()).collect::<Vec<_>>();
+                    let rows_proto: Vec<v1::Row> = insert
+                        .into_iter()
+                        .map(|(mut row, _ts)| {
+                            row.extend(Some(Value::from(
+                                common_time::Timestamp::new_millisecond(0),
+                            )));
+                            row.into()
+                        })
+                        .collect::<Vec<_>>();
                     let table_name = table_name.last().unwrap().clone();
                     let req = RowInsertRequest {
                         table_name,
                         rows: Some(v1::Rows {
-                            schema: proto_schema.clone(),
+                            schema: proto_schema_with_ts.clone(),
                             rows: rows_proto,
                         }),
                     };
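The insert path above widens every output row by one time-index column before converting it to a proto row, currently filling that column with a placeholder timestamp of 0 ms so the rows match `proto_schema_with_ts`. A simplified, self-contained sketch of that row shaping; the `Value` enum here is a stand-in for the real datatypes `Value`:

```rust
#[derive(Debug, Clone)]
enum Value {
    Int(i64),
    TimestampMillis(i64),
}

fn with_placeholder_ts(mut row: Vec<Value>) -> Vec<Value> {
    // mirrors `row.extend(Some(Value::from(Timestamp::new_millisecond(0))))`:
    // append the extra time-index column expected by the writeback schema
    row.push(Value::TimestampMillis(0));
    row
}

fn main() {
    let rows = vec![vec![Value::Int(1)], vec![Value::Int(2)]];
    let shaped: Vec<Vec<Value>> = rows.into_iter().map(with_placeholder_ts).collect();
    // every row now carries N + 1 columns
    println!("{shaped:?}");
}
```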
@@ -526,13 +551,21 @@ impl FlownodeManager {
                         .with_context(|_| ExternalSnafu {})?;
                 }
                 DiffRequest::Delete(remove) => {
-                    let rows_proto: Vec<v1::Row> =
-                        remove.into_iter().map(|row| row.into()).collect::<Vec<_>>();
+                    info!("original remove rows={:?}", remove);
+                    let rows_proto: Vec<v1::Row> = remove
+                        .into_iter()
+                        .map(|(mut row, _ts)| {
+                            row.extend(Some(Value::from(
+                                common_time::Timestamp::new_millisecond(0),
+                            )));
+                            row.into()
+                        })
+                        .collect::<Vec<_>>();
                     let table_name = table_name.last().unwrap().clone();
                     let req = RowDeleteRequest {
                         table_name,
                         rows: Some(v1::Rows {
-                            schema: proto_schema.clone(),
+                            schema: proto_schema_with_ts.clone(),
                             rows: rows_proto,
                         }),
                     };
@@ -632,7 +665,7 @@ impl FlownodeManager {
         node_ctx.query_context = Some(QueryContext::with("greptime", "public"));
         // construct an active dataflow state with it
         let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
-
+        info!("Flow Plan is {:?}", flow_plan);
         node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
 
         // TODO(discord9): parse `expire_when`
@@ -145,6 +145,8 @@ fn mfp_subgraph(
 
 /// The core of evaluating the MFP operator: given an MFP and an input, evaluate the MFP operator and
 /// return the output updates **and** possibly any number of errors that occurred during the evaluation
+///
+/// TODO(discord9): deal with primary key overwrite issue
 fn eval_mfp_core(
     input: impl IntoIterator<Item = DiffRow>,
     mfp_plan: &MfpPlan,
@@ -15,6 +15,7 @@
 use std::collections::BTreeMap;
 use std::ops::Range;
 
+use common_telemetry::info;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::value::{ListValue, Value};
 use hydroflow::scheduled::graph_ext::GraphExt;
@@ -378,7 +379,7 @@ fn reduce_accum_subgraph(
 
     let mut all_updates = Vec::with_capacity(key_to_vals.len());
     let mut all_outputs = Vec::with_capacity(key_to_vals.len());
-
+    let null_vals = (0..full_aggrs.len()).map(|_| Value::Null);
     // lock the arrange for write for the rest of the function body,
     // so as to prevent a wide race condition, since we are going to update the arrangement by writing after reading
    // TODO(discord9): consider key-based lock
@@ -394,7 +395,17 @@ fn reduce_accum_subgraph(
                 None => continue,
             }
         };
-        let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
+        let (accums, old_ts, _) = arrange.get(now, &key).unwrap_or_default();
+        if !accums.is_empty() {
+            info!("Get old_ts as: {old_ts}");
+            // first explicitly remove the old key (the arrangement uses the pk alone as its key, but
+            // the database uses pk+ts as the true primary key for dedup, so we need to manually remove old values first)
+            let mut del_key_val = key.clone();
+            // add nulls as values so the output length is correct
+            del_key_val.extend(null_vals.clone());
+
+            all_outputs.push((del_key_val, now, -1));
+        }
         let accums = accums.inner;
 
         // deser accums from offsets
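The new branch above retracts the previous version of a key before the refreshed aggregate is written back: the arrangement keys rows by pk only, while the sink table dedups on pk+ts, so the old key is emitted once with diff -1 and null-padded values. A self-contained sketch of that retract-then-update shape, with all types simplified to stand-ins:

```rust
type Row = Vec<Option<i64>>; // None plays the role of Value::Null
type Output = (Row, i64 /* ts */, i64 /* diff */);

fn update_key(key: Row, new_vals: Vec<Option<i64>>, had_old: bool, now: i64) -> Vec<Output> {
    let n_aggrs = new_vals.len();
    let mut outputs = Vec::new();
    if had_old {
        // retract the previous version of this key; pad with nulls so the row width matches
        let mut del_key_val = key.clone();
        del_key_val.extend(std::iter::repeat(None).take(n_aggrs));
        outputs.push((del_key_val, now, -1));
    }
    // emit the refreshed aggregate values for the same key
    let mut key_val = key;
    key_val.extend(new_vals);
    outputs.push((key_val, now, 1));
    outputs
}

fn main() {
    // key = [7], one aggregate column, and an older value exists: expect a -1
    // retraction with a null aggregate followed by the +1 insertion.
    let out = update_key(vec![Some(7)], vec![Some(42)], true, 100);
    println!("{out:?}");
}
```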
@@ -107,6 +107,11 @@ impl Row {
         Self { inner: vec![] }
     }
 
+    /// Returns true if the Row contains no elements.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
     /// Create a row from a vector of values
     pub fn new(row: Vec<Value>) -> Self {
         Self { inner: row }
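A tiny self-contained check of the new helper's contract, using a local struct that mirrors the snippet above (values simplified to `i64` instead of the real `Value` type):

```rust
struct Row {
    inner: Vec<i64>,
}

impl Row {
    /// Create a row from a vector of values
    fn new(row: Vec<i64>) -> Self {
        Self { inner: row }
    }

    /// Returns true if the Row contains no elements.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

fn main() {
    assert!(Row::new(vec![]).is_empty());
    assert!(!Row::new(vec![1]).is_empty());
    println!("ok");
}
```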
@@ -242,6 +242,7 @@ impl FrontendInvoker for Instance {
         requests: RowInsertRequests,
         ctx: QueryContextRef,
     ) -> common_frontend::error::Result<Output> {
+        info!("Received row inserts: {:?}", requests);
         self.inserter
             .handle_row_inserts(requests, ctx, &self.statement_executor)
             .await