Compare commits on GreptimeTeam/greptimedb (mirror of https://github.com/GreptimeTeam/greptimedb.git): 13 commits between feat/impl-... and flow_fix_f.
Commits: d815bdf770, 0b324563ac, ffecb6882e, c185242997, 9fda415b0d, 5eaf9816b9, 6684f8dce3, e8660a6f7e, 6659f3cc62, d218d65361, 8f40ba42c1, d1ce436442, e580ba63ec
@@ -26,6 +26,7 @@ use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_frontend::handler::FrontendInvoker;
 use common_meta::key::TableMetadataManagerRef;
+use common_query::prelude::GREPTIME_TIMESTAMP;
 use common_runtime::JoinHandle;
 use common_telemetry::{debug, info};
 use datatypes::schema::ColumnSchema;
@@ -281,7 +282,7 @@ impl FlownodeManager {
             let schema = meta.schema.column_schemas;
             let is_auto_create = schema
                 .last()
-                .map(|s| s.name == "__ts_placeholder")
+                .map(|s| s.name == GREPTIME_TIMESTAMP)
                 .unwrap_or(false);
             (primary_keys, schema, is_auto_create)
         } else {
@@ -302,6 +303,7 @@ impl FlownodeManager {
                 .clone();
             // TODO(discord9): use default key from schema
             let primary_keys = schema
+                .typ()
                 .keys
                 .first()
                 .map(|v| {
@@ -318,18 +320,25 @@ impl FlownodeManager {
            );
            // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
            let ts_col = ColumnSchema::new(
-                "__ts_placeholder",
+                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            )
            .with_time_index(true);
 
            let wout_ts = schema
+                .typ()
                .column_types
+                .clone()
                .into_iter()
                .enumerate()
                .map(|(idx, typ)| {
-                    ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable)
+                    let name = schema
+                        .names
+                        .get(idx)
+                        .cloned()
+                        .unwrap_or(format!("Col_{}", idx));
+                    ColumnSchema::new(name, typ.scalar_type, typ.nullable)
                })
                .collect_vec();
 
@@ -504,26 +513,26 @@ impl FlownodeManager {
     /// However this is not blocking and can sometimes return while actual computation is still running in worker thread
     /// TODO(discord9): add flag for subgraph that have input since last run
     pub async fn run_available(&self) -> Result<(), Error> {
-        let now = self.tick_manager.tick();
-
         loop {
+            let now = self.tick_manager.tick();
             for worker in self.worker_handles.iter() {
                 // TODO(discord9): consider how to handle error in individual worker
                 worker.lock().await.run_available(now).await.unwrap();
             }
             // first check how many inputs were sent
-            let send_cnt = match self.node_context.lock().await.flush_all_sender() {
-                Ok(cnt) => cnt,
+            match self.node_context.lock().await.flush_all_sender() {
+                Ok(_) => (),
                 Err(err) => {
                     common_telemetry::error!("Flush send buf errors: {:?}", err);
                     break;
                 }
            };
-            // if no inputs
-            if send_cnt == 0 {
+            // if no thing in send buf then break
+            let buf_len = self.node_context.lock().await.get_send_buf_size();
+            if buf_len == 0 {
                 break;
             } else {
-                debug!("FlownodeManager::run_available: send_cnt={}", send_cnt);
+                debug!("Send buf len = {}", buf_len);
             }
         }
 
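The behavioral change in run_available: the old loop computed now once and broke as soon as a single flush round reported zero rows sent; the new loop re-ticks on every round and only stops once the send buffers themselves are observed empty. A minimal standalone sketch of that control flow (hypothetical names and types, not the crate's API):

    use std::collections::VecDeque;

    // Sketch: re-tick each round and keep flushing until the buffer is empty,
    // rather than stopping the first time a flush sends zero rows.
    fn drain_until_empty(mut send_buf: VecDeque<u32>) {
        let mut tick: u64 = 0;
        loop {
            tick += 1; // stand-in for tick_manager.tick() inside the loop
            // flush step: a bounded amount of work per round
            let n = send_buf.len().min(2);
            let sent = send_buf.drain(..n).count();
            println!("tick {tick}: sent {sent} rows, {} left", send_buf.len());
            if send_buf.is_empty() {
                break;
            }
        }
    }

    fn main() {
        drain_until_empty(VecDeque::from(vec![10, 20, 30, 40, 50]));
    }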
@@ -543,6 +552,8 @@ impl FlownodeManager {
         );
         let table_id = region_id.table_id();
         self.node_context.lock().await.send(table_id, rows)?;
+        // TODO(discord9): put it in a background task?
+        self.run_available().await?;
         Ok(())
     }
 }
@@ -653,21 +664,20 @@ impl FlownodeManager {
 ///
 /// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
 /// TSO coord mess
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct FlowTickManager {
     start: Instant,
-}
-
-impl std::fmt::Debug for FlowTickManager {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("FlowTickManager").finish()
-    }
+    start_timestamp: repr::Timestamp,
 }
 
 impl FlowTickManager {
     pub fn new() -> Self {
         FlowTickManager {
             start: Instant::now(),
+            start_timestamp: SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as repr::Timestamp,
         }
     }
 
@@ -677,6 +687,6 @@ impl FlowTickManager {
     pub fn tick(&self) -> repr::Timestamp {
         let current = Instant::now();
         let since_the_epoch = current - self.start;
-        since_the_epoch.as_millis() as repr::Timestamp
+        since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp
     }
 }
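FlowTickManager now pairs a monotonic Instant with a wall-clock anchor captured at construction, so tick() yields epoch-based milliseconds that still never go backwards. A minimal standalone sketch of the same idea (illustrative names, not the crate's API):

    use std::time::{Instant, SystemTime, UNIX_EPOCH};

    /// Sketch: a tick source driven by a monotonic clock but reported as
    /// milliseconds since the Unix epoch.
    struct TickSketch {
        start: Instant,
        start_timestamp_ms: i64,
    }

    impl TickSketch {
        fn new() -> Self {
            TickSketch {
                start: Instant::now(),
                start_timestamp_ms: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system clock before Unix epoch")
                    .as_millis() as i64,
            }
        }

        /// Elapsed monotonic milliseconds plus the wall-clock anchor, so
        /// consecutive calls never go backwards even if the system clock does.
        fn tick(&self) -> i64 {
            self.start.elapsed().as_millis() as i64 + self.start_timestamp_ms
        }
    }

    fn main() {
        let t = TickSketch::new();
        let a = t.tick();
        let b = t.tick();
        assert!(b >= a);
        println!("tick: {a}");
    }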
@@ -14,13 +14,17 @@
 
 //! impl `FlowNode` trait for FlowNodeManager so standalone can call them
 
+use std::collections::HashMap;
+
 use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse};
 use api::v1::region::InsertRequests;
 use common_error::ext::BoxedError;
 use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
 use common_meta::node_manager::Flownode;
+use common_telemetry::debug;
 use itertools::Itertools;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
 
 use crate::adapter::FlownodeManager;
 use crate::repr::{self, DiffRow};
@@ -101,12 +105,57 @@ impl Flownode for FlownodeManager {
     async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
         for write_request in request.requests {
             let region_id = write_request.region_id;
-            let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]);
+            let table_id = RegionId::from(region_id).table_id();
+
+            let (insert_schema, rows_proto) = write_request
+                .rows
+                .map(|r| (r.schema, r.rows))
+                .unwrap_or_default();
+
             // TODO(discord9): reconsider time assignment mechanism
             let now = self.tick_manager.tick();
+
+            let fetch_order = {
+                let ctx = self.node_context.lock().await;
+                let table_col_names = ctx
+                    .table_repr
+                    .get_by_table_id(&table_id)
+                    .map(|r| r.1)
+                    .and_then(|id| ctx.schema.get(&id))
+                    .map(|desc| &desc.names)
+                    .context(UnexpectedSnafu {
+                        err_msg: format!("Table not found: {}", table_id),
+                    })?;
+                let name_to_col = HashMap::<_, _>::from_iter(
+                    insert_schema
+                        .iter()
+                        .enumerate()
+                        .map(|(i, name)| (&name.column_name, i)),
+                );
+                let fetch_order: Vec<usize> = table_col_names
+                    .iter()
+                    .map(|names| {
+                        name_to_col.get(names).copied().context(UnexpectedSnafu {
+                            err_msg: format!("Column not found: {}", names),
+                        })
+                    })
+                    .try_collect()?;
+                if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
+                    debug!("Reordering columns: {:?}", fetch_order)
+                }
+                fetch_order
+            };
+
             let rows: Vec<DiffRow> = rows_proto
                 .into_iter()
-                .map(repr::Row::from)
+                .map(|r| {
+                    let r = repr::Row::from(r);
+                    let reordered = fetch_order
+                        .iter()
+                        .map(|&i| r.inner[i].clone())
+                        .collect_vec();
+                    repr::Row::new(reordered)
+                })
                 .map(|r| (r, now, 1))
                 .collect_vec();
             self.handle_write_request(region_id.into(), rows)
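The core of this hunk: an insert request may list columns in a different order than the flow's registered schema, so the handler builds a name-to-index map from the insert schema and permutes every row into schema order. A self-contained sketch of that permutation using plain std types (illustrative names only):

    use std::collections::HashMap;

    /// Reorder `row` (given in `insert_order`) into `table_order`.
    /// Returns None if a table column is missing from the insert schema.
    fn reorder_row(
        table_order: &[&str],
        insert_order: &[&str],
        row: &[i64],
    ) -> Option<Vec<i64>> {
        // name -> position in the incoming row
        let name_to_idx: HashMap<&str, usize> = insert_order
            .iter()
            .enumerate()
            .map(|(i, name)| (*name, i))
            .collect();
        table_order
            .iter()
            .map(|name| name_to_idx.get(name).map(|&i| row[i]))
            .collect()
    }

    fn main() {
        let table = ["ts", "host", "cpu"];
        let insert = ["host", "cpu", "ts"];
        let row = [7, 42, 1000];
        assert_eq!(reorder_row(&table, &insert, &row), Some(vec![1000, 7, 42]));
    }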
@@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::adapter::{FlowId, TableName, TableSource};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
-use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
+use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP};
 
 /// A context that holds the information of the dataflow
 #[derive(Default, Debug)]
@@ -51,10 +51,8 @@ pub struct FlownodeContext {
             mpsc::UnboundedReceiver<DiffRow>,
         ),
     >,
-    /// store source in buffer for each source table, in case broadcast channel is full
-    pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
     /// the schema of the table, query from metasrv or inferred from TypedPlan
-    pub schema: HashMap<GlobalId, RelationType>,
+    pub schema: HashMap<GlobalId, RelationDesc>,
     /// All the tables that have been registered in the worker
     pub table_repr: IdToNameMap,
     pub query_context: Option<Arc<QueryContext>>,
@@ -109,6 +107,7 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
+            debug!("Send buf len = {}", self.send_buf.len());
         }
 
         Ok(row_cnt)
@@ -140,12 +139,19 @@ impl FlownodeContext {
     }
 
     /// flush all sender's buf
+    ///
+    /// return numbers being sent
     pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
         self.source_sender
             .iter_mut()
             .map(|(_table_id, src_sender)| src_sender.try_send_all())
             .try_fold(0, |acc, x| x.map(|x| x + acc))
     }
+
+    /// Return the sum number of rows in all send buf
+    pub fn get_send_buf_size(&self) -> usize {
+        self.source_sender.values().map(|v| v.send_buf.len()).sum()
+    }
 }
 
 impl FlownodeContext {
@@ -226,7 +232,7 @@ impl FlownodeContext {
     /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
     ///
     /// Returns an error if no table has been registered with the provided names
-    pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
+    pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
         let id = self
             .table_repr
             .get_by_name(name)
@@ -297,7 +303,7 @@ impl FlownodeContext {
             .get_by_name(table_name)
             .map(|(_, gid)| gid)
             .unwrap();
-        self.schema.insert(gid, schema);
+        self.schema.insert(gid, schema.into_unnamed());
         Ok(())
     }
 
@@ -17,7 +17,6 @@
 use common_error::ext::BoxedError;
 use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
 use common_meta::key::table_name::{TableNameKey, TableNameManager};
-use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 
@@ -25,7 +24,7 @@ use crate::adapter::error::{
     Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
 };
 use crate::adapter::TableName;
-use crate::repr::{self, ColumnType, RelationType};
+use crate::repr::{self, ColumnType, RelationDesc, RelationType};
 
 /// mapping of table name <-> table id should be query from tableinfo manager
 pub struct TableSource {
@@ -107,7 +106,7 @@ impl TableSource {
     pub async fn get_table_name_schema(
         &self,
         table_id: &TableId,
-    ) -> Result<(TableName, RelationType), Error> {
+    ) -> Result<(TableName, RelationDesc), Error> {
         let table_info_value = self
             .get_table_info_value(table_id)
             .await?
@@ -123,14 +122,20 @@ impl TableSource {
         ];
 
         let raw_schema = table_info_value.table_info.meta.schema;
-        let column_types = raw_schema
+        let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
             .column_schemas
+            .clone()
             .into_iter()
-            .map(|col| ColumnType {
-                nullable: col.is_nullable(),
-                scalar_type: col.data_type,
+            .map(|col| {
+                (
+                    ColumnType {
+                        nullable: col.is_nullable(),
+                        scalar_type: col.data_type,
+                    },
+                    col.name,
+                )
             })
-            .collect_vec();
+            .unzip();
 
         let key = table_info_value.table_info.meta.primary_key_indices;
         let keys = vec![repr::Key::from(key)];
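The conversion now maps each column schema to a (ColumnType, name) pair and relies on Iterator::unzip to collect the pairs into two parallel vectors in one pass. A small standalone illustration of unzip:

    fn main() {
        // (type-ish tag, column name) pairs, standing in for the real ColumnSchema fields
        let cols = vec![("u32", "number"), ("datetime", "ts")];

        // unzip() turns an iterator of pairs into two collections at once
        let (types, names): (Vec<_>, Vec<_>) = cols.into_iter().unzip();

        assert_eq!(types, vec!["u32", "datetime"]);
        assert_eq!(names, vec!["number", "ts"]);
        println!("{types:?} {names:?}");
    }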
@@ -138,10 +143,13 @@ impl TableSource {
         let time_index = raw_schema.timestamp_index;
         Ok((
             table_name,
-            RelationType {
-                column_types,
-                keys,
-                time_index,
+            RelationDesc {
+                typ: RelationType {
+                    column_types,
+                    keys,
+                    time_index,
+                },
+                names: col_names,
             },
         ))
     }
@@ -124,9 +124,13 @@ fn mfp_subgraph(
     // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
     // 2. Output the updates.
     // 3. Truncate all updates within that range.
-    let from = arrange.read().last_compaction_time().map(|n| n + 1);
+    let from = arrange.read().last_compaction_time();
     let from = from.unwrap_or(repr::Timestamp::MIN);
-    let output_kv = arrange.read().get_updates_in_range(from..=now);
+    let range = (
+        std::ops::Bound::Excluded(from),
+        std::ops::Bound::Included(now),
+    );
+    let output_kv = arrange.read().get_updates_in_range(range);
     // the output is expected to be key -> empty val
     let output = output_kv
         .into_iter()
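Instead of bumping the compaction timestamp by one and using an inclusive range, the new code expresses "strictly after the last compaction, up to and including now" directly with std::ops::Bound. A standalone illustration of the same range semantics over a BTreeMap:

    use std::collections::BTreeMap;
    use std::ops::Bound;

    fn main() {
        let mut updates: BTreeMap<i64, &str> = BTreeMap::new();
        updates.insert(5, "a");
        updates.insert(6, "b");
        updates.insert(9, "c");

        let last_compacted = 5;
        let now = 9;

        // Everything strictly after `last_compacted`, up to and including `now`,
        // without the `+ 1` the old inclusive-start range needed.
        let picked: Vec<_> = updates
            .range((Bound::Excluded(last_compacted), Bound::Included(now)))
            .map(|(t, v)| (*t, *v))
            .collect();
        assert_eq!(picked, vec![(6, "b"), (9, "c")]);
        println!("{picked:?}");
    }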
@@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu};
 use crate::compute::render::{Context, SubgraphArg};
 use crate::compute::state::Scheduler;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
-use crate::expr::error::{DataTypeSnafu, InternalSnafu};
+use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
 use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
 use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
 use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
@@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange(
     // Deal with output:
 
     // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
-    let from = arrange.read().last_compaction_time().map(|n| n + 1);
+    let from = arrange.read().last_compaction_time();
     let from = from.unwrap_or(repr::Timestamp::MIN);
-    let output_kv = arrange.read().get_updates_in_range(from..=now);
+    let range = (
+        std::ops::Bound::Excluded(from),
+        std::ops::Bound::Included(now),
+    );
+    let output_kv = arrange.read().get_updates_in_range(range);
 
     // 2. Truncate all updates stored in arrangement within that range.
     let run_compaction = || {
@@ -397,6 +401,24 @@ fn reduce_accum_subgraph(
     // TODO(discord9): consider key-based lock
     let mut arrange = arrange.write();
     for (key, value_diffs) in key_to_vals {
+        if let Some(expire_man) = &arrange.get_expire_state() {
+            let mut is_expired = false;
+            err_collector.run(|| {
+                if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
+                    is_expired = true;
+                    DataAlreadyExpiredSnafu {
+                        expired_by: expired,
+                    }
+                    .fail()
+                } else {
+                    Ok(())
+                }
+            });
+            if is_expired {
+                // errors already collected, we can just continue to next key
+                continue;
+            }
+        }
         let col_diffs = {
             let row_len = value_diffs[0].0.len();
             let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
@@ -55,9 +55,12 @@ impl<'referred, 'df> Context<'referred, 'df> {
             .df
             .add_subgraph_source("source", send_port, move |_ctx, send| {
                 let now = *now.borrow();
-                let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
-                err_collector.run(|| arrange_handler_inner.write().compact_to(now));
+                // write lock to prevent unexpected mutation
+                let mut arranged = arrange_handler_inner.write();
+                let arr = arranged.get_updates_in_range(..=now);
+                err_collector.run(|| arranged.compact_to(now));
 
+                debug!("Call source");
                 let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
                 let mut to_send = Vec::new();
                 let mut to_arrange = Vec::new();
@@ -77,10 +80,10 @@ impl<'referred, 'df> Context<'referred, 'df> {
                         to_arrange.len()
                     );
                 }
-                err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
+                err_collector.run(|| arranged.apply_updates(now, to_arrange));
                 send.give(all);
-                // always schedule source to run at next tick
-                inner_schd.schedule_at(now + 1);
+                // always schedule source to run at now so we can repeatedly run source if needed
+                inner_schd.schedule_at(now);
             });
         schd.set_cur_subgraph(sub);
         let arranged = Arranged::new(arrange_handler);
@@ -100,4 +100,11 @@ pub enum EvalError {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Incoming data already expired by {} ms", expired_by))]
+    DataAlreadyExpired {
+        expired_by: i64,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
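For readers unfamiliar with snafu: each variant gets a generated context selector (here it would be DataAlreadyExpiredSnafu) whose .fail() builds and returns the error, which is how the reduce subgraph raises it elsewhere in this diff. A minimal self-contained sketch of the pattern (simplified: no location field; assumes a recent snafu release):

    use snafu::prelude::*;

    #[derive(Debug, Snafu)]
    enum EvalError {
        #[snafu(display("Incoming data already expired by {} ms", expired_by))]
        DataAlreadyExpired { expired_by: i64 },
    }

    fn check(expired_by: Option<i64>) -> Result<(), EvalError> {
        if let Some(expired_by) = expired_by {
            // the generated selector ends in `Snafu`; `.fail()` returns Err(...)
            return DataAlreadyExpiredSnafu { expired_by }.fail();
        }
        Ok(())
    }

    fn main() {
        assert!(check(None).is_ok());
        let err = check(Some(100)).unwrap_err();
        assert_eq!(err.to_string(), "Incoming data already expired by 100 ms");
    }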
@@ -278,9 +278,9 @@ impl UnaryFunc {
                 start_time,
             } => {
                 let ts = get_ts_as_millisecond(arg)?;
-                let start_time = start_time.map(|t| t.val()).unwrap_or(0);
+                let start_time = start_time.map(|t| t.val());
                 let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
-                let window_start = start_time + (ts - start_time) / window_size * window_size;
+                let window_start = get_window_start(ts, window_size, start_time);
 
                 let ret = Timestamp::new_millisecond(window_start);
                 Ok(Value::from(ret))
@@ -290,9 +290,9 @@ impl UnaryFunc {
                 start_time,
             } => {
                 let ts = get_ts_as_millisecond(arg)?;
-                let start_time = start_time.map(|t| t.val()).unwrap_or(0);
+                let start_time = start_time.map(|t| t.val());
                 let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
-                let window_start = start_time + (ts - start_time) / window_size * window_size;
+                let window_start = get_window_start(ts, window_size, start_time);
 
                 let window_end = window_start + window_size;
                 let ret = Timestamp::new_millisecond(window_end);
@@ -302,6 +302,35 @@ impl UnaryFunc {
     }
 }
 
+fn get_window_start(
+    ts: repr::Timestamp,
+    window_size: repr::Duration,
+    start_time: Option<repr::Timestamp>,
+) -> repr::Timestamp {
+    let start_time = start_time.unwrap_or(0);
+    // left close right open
+    if ts >= start_time {
+        start_time + (ts - start_time) / window_size * window_size
+    } else {
+        start_time + (ts - start_time) / window_size * window_size
+            - if ((start_time - ts) % window_size) != 0 {
+                window_size
+            } else {
+                0
+            }
+    }
+}
+
+#[test]
+fn test_get_window_start() {
+    assert_eq!(get_window_start(1, 3, None), 0);
+    assert_eq!(get_window_start(3, 3, None), 3);
+    assert_eq!(get_window_start(0, 3, None), 0);
+
+    assert_eq!(get_window_start(-1, 3, None), -3);
+    assert_eq!(get_window_start(-3, 3, None), -3);
+}
+
 fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
     let ts = if let Some(ts) = arg.as_timestamp() {
         ts.convert_to(TimeUnit::Millisecond)
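Why get_window_start needs the negative branch: Rust's integer division truncates toward zero, so for timestamps left of start_time the plain (ts - start_time) / window_size * window_size formula rounds up instead of down, and the extra window_size subtraction restores floor semantics. A hedged, standalone version of the same arithmetic (not the crate's function):

    // Floor-style window start for i64 timestamps; mirrors the idea above.
    fn window_start_floor(ts: i64, window: i64, origin: i64) -> i64 {
        let offset = ts - origin;
        // Truncating division rounds toward zero, so correct it for negatives.
        let mut start = origin + offset / window * window;
        if offset < 0 && offset % window != 0 {
            start -= window;
        }
        start
    }

    fn main() {
        // window of 3 ms anchored at 0
        assert_eq!(window_start_floor(1, 3, 0), 0);
        assert_eq!(window_start_floor(3, 3, 0), 3);
        // naive truncation would give 0 for ts = -1; floor gives -3
        assert_eq!(window_start_floor(-1, 3, 0), -3);
        assert_eq!(window_start_floor(-3, 3, 0), -3);
        println!("ok");
    }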
@@ -45,6 +45,8 @@ impl TypedExpr {
 
 impl TypedExpr {
     /// expand multi-value expression to multiple expressions with new indices
+    ///
+    /// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling`
     pub fn expand_multi_value(
         input_typ: &RelationType,
         exprs: &[TypedExpr],
@@ -262,6 +262,19 @@ impl RelationType {
 
         true
     }
+
+    /// Return relation describe with column names
+    pub fn into_named(self, names: Vec<ColumnName>) -> RelationDesc {
+        RelationDesc { typ: self, names }
+    }
+
+    /// Return relation describe without column names
+    pub fn into_unnamed(self) -> RelationDesc {
+        RelationDesc {
+            typ: self,
+            names: vec![],
+        }
+    }
 }
 
 /// The type of a `Value`
@@ -325,8 +338,8 @@ fn return_true() -> bool {
 /// Individual column names are optional.
 #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
 pub struct RelationDesc {
-    typ: RelationType,
-    names: Vec<ColumnName>,
+    pub typ: RelationType,
+    pub names: Vec<ColumnName>,
 }
 
 impl RelationDesc {
@@ -211,7 +211,7 @@ mod test {
         let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
 
         tri_map.insert(Some(name.clone()), Some(1024), gid);
-        schemas.insert(gid, schema);
+        schemas.insert(gid, schema.into_unnamed());
     }
 
     {
@@ -225,7 +225,7 @@ mod test {
             ColumnType::new(CDT::uint32_datatype(), false),
             ColumnType::new(CDT::datetime_datatype(), false),
         ]);
-        schemas.insert(gid, schema);
+        schemas.insert(gid, schema.into_unnamed());
         tri_map.insert(Some(name.clone()), Some(1025), gid);
     }
 
@@ -435,6 +435,236 @@ mod test {
     use crate::repr::{self, ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
 
+    /// TODO(discord9): add more illegal sql tests
+    #[tokio::test]
+    async fn test_tumble_composite() {
+        let engine = create_test_query_engine();
+        let sql =
+            "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
+        let plan = sql_to_substrait(engine.clone(), sql).await;
+
+        let mut ctx = create_test_ctx();
+        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
+
+        let aggr_exprs = vec![
+            AggregateExpr {
+                func: AggregateFunc::SumUInt32,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            },
+            AggregateExpr {
+                func: AggregateFunc::Count,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            },
+        ];
+        let avg_expr = ScalarExpr::If {
+            cond: Box::new(ScalarExpr::Column(4).call_binary(
+                ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
+                BinaryFunc::NotEq,
+            )),
+            then: Box::new(ScalarExpr::Column(3).call_binary(
+                ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
+                BinaryFunc::DivUInt64,
+            )),
+            els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
+        };
+        let expected = TypedPlan {
+            // TODO(discord9): mfp indirectly ref to key columns
+            /*
+            .with_key(vec![1])
+            .with_time_index(Some(0)),*/
+            plan: Plan::Mfp {
+                input: Box::new(
+                    Plan::Reduce {
+                        input: Box::new(
+                            Plan::Get {
+                                id: crate::expr::Id::Global(GlobalId::User(1)),
+                            }
+                            .with_types(RelationType::new(vec![
+                                ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                                ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                            ])),
+                        ),
+                        key_val_plan: KeyValPlan {
+                            key_plan: MapFilterProject::new(2)
+                                .map(vec![
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowFloor {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowCeiling {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                    ScalarExpr::Column(0),
+                                ])
+                                .unwrap()
+                                .project(vec![2, 3, 4])
+                                .unwrap()
+                                .into_safe(),
+                            val_plan: MapFilterProject::new(2)
+                                .project(vec![0, 1])
+                                .unwrap()
+                                .into_safe(),
+                        },
+                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
+                            full_aggrs: aggr_exprs.clone(),
+                            simple_aggrs: vec![
+                                AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
+                                AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
+                            ],
+                            distinct_aggrs: vec![],
+                        }),
+                    }
+                    .with_types(
+                        RelationType::new(vec![
+                            // keys
+                            ColumnType::new(CDT::datetime_datatype(), false), // window start(time index)
+                            ColumnType::new(CDT::datetime_datatype(), false), // window end(pk)
+                            ColumnType::new(CDT::uint32_datatype(), false), // number(pk)
+                            // values
+                            ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
+                            ColumnType::new(CDT::int64_datatype(), true), // avg.count(number)
+                        ])
+                        .with_key(vec![1, 2])
+                        .with_time_index(Some(0)),
+                    ),
+                ),
+                mfp: MapFilterProject::new(5)
+                    .map(vec![
+                        avg_expr,
+                        ScalarExpr::Column(2), // number(pk)
+                        ScalarExpr::Column(5), // avg.sum(number)
+                        ScalarExpr::Column(0), // window start
+                        ScalarExpr::Column(1), // window end
+                    ])
+                    .unwrap()
+                    .project(vec![6, 7, 8, 9])
+                    .unwrap(),
+            },
+            typ: RelationType::new(vec![
+                ColumnType::new(CDT::uint32_datatype(), false), // number
+                ColumnType::new(CDT::uint64_datatype(), true), // avg(number)
+                ColumnType::new(CDT::datetime_datatype(), false), // window start
+                ColumnType::new(CDT::datetime_datatype(), false), // window end
+            ]),
+        };
+        assert_eq!(flow_plan, expected);
+    }
+
+    #[tokio::test]
+    async fn test_tumble_parse_optional() {
+        let engine = create_test_query_engine();
+        let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')";
+        let plan = sql_to_substrait(engine.clone(), sql).await;
+
+        let mut ctx = create_test_ctx();
+        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
+
+        let aggr_expr = AggregateExpr {
+            func: AggregateFunc::SumUInt32,
+            expr: ScalarExpr::Column(0),
+            distinct: false,
+        };
+        let expected = TypedPlan {
+            typ: RelationType::new(vec![
+                ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
+                ColumnType::new(CDT::datetime_datatype(), false), // window start
+                ColumnType::new(CDT::datetime_datatype(), false), // window end
+            ]),
+            // TODO(discord9): mfp indirectly ref to key columns
+            /*
+            .with_key(vec![1])
+            .with_time_index(Some(0)),*/
+            plan: Plan::Mfp {
+                input: Box::new(
+                    Plan::Reduce {
+                        input: Box::new(
+                            Plan::Get {
+                                id: crate::expr::Id::Global(GlobalId::User(1)),
+                            }
+                            .with_types(RelationType::new(vec![
+                                ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                                ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                            ])),
+                        ),
+                        key_val_plan: KeyValPlan {
+                            key_plan: MapFilterProject::new(2)
+                                .map(vec![
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowFloor {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowCeiling {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                ])
+                                .unwrap()
+                                .project(vec![2, 3])
+                                .unwrap()
+                                .into_safe(),
+                            val_plan: MapFilterProject::new(2)
+                                .project(vec![0, 1])
+                                .unwrap()
+                                .into_safe(),
+                        },
+                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
+                            full_aggrs: vec![aggr_expr.clone()],
+                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
+                            distinct_aggrs: vec![],
+                        }),
+                    }
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(CDT::datetime_datatype(), false), // window start
+                            ColumnType::new(CDT::datetime_datatype(), false), // window end
+                            ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
+                        ])
+                        .with_key(vec![1])
+                        .with_time_index(Some(0)),
+                    ),
+                ),
+                mfp: MapFilterProject::new(3)
+                    .map(vec![
+                        ScalarExpr::Column(2),
+                        ScalarExpr::Column(3),
+                        ScalarExpr::Column(0),
+                        ScalarExpr::Column(1),
+                    ])
+                    .unwrap()
+                    .project(vec![4, 5, 6])
+                    .unwrap(),
+            },
+        };
+        assert_eq!(flow_plan, expected);
+    }
+
     #[tokio::test]
     async fn test_tumble_parse() {
         let engine = create_test_query_engine();
@@ -101,8 +101,7 @@ impl TypedExpr {
             .unzip();
 
         match arg_len {
-            // because variadic function can also have 1 arguments, we need to check if it's a variadic function first
-            1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {
+            1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => {
                 let func = UnaryFunc::from_str_and_type(fn_name, None)?;
                 let arg = arg_exprs[0].clone();
                 let ret_type = ColumnType::new_nullable(func.signature().output.clone());
@@ -124,8 +123,13 @@ impl TypedExpr {
 
                 Ok(TypedExpr::new(arg.call_unary(func), ret_type))
             }
-            // because variadic function can also have 2 arguments, we need to check if it's a variadic function first
-            2 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {
+            2 if BinaryFunc::from_str_expr_and_type(
+                fn_name,
+                &arg_exprs,
+                arg_types.get(0..2).expect("arg have 2 elements"),
+            )
+            .is_ok() =>
+            {
                 let (func, signature) =
                     BinaryFunc::from_str_expr_and_type(fn_name, &arg_exprs, &arg_types[0..2])?;
 
@@ -269,7 +269,7 @@ impl TypedPlan {
             id: crate::expr::Id::Global(table.0),
         };
         let get_table = TypedPlan {
-            typ: table.1,
+            typ: table.1.typ().clone(),
             plan: get_table,
         };
 
@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::ops::Bound;
 use std::sync::Arc;
 
+use common_telemetry::debug;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use smallvec::{smallvec, SmallVec};
@@ -86,7 +87,7 @@ impl KeyExpiryManager {
     ///
     /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired.
     /// - If it's not expired, return None
-    pub fn update_event_ts(
+    pub fn get_expire_duration_and_update_event_ts(
         &mut self,
         now: Timestamp,
         row: &Row,
@@ -95,6 +96,33 @@ impl KeyExpiryManager {
             return Ok(None);
         };
 
+        self.event_ts_to_key
+            .entry(event_ts)
+            .or_default()
+            .insert(row.clone());
+
+        if let Some(expire_time) = self.compute_expiration_timestamp(now) {
+            if expire_time > event_ts {
+                // return how much time it's expired
+                return Ok(Some(expire_time - event_ts));
+            }
+        }
+
+        Ok(None)
+    }
+
+    /// Get the expire duration of a key, if it's expired by now.
+    ///
+    /// Return None if the key is not expired
+    pub fn get_expire_duration(
+        &self,
+        now: Timestamp,
+        row: &Row,
+    ) -> Result<Option<Duration>, EvalError> {
+        let Some(event_ts) = self.extract_event_ts(row)? else {
+            return Ok(None);
+        };
+
         if let Some(expire_time) = self.compute_expiration_timestamp(now) {
             if expire_time > event_ts {
                 // return how much time it's expired
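The split introduced here: get_expire_duration is a read-only check of whether a key's event time is already past the expiry horizon, while get_expire_duration_and_update_event_ts also records the event time for later cleanup, and after this change it records it before deciding. A standalone sketch of the two operations (illustrative structure, not the crate's types):

    use std::collections::BTreeMap;

    struct ExpirySketch {
        expire_after_ms: i64,
        // event time -> how many rows were seen at that time
        event_ts_to_rows: BTreeMap<i64, usize>,
    }

    impl ExpirySketch {
        // Read-only check: how far past the horizon is this event time, if at all?
        fn expire_duration(&self, now: i64, event_ts: i64) -> Option<i64> {
            let horizon = now - self.expire_after_ms;
            (event_ts < horizon).then(|| horizon - event_ts)
        }

        // Record the event time first, then report whether it is already expired.
        fn record_and_check(&mut self, now: i64, event_ts: i64) -> Option<i64> {
            *self.event_ts_to_rows.entry(event_ts).or_default() += 1;
            self.expire_duration(now, event_ts)
        }
    }

    fn main() {
        let mut s = ExpirySketch { expire_after_ms: 1_000, event_ts_to_rows: BTreeMap::new() };
        assert_eq!(s.expire_duration(10_000, 9_500), None); // within the window
        assert_eq!(s.record_and_check(10_000, 8_500), Some(500)); // 500 ms too old
    }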
@@ -102,10 +130,6 @@ impl KeyExpiryManager {
             }
         }
 
-        self.event_ts_to_key
-            .entry(event_ts)
-            .or_default()
-            .insert(row.clone());
         Ok(None)
     }
 
@@ -189,6 +213,10 @@ impl Arrangement {
         }
     }
 
+    pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
+        self.expire_state.as_ref()
+    }
+
     pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
         self.expire_state = Some(expire_state);
     }
@@ -208,8 +236,12 @@ impl Arrangement {
         for ((key, val), update_ts, diff) in updates {
             // check if the key is expired
             if let Some(s) = &mut self.expire_state {
-                if let Some(expired_by) = s.update_event_ts(now, &key)? {
+                if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? {
                     max_expired_by = max_expired_by.max(Some(expired_by));
+                    debug!(
+                        "Expired key: {:?}, expired by: {:?} with time being now={}",
+                        key, expired_by, now
+                    );
                     continue;
                 }
             }
@@ -335,7 +367,9 @@ impl Arrangement {
         for (key, updates) in batch {
             // check if the key is expired
             if let Some(s) = &mut self.expire_state {
-                if let Some(expired_by) = s.update_event_ts(now, &key)? {
+                if let Some(expired_by) =
+                    s.get_expire_duration_and_update_event_ts(now, &key)?
+                {
                     max_expired_by = max_expired_by.max(Some(expired_by));
                     continue;
                 }
@@ -540,6 +574,10 @@ impl ArrangeHandler {
     pub fn set_full_arrangement(&self, full: bool) {
         self.write().full_arrangement = full;
     }
+
+    pub fn is_full_arrangement(&self) -> bool {
+        self.read().full_arrangement
+    }
 }
 
 #[cfg(test)]