Compare commits

..

4 Commits

Author SHA1 Message Date
discord9
08161812ae metrics: better bucket&longer timeout 2025-02-27 17:19:34 +08:00
discord9
33d1ba242f fix: timeout 2025-02-27 17:19:34 +08:00
discord9
40ce94f3bf fix: heartbeat&expire_after unit 2025-02-27 17:19:34 +08:00
discord9
4628119a42 feat: time window in df plan
WIP

test: found out time window expr

chore: pub

tests: also unparsed

tests: rm dup code

feat: frontend client for recording rule

fix: bound edge case

WIP

WIP

feat: rule engine

feat: add init options & tmp reroute to rule

fix: dist client get

fix: also not handle mirror write in flownode

chore: clippy
2025-02-27 17:19:34 +08:00
36 changed files with 332 additions and 1815 deletions

Cargo.lock generated
View File

@@ -4165,7 +4165,6 @@ dependencies = [
"bytes",
"cache",
"catalog",
"chrono",
"client",
"common-base",
"common-catalog",

View File

@@ -319,7 +319,6 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

View File

@@ -50,9 +50,6 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

View File

@@ -445,16 +445,10 @@ impl Pool {
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
// use a weak ref here to prevent the pool from being leaked
let pool_weak = Arc::downgrade(&pool);
loop {
let _ = interval.tick().await;
if let Some(pool) = pool_weak.upgrade() {
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
} else {
// no one is using this pool, so we can also let go
break;
}
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
}
}
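For context, the comment in this hunk motivates holding only a Weak reference in the background recycle loop so the pool can be dropped while the task is still scheduled. A minimal, self-contained sketch of that pattern, assuming tokio as the runtime and a hypothetical Pool type with a simplified retain_channel:

use std::sync::{Arc, Weak};
use std::time::Duration;

struct Pool;

impl Pool {
    fn retain_channel(&self) { /* drop idle channels here */ }
}

// The loop holds only a Weak<Pool>; once the last Arc is dropped, upgrade()
// fails and the task exits instead of keeping the pool alive forever.
async fn recycle_in_loop(pool: Weak<Pool>, interval_secs: u64) {
    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
    loop {
        interval.tick().await;
        match pool.upgrade() {
            Some(pool) => pool.retain_channel(),
            None => break, // pool dropped, stop recycling
        }
    }
}

#[tokio::main]
async fn main() {
    let pool = Arc::new(Pool);
    tokio::spawn(recycle_in_loop(Arc::downgrade(&pool), 60));
}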

View File

@@ -57,10 +57,12 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`; it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -230,8 +232,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
}
}
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
@@ -313,7 +315,7 @@ mod tests {
node_id: 2,
};
let key_bytes: Vec<u8> = (&key).into();
let key_bytes: Vec<u8> = key.into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);
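A minimal sketch of the owned-vs-borrowed conversion this hunk touches, with simplified field types (the real ClusterId and Role types live in common-meta and differ). Keeping the borrowed impl and delegating from the owned one would let call sites write either `(&key).into()` or `key.into()`:

// Hypothetical, simplified key type for illustration only.
struct NodeInfoKey {
    cluster_id: u64,
    role: u8,
    node_id: u64,
}

const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";

// Implement the borrowed conversion once and delegate from the owned one.
impl From<&NodeInfoKey> for Vec<u8> {
    fn from(key: &NodeInfoKey) -> Self {
        format!(
            "{}-{}-{}-{}",
            CLUSTER_NODE_INFO_PREFIX, key.cluster_id, key.role, key.node_id
        )
        .into_bytes()
    }
}

impl From<NodeInfoKey> for Vec<u8> {
    fn from(key: NodeInfoKey) -> Self {
        (&key).into()
    }
}

fn main() {
    let key = NodeInfoKey { cluster_id: 1, role: 0, node_id: 2 };
    assert_eq!(Vec::<u8>::from(&key), b"__meta_cluster_node_info-1-0-2".to_vec());
}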

View File

@@ -34,7 +34,6 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;

View File

@@ -1,152 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::time::Duration;
use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent memory leaks.
pub struct NodeExpiryListener {
handle: Mutex<Option<JoinHandle<()>>>,
max_idle_time: Duration,
in_memory: ResettableKvBackendRef,
}
impl Drop for NodeExpiryListener {
fn drop(&mut self) {
self.stop();
}
}
impl NodeExpiryListener {
pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
Self {
handle: Mutex::new(None),
max_idle_time,
in_memory,
}
}
async fn start(&self) {
let mut handle = self.handle.lock().unwrap();
if handle.is_none() {
let in_memory = self.in_memory.clone();
let max_idle_time = self.max_idle_time;
let ticker_loop = tokio::spawn(async move {
// Run clean task every minute.
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
error!(e; "Failed to clean expired node");
}
}
});
*handle = Some(ticker_loop);
}
}
fn stop(&self) {
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.abort();
info!("Node expiry listener stopped")
}
}
/// Cleans expired nodes from memory.
async fn clean_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<()> {
let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
for key in node_keys {
let key_bytes: Vec<u8> = (&key).into();
if let Err(e) = in_memory.delete(&key_bytes, false).await {
warn!(e; "Failed to delete expired node: {:?}", key_bytes);
} else {
debug!("Deleted expired node key: {:?}", key);
}
}
Ok(())
}
/// Lists expired nodes that have been inactive for more than `max_idle_time`.
async fn list_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
let req = RangeRequest::new().with_prefix(prefix);
let current_time_millis = common_time::util::current_time_millis();
let resp = in_memory.range(req).await?;
Ok(resp
.kvs
.into_iter()
.filter_map(move |KeyValue { key, value }| {
let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
warn!(e; "Unrecognized node info value");
}) else {
return None;
};
if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
{
NodeInfoKey::try_from(key)
.inspect_err(|e| {
warn!(e; "Unrecognized node info key: {:?}", info.peer);
})
.ok()
.inspect(|node_key| {
debug!("Found expired node: {:?}", node_key);
})
} else {
None
}
}))
}
}
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
fn name(&self) -> &str {
"NodeExpiryListener"
}
async fn on_leader_start(&self) -> error::Result<()> {
self.start().await;
info!(
"On leader start, node expiry listener started with max idle time: {:?}",
self.max_idle_time
);
Ok(())
}
async fn on_leader_stop(&self) -> error::Result<()> {
self.stop();
info!("On leader stop, node expiry listener stopped");
Ok(())
}
}
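Stripped to its skeleton, the listener above is a start-once background task whose JoinHandle sits behind a Mutex and is aborted on stop/drop. A minimal sketch of that pattern, assuming tokio and eliding the actual cleanup body:

use std::sync::Mutex;
use std::time::Duration;
use tokio::task::JoinHandle;

struct PeriodicTask {
    handle: Mutex<Option<JoinHandle<()>>>,
}

impl PeriodicTask {
    fn start(&self) {
        let mut handle = self.handle.lock().unwrap();
        // Idempotent: only the first start spawns the ticker task.
        if handle.is_none() {
            *handle = Some(tokio::spawn(async {
                let mut ticker = tokio::time::interval(Duration::from_secs(60));
                loop {
                    ticker.tick().await;
                    // clean expired entries here
                }
            }));
        }
    }

    fn stop(&self) {
        if let Some(handle) = self.handle.lock().unwrap().take() {
            handle.abort();
        }
    }
}

impl Drop for PeriodicTask {
    fn drop(&mut self) {
        self.stop();
    }
}

#[tokio::main]
async fn main() {
    let task = PeriodicTask { handle: Mutex::new(None) };
    task.start();
    task.stop();
}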

View File

@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;
pub use arrow::{self, compute};
pub use arrow;
pub use error::{Error, Result};

View File

@@ -16,7 +16,6 @@ async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true

View File

@@ -64,7 +64,7 @@ pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod util;
mod worker;
pub(crate) mod node_context;

View File

@@ -155,11 +155,8 @@ impl Flownode for FlowWorkerManager {
#[allow(unreachable_code, unused)]
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
return self
.rule_engine
.handle_inserts(request)
.await
.map_err(to_meta_err(snafu::location!()));
return Ok(Default::default());
// using try_read to ensure two things:
// 1. a flush wouldn't happen until the inserts before it are inserted
// 2. inserts happening concurrently with a flush wouldn't be blocked by the flush
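The reasoning in that comment hinges on `tokio::sync::RwLock::try_read` being non-blocking. A tiny standalone sketch of that behavior, independent of the flow code:

use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let lock = RwLock::new(0u64);
    // While a writer (think: flush) holds the lock, try_read fails immediately
    // instead of blocking the caller (think: concurrent inserts).
    let write_guard = lock.write().await;
    assert!(lock.try_read().is_err());
    drop(write_guard);
    assert!(lock.try_read().is_ok());
}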
@@ -212,15 +209,15 @@ impl Flownode for FlowWorkerManager {
.collect_vec();
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
Some(name) => Ok(name.clone()),
None => InternalSnafu {
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
}
.fail().map_err(BoxedError::new).context(ExternalSnafu),
})
.collect::<Result<Vec<_>>>()?;
.iter().enumerate()
.map(|(idx,name)| match name {
Some(name) => Ok(name.clone()),
None => InternalSnafu {
reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
}
.fail().map_err(BoxedError::new).context(ExternalSnafu),
})
.collect::<Result<Vec<_>>>()?;
let name_to_col = HashMap::<_, _>::from_iter(
insert_schema
.iter()

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Some utility functions
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;

View File

@@ -54,13 +54,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Time error"))]
Time {
source: common_time::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -256,9 +249,7 @@ impl ErrorExt for Error {
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery
}
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -14,7 +14,6 @@
//! Send heartbeat from flownode to metasrv
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
@@ -25,7 +24,7 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, error, info};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
@@ -66,7 +65,6 @@ pub struct HeartbeatTask {
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -89,25 +87,11 @@ impl HeartbeatTask {
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
}
pub async fn start(&self) -> Result<(), Error> {
if self
.running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Heartbeat task started multiple times");
return Ok(());
}
self.create_streams().await
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -274,7 +258,7 @@ impl HeartbeatTask {
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.create_streams().await.is_ok() {
if self.start().await.is_ok() {
break;
}
}
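For reference, the `running` flag removed in this file implemented a start-once guard via `compare_exchange`. A minimal standalone sketch of the same idea on std's AtomicBool:

use std::sync::atomic::{AtomicBool, Ordering};

// The first caller wins the compare_exchange; later calls see `true` and bail out.
struct Task {
    running: AtomicBool,
}

impl Task {
    fn start(&self) -> bool {
        self.running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let task = Task { running: AtomicBool::new(false) };
    assert!(task.start());
    assert!(!task.start()); // second start is a no-op
}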

View File

@@ -17,14 +17,11 @@
mod engine;
mod frontend_client;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::sync::Arc;
use api::helper::pb_value_to_value_ref;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use common_telemetry::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::error::Result as DfResult;
@@ -33,12 +30,10 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, TableReference};
use datafusion_expr::{ColumnarValue, LogicalPlan};
use datafusion_common::{Column, DFSchema, TableReference};
use datafusion_expr::LogicalPlan;
use datafusion_physical_expr::PhysicalExprRef;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVector;
use datatypes::schema::TIME_INDEX_KEY;
use datatypes::value::Value;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
@@ -46,206 +41,15 @@ use datatypes::vectors::{
};
pub use engine::RecordingRuleEngine;
pub use frontend_client::FrontendClient;
use itertools::Itertools;
use query::parser::QueryLanguageParser;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::util::from_proto_to_data_type;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
use crate::expr::error::DataTypeSnafu;
use crate::Error;
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
phy_expr: PhysicalExprRef,
column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
}
impl TimeWindowExpr {
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
Ok(Self {
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
}
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
let lower_bound =
find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?;
let upper_bound =
find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?;
Ok((lower_bound, upper_bound))
}
/// Find timestamps from rows using time window expr
pub async fn handle_rows(
&self,
rows_list: Vec<api::v1::Rows>,
) -> Result<BTreeSet<Timestamp>, Error> {
let mut time_windows = BTreeSet::new();
for rows in rows_list {
// pick the time index column and use it to eval on `self.expr`
let ts_col_index = rows
.schema
.iter()
.map(|col| col.column_name.clone())
.position(|name| name == self.column_name);
let Some(ts_col_index) = ts_col_index else {
warn!("can't found time index column in schema: {:?}", rows.schema);
continue;
};
let col_schema = &rows.schema[ts_col_index];
let cdt = from_proto_to_data_type(col_schema)?;
let column_values = rows
.rows
.iter()
.map(|row| &row.values[ts_col_index])
.collect_vec();
let mut vector = cdt.create_mutable_vector(column_values.len());
for value in column_values {
let value = pb_value_to_value_ref(value, &None);
vector.try_push_value_ref(value).context(DataTypeSnafu {
msg: "Failed to convert rows to columns",
})?;
}
let vector = vector.to_vector();
let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
let rb =
DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
.with_context(|_e| ArrowSnafu {
context: format!(
"Failed to create record batch from {df_schema:?} and {vector:?}"
),
})?;
let eval_res = self
.phy_expr
.evaluate(&rb)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.phy_expr
),
})?;
let res = columnar_to_ts_vector(&eval_res)?;
for ts in res.into_iter().flatten() {
time_windows.insert(ts);
}
}
Ok(time_windows)
}
}
fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
name,
cdt.as_arrow_type(),
false,
)]));
let df_schema = DFSchema::from_field_specific_qualified_schema(
vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
&arrow_schema,
)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
Ok(df_schema)
}
/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
let val = match columnar {
datafusion_expr::ColumnarValue::Array(array) => {
let ty = array.data_type();
let ty = ConcreteDataType::from_arrow_type(ty);
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
ty.unit()
} else {
return UnexpectedSnafu {
reason: format!("Non-timestamp type: {ty:?}"),
}
.fail();
};
match time_unit {
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec(),
TimeUnit::Millisecond => {
TimestampMillisecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.iter_data()
.map(|d| d.map(|d| d.0))
.collect_vec()
}
}
}
datafusion_expr::ColumnarValue::Scalar(scalar) => {
let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert scalar {scalar:?} to value"),
})?;
let ts = value.as_timestamp().context(UnexpectedSnafu {
reason: format!("Expect Timestamp, found {:?}", value),
})?;
vec![Some(ts)]
}
};
Ok(val)
}
/// Convert sql to datafusion logical plan
pub async fn sql_to_df_plan(
query_ctx: QueryContextRef,
@@ -270,16 +74,27 @@ pub async fn sql_to_df_plan(
Ok(plan)
}
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
async fn find_time_window_expr(
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// return `Some("2021-07-01 00:00:00.000")`
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
///
/// A time window expr is an expr that:
/// 1. refers only to a time index column
/// 2. is monotonically increasing
/// 3. shows up in the GROUP BY clause
///
/// Note: this plan should only contain one TableScan.
pub async fn find_plan_time_window_bound(
plan: &LogicalPlan,
catalog_man: CatalogManagerRef,
current: Timestamp,
query_ctx: QueryContextRef,
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
engine: QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
// TODO(discord9): find the expr that does the time window
let catalog_man = engine.engine_state().catalog_manager();
let mut table_name = None;
// first find the table source in the logical plan
plan.apply(|plan| {
let LogicalPlan::TableScan(table_scan) = plan else {
@@ -332,6 +147,45 @@ async fn find_time_window_expr(
),
})?.unit();
let ts_columns: HashSet<_> = HashSet::from_iter(vec![
format!("{catalog_name}.{schema_name}.{table_name}.{ts_col_name}"),
format!("{schema_name}.{table_name}.{ts_col_name}"),
format!("{table_name}.{ts_col_name}"),
format!("{ts_col_name}"),
]);
let ts_columns: HashSet<_> = ts_columns
.into_iter()
.map(Column::from_qualified_name)
.collect();
let ts_columns_ref: HashSet<&Column> = ts_columns.iter().collect();
// find the time window expr which refers to the time index column
let mut time_window_expr: Option<Expr> = None;
let find_time_window_expr = |plan: &LogicalPlan| {
let LogicalPlan::Aggregate(aggregate) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
for group_expr in &aggregate.group_expr {
let refs = group_expr.column_refs();
if refs.len() != 1 {
continue;
}
let ref_col = refs.iter().next().unwrap();
if ts_columns_ref.contains(ref_col) {
time_window_expr = Some(group_expr.clone());
break;
}
}
Ok(TreeNodeRecursion::Stop)
};
plan.apply(find_time_window_expr)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find time window expr in plan {plan:?}"),
})?;
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
ts_col_name.clone(),
ts_index.data_type.as_arrow_type(),
@@ -346,98 +200,6 @@ async fn find_time_window_expr(
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
// find the time window expr which refers to the time index column
let mut aggr_expr = None;
let mut time_window_expr: Option<Expr> = None;
let find_inner_aggr_expr = |plan: &LogicalPlan| {
if let LogicalPlan::Aggregate(aggregate) = plan {
aggr_expr = Some(aggregate.clone());
};
Ok(TreeNodeRecursion::Continue)
};
plan.apply(find_inner_aggr_expr)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find aggr expr in plan {plan:?}"),
})?;
if let Some(aggregate) = aggr_expr {
for group_expr in &aggregate.group_expr {
let refs = group_expr.column_refs();
if refs.len() != 1 {
continue;
}
let ref_col = refs.iter().next().unwrap();
let index = aggregate.input.schema().maybe_index_of_column(ref_col);
let Some(index) = index else {
continue;
};
let field = aggregate.input.schema().field(index);
let is_time_index = field.metadata().get(TIME_INDEX_KEY) == Some(&"true".to_string());
if is_time_index {
let rewrite_column = group_expr.clone();
let rewritten = rewrite_column
.rewrite(&mut RewriteColumn {
table_name: table_name.to_string(),
})
.with_context(|_| DatafusionSnafu {
context: format!("Rewrite expr failed, expr={:?}", group_expr),
})?
.data;
struct RewriteColumn {
table_name: String,
}
impl TreeNodeRewriter for RewriteColumn {
type Node = Expr;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
let Expr::Column(mut column) = node else {
return Ok(Transformed::no(node));
};
column.relation = Some(TableReference::bare(self.table_name.clone()));
Ok(Transformed::yes(Expr::Column(column)))
}
}
time_window_expr = Some(rewritten);
break;
}
}
Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
} else {
// can't find time window expr, return None
Ok((ts_col_name, None, expected_time_unit, df_schema))
}
}
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// return `Some("2021-07-01 00:00:00.000")`
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
///
/// A time window expr is an expr that:
/// 1. refers only to a time index column
/// 2. is monotonically increasing
/// 3. shows up in the GROUP BY clause
///
/// Note: this plan should only contain one TableScan.
pub async fn find_plan_time_window_bound(
plan: &LogicalPlan,
current: Timestamp,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
// TODO(discord9): find the expr that does the time window
let catalog_man = engine.engine_state().catalog_manager();
let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;
// cast current to ts_index's type
let new_current = current
.convert_to(expected_time_unit)
@@ -461,7 +223,6 @@ pub async fn find_plan_time_window_bound(
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// if return None, meaning this time window have no lower bound
@@ -470,6 +231,8 @@ fn find_expr_time_window_lower_bound(
df_schema: &DFSchema,
current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
use std::cmp::Ordering;
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
@@ -481,8 +244,91 @@ fn find_expr_time_window_lower_bound(
})?;
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
let input_time_unit = cur_time_window.unit();
Ok(cur_time_window.convert_to(input_time_unit))
if cur_time_window == current {
return Ok(Some(current));
}
// search to find the lower bound
let mut offset: i64 = 1;
let lower_bound;
let mut upper_bound = Some(current);
// first do an exponential probe to find a range for binary search
loop {
let Some(next_val) = current.value().checked_sub(offset) else {
// no lower bound
return Ok(None);
};
let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
let prev_time_window = eval_ts_to_ts(&phy_expr, df_schema, prev_time_probe)?;
match prev_time_window.cmp(&cur_time_window) {
Ordering::Less => {
lower_bound = Some(prev_time_probe);
break;
}
Ordering::Equal => {
upper_bound = Some(prev_time_probe);
}
Ordering::Greater => {
UnexpectedSnafu {
reason: format!(
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
),
}
.fail()?
}
}
let Some(new_offset) = offset.checked_mul(2) else {
// no lower bound
return Ok(None);
};
offset = new_offset;
}
// binary search for the exact lower bound
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
});
let input_time_unit = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.unit();
let mut low = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.value();
let mut high = upper_bound
.context(UnexpectedSnafu {
reason: "should have upper bound",
})?
.value();
while low < high {
let mid = (low + high) / 2;
let mid_probe = common_time::Timestamp::new(mid, input_time_unit);
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
match mid_time_window.cmp(&cur_time_window) {
Ordering::Less => low = mid + 1,
Ordering::Equal => high = mid,
Ordering::Greater => UnexpectedSnafu {
reason: format!("Binary search failed for time window expression {expr:?}"),
}
.fail()?,
}
}
let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit);
Ok(Some(final_lower_bound_for_time_window))
}
/// Find the upper bound for time window expression
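The probe-then-bisect search in the hunk above can be hard to follow inside a diff, so here is a self-contained sketch of the same idea against a plain flooring function standing in for the (assumed monotonic) time window expr. The real code additionally guards overflow and the no-lower-bound case with `checked_sub`/`checked_mul`; this sketch omits that:

// Find the smallest x with f(x) == f(current): exponential probe backwards,
// then binary search inside the bracketed range.
fn window_lower_bound(f: impl Fn(i64) -> i64, current: i64) -> i64 {
    let target = f(current);
    // Exponential probe until we step into an earlier window.
    let mut offset = 1i64;
    while f(current - offset) >= target {
        offset *= 2;
    }
    // Binary search for the first value still mapped to `target`.
    let (mut low, mut high) = (current - offset, current);
    while low < high {
        let mid = low + (high - low) / 2;
        if f(mid) < target {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    low
}

fn main() {
    // Stand-in for the time window expr: floor to 300-second windows.
    let floor_5m = |ts: i64| ts - ts.rem_euclid(300);
    assert_eq!(window_lower_bound(floor_5m, 61), 0);
    assert_eq!(window_lower_bound(floor_5m, 301), 300);
}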
@@ -591,22 +437,7 @@ fn eval_ts_to_ts(
df_schema: &DFSchema,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let schema_ty = df_schema.field(0).data_type();
let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
ts.unit()
} else {
return UnexpectedSnafu {
reason: format!("Expect Timestamp, found {:?}", schema_cdt),
}
.fail();
};
let input_value = input_value
.convert_to(schema_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
})?;
let ts_vector = match schema_unit {
let ts_vector = match input_value.unit() {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
@@ -630,14 +461,59 @@ fn eval_ts_to_ts(
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
})?;
if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
Ok(*ts)
let val = match eval_res {
datafusion_expr::ColumnarValue::Array(array) => {
let ty = array.data_type();
let ty = ConcreteDataType::from_arrow_type(ty);
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
ty.unit()
} else {
return UnexpectedSnafu {
reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
}
.fail();
};
match time_unit {
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0),
TimeUnit::Millisecond => {
TimestampMillisecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
}
}
datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert scalar {scalar:?} to value"),
})?,
};
if let Value::Timestamp(ts) = val {
Ok(ts)
} else {
UnexpectedSnafu {
reason: format!(
"Expected timestamp in expression {phy:?} but got {:?}",
eval_res
),
reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
}
.fail()?
}
@@ -663,7 +539,7 @@ impl AddFilterRewriter {
impl TreeNodeRewriter for AddFilterRewriter {
type Node = LogicalPlan;
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
@@ -686,19 +562,7 @@ impl TreeNodeRewriter for AddFilterRewriter {
}
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
/// A dialect that forces all identifiers to be quoted
struct ForceQuoteIdentifiers;
impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers {
fn identifier_quote_style(&self, identifier: &str) -> Option<char> {
if identifier.to_lowercase() != identifier {
Some('"')
} else {
None
}
}
}
let unparser = Unparser::new(&ForceQuoteIdentifiers);
// first make all column qualified
let unparser = Unparser::default();
let sql = unparser
.plan_to_sql(plan)
.with_context(|_e| DatafusionSnafu {
@@ -717,22 +581,6 @@ mod test {
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_sql_plan_convert() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#;
let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false)
.await
.unwrap();
let new_sql = df_plan_to_sql(&new).unwrap();
assert_eq!(
r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#,
new_sql
);
}
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
@@ -741,7 +589,7 @@ mod test {
),
(
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE ((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4)"
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4))"
),
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
@@ -781,7 +629,7 @@ mod test {
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
),
// complex time window index
(
@@ -856,53 +704,9 @@ mod test {
),
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
),
// subquery
(
"SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
),
// cte
(
"with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
),
// complex subquery without alias
(
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
),
// complex subquery alias
(
"SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
),
];
for (sql, current, expected, expected_unparsed) in testcases {
for (sql, current, expected, unparsed) in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
.await
.unwrap();
@@ -934,7 +738,7 @@ mod test {
} else {
sql.to_string()
};
assert_eq!(expected_unparsed, new_sql);
assert_eq!(unparsed, new_sql);
}
}
}
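To make the "nearest lower bound" semantics documented in this file concrete without pulling in DataFusion: for a fixed five-minute window the bound is just the timestamp floored to the window size, a simplification of what `date_bin` does. The epoch-millisecond values below are hand-computed for the documented example:

// Floor `current_ms` to a multiple of `window_ms`, mirroring the doc comment:
// date_bin(INTERVAL '5 minutes', ts) at 2021-07-01 00:01:01.000
// yields 2021-07-01 00:00:00.000.
fn window_lower_bound(current_ms: i64, window_ms: i64) -> i64 {
    current_ms - current_ms.rem_euclid(window_ms)
}

fn main() {
    let five_minutes_ms = 5 * 60 * 1000;
    let current = 1_625_097_661_000_i64; // 2021-07-01 00:01:01.000 UTC
    assert_eq!(window_lower_bound(current, five_minutes_ms), 1_625_097_600_000); // 00:00:00.000
}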

View File

@@ -12,40 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::flow::FlowResponse;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::TreeNode;
use datatypes::value::Value;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{oneshot, RwLock};
use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter, TimeWindowExpr};
use crate::adapter::{CreateFlowArgs, FlowId, TableName};
use crate::error::{
DatafusionSnafu, DatatypesSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu,
TimeSnafu, UnexpectedSnafu,
};
use super::{df_plan_to_sql, AddFilterRewriter};
use crate::adapter::{CreateFlowArgs, FlowId};
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_time_window_expr, sql_to_df_plan};
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
@@ -60,74 +49,18 @@ pub struct RecordingRuleEngine {
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
engine: QueryEngineRef,
}
impl RecordingRuleEngine {
pub fn new(
frontend_client: Arc<FrontendClient>,
engine: QueryEngineRef,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
) -> Self {
pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
Self {
tasks: Default::default(),
shutdown_txs: Default::default(),
frontend_client,
flow_metadata_manager,
table_meta,
engine,
}
}
pub async fn handle_inserts(
&self,
request: api::v1::region::InsertRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_name: HashMap<TableName, Vec<api::v1::Rows>> = HashMap::new();
for r in request.requests {
let tid = RegionId::from(r.region_id).table_id();
let name = get_table_name(table_info_mgr, &tid).await?;
let entry = group_by_table_name.entry(name).or_default();
if let Some(rows) = r.rows {
entry.push(rows);
}
}
for (_flow_id, task) in self.tasks.read().await.iter() {
let src_table_names = &task.source_table_names;
for src_table_name in src_table_names {
if let Some(entry) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.time_window_expr else {
continue;
};
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
let mut state = task.state.write().await;
state
.dirty_time_windows
.add_lower_bounds(involved_time_windows.into_iter());
}
}
}
Ok(Default::default())
}
}
async fn get_table_name(zelf: &TableInfoManager, table_id: &TableId) -> Result<TableName, Error> {
zelf.get(*table_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
@@ -137,7 +70,7 @@ impl RecordingRuleEngine {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
source_table_ids: _,
create_if_not_exists,
or_replace,
expire_after,
@@ -182,48 +115,14 @@ impl RecordingRuleEngine {
}
.fail()?
};
let query_ctx = Arc::new(query_ctx);
let mut source_table_names = Vec::new();
for src_id in source_table_ids {
let table_name = self
.table_meta
.table_info_manager()
.get(src_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", src_id),
})
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])?;
source_table_names.push(table_name);
}
let (tx, rx) = oneshot::channel();
let plan = sql_to_df_plan(query_ctx.clone(), self.engine.clone(), &sql, true).await?;
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
self.engine.engine_state().catalog_manager().clone(),
query_ctx.clone(),
)
.await?;
let phy_expr = time_window_expr
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
let task = RecordingRuleTask::new(
flow_id,
&sql,
phy_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
Arc::new(query_ctx),
rx,
);
@@ -270,35 +169,28 @@ impl RecordingRuleEngine {
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
pub flow_id: FlowId,
flow_id: FlowId,
query: String,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: HashSet<[String; 3]>,
state: Arc<RwLock<RecordingRuleState>>,
}
impl RecordingRuleTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
flow_id: FlowId,
query: &str,
time_window_expr: Option<TimeWindowExpr>,
expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
flow_id,
query: query.to_string(),
time_window_expr,
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
}
}
@@ -315,20 +207,17 @@ impl RecordingRuleTask {
loop {
// FIXME(discord9): test if need upper bound also works
let new_query = self.gen_query_with_time_window(engine.clone()).await?;
let new_query = self
.gen_query_with_time_window(engine.clone(), false)
.await?;
let insert_into = if let Some(new_query) = new_query {
format!(
"INSERT INTO {}.{}.{} {}",
self.sink_table_name[0],
self.sink_table_name[1],
self.sink_table_name[2],
new_query
)
} else {
tokio::time::sleep(MIN_REFRESH_DURATION).await;
continue;
};
let insert_into = format!(
"INSERT INTO {}.{}.{} {}",
self.sink_table_name[0],
self.sink_table_name[1],
self.sink_table_name[2],
new_query
);
if is_first {
is_first = false;
@@ -378,8 +267,6 @@ impl RecordingRuleTask {
.write()
.await
.after_query_exec(elapsed, res.is_ok());
// drop the result to free client-related resources
drop(res);
let sleep_until = {
let mut state = self.state.write().await;
@@ -397,11 +284,11 @@ impl RecordingRuleTask {
}
}
/// Will merge and use the first ten time windows in the query
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
) -> Result<Option<String>, Error> {
need_upper_bound: bool,
) -> Result<String, Error> {
let query_ctx = self.state.read().await.query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
@@ -409,67 +296,49 @@ impl RecordingRuleTask {
.expect("Time went backwards");
let low_bound = self
.expire_after
.map(|e| since_the_epoch.as_secs() - e as u64)
.unwrap_or(u64::MIN);
.map(|e| since_the_epoch.as_secs() - e as u64);
let Some(low_bound) = low_bound else {
return Ok(self.query.clone());
};
let low_bound = Timestamp::new_second(low_bound as i64);
// TODO(discord9): use time window expr to get the precise expire lower bound
let expire_time_window_bound = self
.time_window_expr
.as_ref()
.map(|expr| expr.eval(low_bound))
.transpose()?;
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
let (col_name, lower, upper) =
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
.await?;
let new_sql = {
let expr = {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.flow_id
),
})?;
self.state
.write()
.await
.dirty_time_windows
.gen_filter_exprs(&col_name, Some(l), window_size, self)?
}
_ => {
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id
);
// since no time window lower/upper bound is found, just return the original query
return Ok(Some(self.query.clone()));
}
}
let to_df_literal = |value| -> Result<_, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
};
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);
let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.flow_id);
return Ok(None);
let lower = lower.map(to_df_literal).transpose()?;
let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
if need_upper_bound {
Some(u)
} else {
None
}
});
let expr = {
use datafusion_expr::{col, lit};
match (lower, upper) {
(Some(l), Some(u)) => col(&col_name)
.gt_eq(lit(l))
.and(col(&col_name).lt_eq(lit(u))),
(Some(l), None) => col(&col_name).gt_eq(lit(l)),
(None, Some(u)) => col(&col_name).lt(lit(u)),
// no time window, direct return
(None, None) => return Ok(self.query.clone()),
}
};
let mut add_filter = AddFilterRewriter::new(expr);
@@ -486,7 +355,7 @@ impl RecordingRuleTask {
df_plan_to_sql(&plan)?
};
Ok(Some(new_sql))
Ok(new_sql)
}
}
@@ -497,189 +366,16 @@ pub struct RecordingRuleState {
last_update_time: Instant,
/// last time query duration
last_query_duration: Duration,
/// Dirty time windows that need to be updated,
/// stored as a non-overlapping mapping of `start -> end`
dirty_time_windows: DirtyTimeWindows,
exec_state: ExecState,
shutdown_rx: oneshot::Receiver<()>,
}
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
}
impl DirtyTimeWindows {
/// Time window merge distance
const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
/// # Arguments
///
/// * `lower_bounds` - An iterator of lower bounds to be added.
pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
for lower_bound in lower_bounds {
let entry = self.windows.entry(lower_bound);
entry.or_insert(None);
}
}
/// Generate all filter expressions consuming all time windows
pub fn gen_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
task_ctx: &RecordingRuleTask,
) -> Result<Option<datafusion_expr::Expr>, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
window_size
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
warn!(
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
task_ctx.time_window_expr,
task_ctx.expire_after,
first_time_window,
last_time_window,
task_ctx.query
);
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
debug!(
"Time window start: {:?}, end: {:?}",
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
);
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
let upper = end.map(to_df_literal).transpose()?;
let expr = if let Some(upper) = upper {
col(col_name)
.gt_eq(lit(lower))
.and(col(col_name).lt(lit(upper)))
} else {
col(col_name).gt_eq(lit(lower))
};
expr_lst.push(expr);
}
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
Ok(expr)
}
/// Merge time windows that overlap or get too close
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
expire_lower_bound: Option<Timestamp>,
) -> Result<(), Error> {
let mut new_windows = BTreeMap::new();
let mut prev_tw = None;
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
// filter out expired time window
if let Some(expire_lower_bound) = expire_lower_bound {
if lower_bound <= expire_lower_bound {
continue;
}
}
let Some(prev_tw) = &mut prev_tw else {
prev_tw = Some((lower_bound, upper_bound));
continue;
};
let std_window_size = window_size.to_std().map_err(|e| {
InternalSnafu {
reason: e.to_string(),
}
.build()
})?;
// if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
let prev_upper = prev_tw
.1
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
prev_tw.1 = Some(prev_upper);
let cur_upper = upper_bound.unwrap_or(
lower_bound
.add_duration(std_window_size)
.context(TimeSnafu)?,
);
if lower_bound
.sub(&prev_upper)
.map(|dist| dist <= window_size * Self::MERGE_DIST)
.unwrap_or(false)
{
prev_tw.1 = Some(cur_upper);
} else {
new_windows.insert(prev_tw.0, prev_tw.1);
*prev_tw = (lower_bound, Some(cur_upper));
}
}
if let Some(prev_tw) = prev_tw {
new_windows.insert(prev_tw.0, prev_tw.1);
}
self.windows = new_windows;
Ok(())
}
}
impl RecordingRuleState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
Self {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
dirty_time_windows: Default::default(),
exec_state: ExecState::Idle,
shutdown_rx,
}
@@ -709,107 +405,3 @@ enum ExecState {
Idle,
Executing,
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_merge_dirty_time_windows() {
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
dirty.windows,
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)])
);
// separate time window
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// too far apart to merge, so the two windows stay separate
assert_eq!(
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(5 * 60))
),
(
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Some(Timestamp::new_second(
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)
]),
dirty.windows
);
// overlapping
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// overlapping/close windows are merged into one
assert_eq!(
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
),]),
dirty.windows
);
// expired
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
)),
)
.unwrap();
// both windows are at or below the expire lower bound, so they are dropped
assert_eq!(BTreeMap::from([]), dirty.windows);
}
}
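The dirty-window machinery above ultimately just builds an OR of per-window range predicates that gets pushed into the rewritten query via AddFilterRewriter. A small sketch using the same `datafusion_expr::{col, lit}` helpers the code imports; the window bounds here are made-up epoch seconds:

use datafusion_expr::{col, lit};

fn main() {
    // Two dirty windows, [0, 300) and [900, 1200), expressed as one filter.
    let w1 = col("ts").gt_eq(lit(0i64)).and(col("ts").lt(lit(300i64)));
    let w2 = col("ts").gt_eq(lit(900i64)).and(col("ts").lt(lit(1200i64)));
    let filter = w1.or(w2);
    // Prints something like:
    // ts >= Int64(0) AND ts < Int64(300) OR ts >= Int64(900) AND ts < Int64(1200)
    println!("{filter}");
}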

View File

@@ -43,7 +43,6 @@ fn client_from_urls(addrs: Vec<String>) -> Client {
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
channel_mgr: ChannelManager,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -67,10 +66,7 @@ impl DatabaseWithPeer {
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed {
meta_client,
channel_mgr: default_channel_mgr(),
}
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
@@ -79,8 +75,7 @@ impl FrontendClient {
addr: addr.clone(),
};
let mgr = default_channel_mgr();
let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]);
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
@@ -124,40 +119,32 @@ impl FrontendClient {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
match &self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed {
meta_client: _,
channel_mgr,
} => {
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client =
Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await,
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
}

View File

@@ -453,12 +453,8 @@ impl FlownodeBuilder {
let node_id = self.opts.node_id.map(|id| id as u32);
let rule_engine = RecordingRuleEngine::new(
self.frontend_client.clone(),
query_engine.clone(),
self.flow_metadata_manager.clone(),
table_meta.clone(),
);
let rule_engine =
RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
for worker_id in 0..num_workers {

View File

@@ -115,37 +115,6 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let schema = vec![
datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
columns.push(column);
let ts = (1..=10).collect_vec();
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);
let schema = Arc::new(Schema::new(schema));
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch);
let req_with_ts = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(),
table_id: 1025,
table,
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -7,7 +7,6 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
mysql_kvbackend = [] # placeholder feature so CI can compile
[lints]
workspace = true

View File

@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = (&key).into();
let key = key.into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,

View File

@@ -32,7 +32,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
@@ -152,8 +151,6 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -195,7 +192,6 @@ impl Default for MetasrvOptions {
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
}
}
}
@@ -446,10 +442,6 @@ impl Metasrv {
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
self.options.node_max_idle_time,
self.in_memory.clone(),
)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}

View File

@@ -68,15 +68,13 @@ impl heartbeat_server::Heartbeat for Metasrv {
};
if pusher_id.is_none() {
pusher_id =
Some(register_pusher(&handler_group, header, tx.clone()).await);
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
}
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await
@@ -175,13 +173,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> PusherId {
) -> Option<PusherId> {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(pusher_id, pusher).await;
pusher_id
Some(pusher_id)
}
#[cfg(test)]

View File

@@ -336,11 +336,11 @@ impl Inserter {
let InstantAndNormalInsertRequests {
normal_requests,
instant_requests,
instant_requests: _,
} = requests;
// TODO(discord9): mirror some requests to flownode
/*
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
@@ -350,7 +350,7 @@ impl Inserter {
.chain(instant_requests.requests.iter()),
)
.await?;
flow_mirror_task.detach(self.node_manager.clone())?;
flow_mirror_task.detach(self.node_manager.clone())?;*/
// Write requests to datanode and wait for response
let write_tasks = self

View File

@@ -583,8 +583,7 @@ impl HistogramFoldStream {
.expect("field column should not be nullable");
counters.push(counter);
}
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
@@ -673,7 +672,7 @@ impl HistogramFoldStream {
if bucket.len() <= 1 {
return Ok(f64::NAN);
}
if bucket.last().unwrap().is_finite() {
if *bucket.last().unwrap() != f64::INFINITY {
return Err(DataFusionError::Execution(
"last bucket should be +Inf".to_string(),
));
@@ -693,8 +692,8 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
let total = *counter.last().unwrap();
let expected_pos = total * quantile;
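For context on the checks in this hunk (the last bucket must be +Inf, buckets and counters non-decreasing, expected_pos = total * quantile), here is a minimal, self-contained sketch of the Prometheus-style linear interpolation this stream performs; edge cases are simplified relative to the real evaluate_row.
/// Minimal sketch of Prometheus-style histogram_quantile interpolation.
/// `buckets` are sorted upper bounds ending with +Inf, `counters` are the
/// matching cumulative counts; edge cases are simplified here.
fn histogram_quantile(quantile: f64, buckets: &[f64], counters: &[f64]) -> f64 {
    if buckets.len() <= 1 || *buckets.last().unwrap() != f64::INFINITY {
        return f64::NAN;
    }
    let total = *counters.last().unwrap();
    let expected_pos = total * quantile;
    // first bucket whose cumulative count reaches the target rank
    let idx = counters
        .iter()
        .position(|&c| c >= expected_pos)
        .unwrap_or(counters.len() - 1);
    if idx == counters.len() - 1 {
        // the rank falls into the +Inf bucket: report the last finite bound
        return buckets[buckets.len() - 2];
    }
    let (lower_bound, lower_count) = if idx == 0 {
        (0.0, 0.0)
    } else {
        (buckets[idx - 1], counters[idx - 1])
    };
    lower_bound
        + (buckets[idx] - lower_bound) * (expected_pos - lower_count)
            / (counters[idx] - lower_count)
}
fn main() {
    // buckets 0.1/1/5/+Inf with cumulative counts 0/10/20/150 and q = 0.9:
    // rank 135 lands in the +Inf bucket, so the answer is the last finite bound, 5.0
    let q = histogram_quantile(0.9, &[0.1, 1.0, 5.0, f64::INFINITY], &[0.0, 10.0, 20.0, 150.0]);
    assert_eq!(q, 5.0);
}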

View File

@@ -21,7 +21,6 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
mod round;
#[cfg(test)]
mod test_util;
@@ -40,7 +39,6 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub use round::Round;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {

View File

@@ -1,105 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;
use crate::functions::extract_array;
pub struct Round {
nearest: f64,
}
impl Round {
fn new(nearest: f64) -> Self {
Self { nearest }
}
pub const fn name() -> &'static str {
"prom_round"
}
fn input_type() -> Vec<DataType> {
vec![DataType::Float64]
}
pub fn return_type() -> DataType {
DataType::Float64
}
pub fn scalar_udf(nearest: f64) -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Immutable,
Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
)
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), 1);
let value_array = extract_array(&input[0])?;
if self.nearest == 0.0 {
let values = value_array.as_primitive::<Float64Type>();
let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
Ok(ColumnarValue::Array(Arc::new(result) as _))
} else {
let values = value_array.as_primitive::<Float64Type>();
let nearest = self.nearest;
let result =
compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
Ok(ColumnarValue::Array(Arc::new(result) as _))
}
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::Float64Array;
use super::*;
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
let round_udf = Round::scalar_udf(nearest);
let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
let result = round_udf.invoke_batch(&input, 1).unwrap();
let result_array = extract_array(&result).unwrap();
assert_eq!(result_array.len(), 1);
assert_eq!(
result_array.as_primitive::<Float64Type>().values(),
&expected
);
}
#[test]
fn test_round() {
test_round_f64(vec![123.456], 0.001, vec![123.456]);
test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
test_round_f64(vec![123.456], 0.1, vec![123.5]);
test_round_f64(vec![123.456], 0.0, vec![123.0]);
test_round_f64(vec![123.456], 1.0, vec![123.0]);
test_round_f64(vec![123.456], 10.0, vec![120.0]);
test_round_f64(vec![123.456], 100.0, vec![100.0]);
test_round_f64(vec![123.456], 105.0, vec![105.0]);
test_round_f64(vec![123.456], 1000.0, vec![0.0]);
}
}
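A note on the expected values in test_round above: with a non-zero `nearest` the removed UDF computes (a / nearest).round() * nearest in f64, so results carry ordinary binary floating-point error. A standalone check (plain Rust, outside the UDF machinery):
fn main() {
    let (a, nearest) = (123.456_f64, 0.01_f64);
    // same arithmetic as the removed calc(): scale, round, scale back
    let rounded = (a / nearest).round() * nearest;
    // prints 123.46000000000001, matching the expectation in test_round above
    println!("{rounded}");
}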

View File

@@ -52,7 +52,7 @@ use promql::extension_plan::{
use promql::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
@@ -200,9 +200,10 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
}
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
}
.fail()?,
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
@@ -217,48 +218,6 @@ impl PromPlanner {
Ok(res)
}
async fn prom_subquery_expr_to_plan(
&mut self,
session_state: &SessionState,
subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
let SubqueryExpr {
expr, range, step, ..
} = subquery_expr;
let current_interval = self.ctx.interval;
if let Some(step) = step {
self.ctx.interval = step.as_millis() as _;
}
let current_start = self.ctx.start;
self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
let input = self.prom_expr_to_plan(expr, session_state).await?;
self.ctx.interval = current_interval;
self.ctx.start = current_start;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
input,
)
.context(DataFusionPlanningSnafu)?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
@@ -482,7 +441,6 @@ impl PromPlanner {
// if the left or right plan has no tag columns (e.g. `scalar(...) + host` or `host + scalar(...)`),
// we only join on the time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
modifier,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -1510,20 +1468,6 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"round" => {
let nearest = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
None => 0.0,
other => UnexpectedPlanExprSnafu {
desc: format!("expected f64 literal as t, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1730,7 +1674,7 @@ impl PromPlanner {
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join"
fn_name: "label_join",
}
);
@@ -2177,49 +2121,24 @@ impl PromPlanner {
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let mut left_tag_columns = if only_join_time_index {
BTreeSet::new()
vec![]
} else {
self.ctx
.tag_columns
.iter()
.cloned()
.collect::<BTreeSet<_>>()
.map(Column::from_name)
.collect::<Vec<_>>()
};
let mut right_tag_columns = left_tag_columns.clone();
// apply modifier
if let Some(modifier) = modifier {
// apply label modifier
if let Some(matching) = &modifier.matching {
match matching {
// keeps columns mentioned in `on`
LabelModifier::Include(on) => {
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
right_tag_columns =
right_tag_columns.intersection(&mask).cloned().collect();
}
// removes columns mentioned in `ignoring`
LabelModifier::Exclude(ignoring) => {
// doesn't check existence of label
for label in &ignoring.labels {
let _ = left_tag_columns.remove(label);
let _ = right_tag_columns.remove(label);
}
}
}
}
}
// push time index column if it exists
if let (Some(left_time_index_column), Some(right_time_index_column)) =
(left_time_index_column, right_time_index_column)
{
left_tag_columns.insert(left_time_index_column);
right_tag_columns.insert(right_time_index_column);
left_tag_columns.push(Column::from_name(left_time_index_column));
right_tag_columns.push(Column::from_name(right_time_index_column));
}
let right = LogicalPlanBuilder::from(right)
@@ -2235,16 +2154,7 @@ impl PromPlanner {
.join(
right,
JoinType::Inner,
(
left_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
right_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
),
(left_tag_columns, right_tag_columns),
None,
)
.context(DataFusionPlanningSnafu)?
@@ -3430,59 +3340,6 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn test_hash_join() {
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_sum".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["uri", "kubernetes_namespace", "kubernetes_pod_name"],
)
.await;
// Should be ok
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
assert_eq!(plan.to_string(), expected);
}
#[tokio::test]
async fn test_nested_histogram_quantile() {
let mut eval_stmt = EvalStmt {

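The modifier handling in the longer branch of this hunk narrows the join keys for binary operations: on(...) keeps only the listed labels (set intersection), ignoring(...) removes them, and the time index column is always joined on (or is the only key when one side has no tags, as in the scalar(...) + host case noted above). A minimal sketch of that label rule on plain strings, not the planner's own types:
use std::collections::BTreeSet;
/// Hypothetical helper for illustration: apply on()/ignoring() to a tag set.
fn join_labels(tags: &[&str], on: Option<&[&str]>, ignoring: Option<&[&str]>) -> BTreeSet<String> {
    let mut cols: BTreeSet<String> = tags.iter().map(|s| s.to_string()).collect();
    if let Some(on) = on {
        // keep only the labels mentioned in `on(...)`
        let mask: BTreeSet<String> = on.iter().map(|s| s.to_string()).collect();
        cols = cols.intersection(&mask).cloned().collect();
    }
    if let Some(ignoring) = ignoring {
        // drop the labels mentioned in `ignoring(...)`
        for label in ignoring {
            cols.remove(*label);
        }
    }
    cols
}
fn main() {
    let tags = ["uri", "kubernetes_namespace", "kubernetes_pod_name"];
    // the `ignoring(kubernetes_pod_name, kubernetes_namespace)` case from the
    // removed test_hash_join leaves only "uri" as a tag join key
    let keys = join_labels(
        &tags,
        None,
        Some(&["kubernetes_pod_name", "kubernetes_namespace"][..]),
    );
    assert_eq!(keys, BTreeSet::from(["uri".to_string()]));
}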
View File

@@ -1,81 +0,0 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.45 | read |
| 1970-01-01T00:00:03 | 234.57 | write |
| 1970-01-01T00:00:04 | 345.68 | read |
| 1970-01-01T00:00:04 | 456.79 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.5 | read |
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
| 1970-01-01T00:00:04 | 456.8 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 120.0 | read |
| 1970-01-01T00:00:03 | 230.0 | write |
| 1970-01-01T00:00:04 | 350.0 | read |
| 1970-01-01T00:00:04 | 460.0 | write |
+---------------------+----------------------------+-------+
drop table cache_hit;
Affected Rows: 0

View File

@@ -1,30 +0,0 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
drop table cache_hit;

View File

@@ -638,78 +638,3 @@ drop table cache_miss;
Affected Rows: 0
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
Affected Rows: 4
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
-- null != null, so this returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
drop table cache_hit_with_null_label;
Affected Rows: 0
drop table cache_miss_with_null_label;
Affected Rows: 0

View File

@@ -295,45 +295,3 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
drop table cache_hit;
drop table cache_miss;
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1
-- null != null, so this returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
drop table cache_hit_with_null_label;
drop table cache_miss_with_null_label;

View File

@@ -295,40 +295,3 @@ drop table histogram3_bucket;
Affected Rows: 0
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
Affected Rows: 0
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- the +Inf bucket is intentionally missing here
;
Affected Rows: 7
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
+---------------------+---+-----+
| ts | s | val |
+---------------------+---+-----+
| 1970-01-01T00:48:20 | a | 5.0 |
| 1970-01-01T00:50:00 | a | 5.0 |
+---------------------+---+-----+
drop table histogram4_bucket;
Affected Rows: 0

View File

@@ -163,27 +163,3 @@ insert into histogram3_bucket values
tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));
drop table histogram3_bucket;
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- the +Inf bucket is intentionally missing here
;
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
drop table histogram4_bucket;

View File

@@ -1,65 +0,0 @@
create table metric_total (
ts timestamp time index,
val double,
);
Affected Rows: 0
insert into metric_total values
(0, 1),
(10000, 2);
Affected Rows: 2
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+
drop table metric_total;
Affected Rows: 0
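For the first dropped expectation: under standard PromQL subquery semantics, metric_total[50s:10s] at t=10s re-evaluates the inner query at the step-aligned instants within the 50s window; only 0s and 10s have data (values 1 and 2), so sum_over_time returns 3.0. With a 5s step the extra instant at 5s still reads 1 via lookback, giving 4.0. GreptimeDB's exact alignment and lookback boundaries may differ in detail, but these two cases line up with the expected output above.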

View File

@@ -1,22 +0,0 @@
create table metric_total (
ts timestamp time index,
val double,
);
insert into metric_total values
(0, 1),
(10000, 2);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
drop table metric_total;