Compare commits

...

6 Commits

Author SHA1 Message Date
Weny Xu
904d560175 feat(promql-planner): introduce vector matching binary operation (#5578)
* feat(promql-planner): support vector matching for binary operation

* test: add sqlness tests
2025-02-27 07:39:19 +00:00
Lei, HUANG
765d1277ee fix(metasrv): clean expired nodes in memory (#5592)
* fix/frontend-node-state: Refactor NodeInfoKey and Context Handling in Meta Server

 • Removed unused cluster_id from NodeInfoKey struct.
 • Updated HeartbeatHandlerGroup to return Context alongside HeartbeatResponse.
 • Added current_node_info to Context for tracking node information.
 • Implemented on_node_disconnect in Context to handle node disconnection events, specifically for Frontend roles.
 • Adjusted register_pusher function to return PusherId directly.
 • Updated tests to accommodate changes in Context structure.

* fix/frontend-node-state: Refactor Heartbeat Handler Context Management

Refactored the HeartbeatHandlerGroup::handle method to use a mutable reference for Context instead of passing it by value. This change simplifies the
context management by eliminating the need to return the context with the response. Updated the Metasrv implementation to align with this new context
handling approach, improving code clarity and reducing unnecessary context cloning.

* revert: clean cluster info on disconnect

* fix/frontend-node-state: Add Frontend Expiry Listener and Update NodeInfoKey Conversion

 • Introduced FrontendExpiryListener to manage the expiration of frontend nodes, including its integration with leadership change notifications.
 • Modified NodeInfoKey conversion to use references, enhancing efficiency and consistency across the codebase.
 • Updated collect_cluster_info_handler and metasrv to incorporate the new listener and conversion changes.
 • Added frontend_expiry module to the project structure for better organization and maintainability.

* chore: add config for node expiry

* add some doc

* fix: clippy

* fix/frontend-node-state:
 ### Refactor Node Expiry Handling
 - **Configuration Update**: Removed `node_expiry_tick` from `metasrv.example.toml` and `MetasrvOptions` in `metasrv.rs`.
 - **Module Renaming**: Renamed `frontend_expiry.rs` to `node_expiry_listener.rs` and updated references in `lib.rs`.
 - **Code Refactoring**: Replaced `FrontendExpiryListener` with `NodeExpiryListener` in `node_expiry_listener.rs` and `metasrv.rs`, removing the tick     interval and adjusting logic to use a fixed 60-second interval for node expiry checks.

* fix/frontend-node-state:
 Improve logging in `node_expiry_listener.rs`

 - Enhanced warning message to include peer information when an unrecognized node info key is encountered in `node_expiry_listener.rs`.

* docs: update config docs

* fix/frontend-node-state:
 **Refactor Context Handling in Heartbeat Services**

 - Updated `HeartbeatHandlerGroup` in `handler.rs` to pass `Context` by value instead of by mutable reference, allowing for more flexible context
 management.
 - Modified `Metasrv` implementation in `heartbeat.rs` to clone `Context` when passing to `handle` method, ensuring thread safety and consistency in
 asynchronous operations.
2025-02-27 06:16:36 +00:00
discord9
ccf42a9d97 fix: flow heartbeat retry (#5600)
* fix: flow heartbeat retry

* fix?: not sure if fixed

* chore: per review
2025-02-27 03:58:21 +00:00
Weny Xu
71e2fb895f feat: introduce prom_round fn (#5604)
* feat: introduce `prom_round` fn

* test: add sqlness tests
2025-02-27 03:30:15 +00:00
Ruihang Xia
c9671fd669 feat(promql): implement subquery (#5606)
* feat: initial implement for promql subquery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl and test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:28:04 +00:00
Ruihang Xia
b5efc75aab feat(promql): ignore invalid input in histogram plan (#5607)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:18:20 +00:00
22 changed files with 826 additions and 27 deletions

View File

@@ -319,6 +319,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

View File

@@ -50,6 +50,9 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

View File

@@ -57,12 +57,10 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -232,8 +230,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
}
}
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
@@ -315,7 +313,7 @@ mod tests {
node_id: 2,
};
let key_bytes: Vec<u8> = key.into();
let key_bytes: Vec<u8> = (&key).into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);

View File

@@ -34,6 +34,7 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;

View File

@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::time::Duration;
use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent memory leak.
pub struct NodeExpiryListener {
handle: Mutex<Option<JoinHandle<()>>>,
max_idle_time: Duration,
in_memory: ResettableKvBackendRef,
}
impl Drop for NodeExpiryListener {
fn drop(&mut self) {
self.stop();
}
}
impl NodeExpiryListener {
pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
Self {
handle: Mutex::new(None),
max_idle_time,
in_memory,
}
}
async fn start(&self) {
let mut handle = self.handle.lock().unwrap();
if handle.is_none() {
let in_memory = self.in_memory.clone();
let max_idle_time = self.max_idle_time;
let ticker_loop = tokio::spawn(async move {
// Run clean task every minute.
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
error!(e; "Failed to clean expired node");
}
}
});
*handle = Some(ticker_loop);
}
}
fn stop(&self) {
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.abort();
info!("Node expiry listener stopped")
}
}
/// Cleans expired nodes from memory.
async fn clean_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<()> {
let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
for key in node_keys {
let key_bytes: Vec<u8> = (&key).into();
if let Err(e) = in_memory.delete(&key_bytes, false).await {
warn!(e; "Failed to delete expired node: {:?}", key_bytes);
} else {
debug!("Deleted expired node key: {:?}", key);
}
}
Ok(())
}
/// Lists expired nodes that have been inactive more than `max_idle_time`.
async fn list_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
let req = RangeRequest::new().with_prefix(prefix);
let current_time_millis = common_time::util::current_time_millis();
let resp = in_memory.range(req).await?;
Ok(resp
.kvs
.into_iter()
.filter_map(move |KeyValue { key, value }| {
let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
warn!(e; "Unrecognized node info value");
}) else {
return None;
};
if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
{
NodeInfoKey::try_from(key)
.inspect_err(|e| {
warn!(e; "Unrecognized node info key: {:?}", info.peer);
})
.ok()
.inspect(|node_key| {
debug!("Found expired node: {:?}", node_key);
})
} else {
None
}
}))
}
}
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
fn name(&self) -> &str {
"NodeExpiryListener"
}
async fn on_leader_start(&self) -> error::Result<()> {
self.start().await;
info!(
"On leader start, node expiry listener started with max idle time: {:?}",
self.max_idle_time
);
Ok(())
}
async fn on_leader_stop(&self) -> error::Result<()> {
self.stop();
info!("On leader stop, node expiry listener stopped");
Ok(())
}
}

View File

@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;
pub use arrow;
pub use arrow::{self, compute};
pub use error::{Error, Result};

View File

@@ -103,6 +103,11 @@ impl HeartbeatTask {
warn!("Heartbeat task started multiple times");
return Ok(());
}
self.create_streams().await
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -231,6 +236,8 @@ impl HeartbeatTask {
// set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
}
info!("flownode heartbeat task stopped.");
});
}
@@ -274,7 +281,7 @@ impl HeartbeatTask {
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.start().await.is_ok() {
if self.create_streams().await.is_ok() {
break;
}
}

View File

@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = key.into();
let key = (&key).into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,

View File

@@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
@@ -151,6 +152,8 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -192,6 +195,7 @@ impl Default for MetasrvOptions {
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
}
}
}
@@ -442,6 +446,10 @@ impl Metasrv {
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
self.options.node_max_idle_time,
self.in_memory.clone(),
)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}

View File

@@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv {
};
if pusher_id.is_none() {
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
pusher_id =
Some(register_pusher(&handler_group, header, tx.clone()).await);
}
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await
@@ -173,13 +175,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> Option<PusherId> {
) -> PusherId {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(pusher_id, pusher).await;
Some(pusher_id)
pusher_id
}
#[cfg(test)]

View File

@@ -583,7 +583,8 @@ impl HistogramFoldStream {
.expect("field column should not be nullable");
counters.push(counter);
}
let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
@@ -672,7 +673,7 @@ impl HistogramFoldStream {
if bucket.len() <= 1 {
return Ok(f64::NAN);
}
if *bucket.last().unwrap() != f64::INFINITY {
if bucket.last().unwrap().is_finite() {
return Err(DataFusionError::Execution(
"last bucket should be +Inf".to_string(),
));
@@ -692,8 +693,8 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
let total = *counter.last().unwrap();
let expected_pos = total * quantile;

View File

@@ -21,6 +21,7 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
mod round;
#[cfg(test)]
mod test_util;
@@ -39,6 +40,7 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub use round::Round;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {

View File

@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;
use crate::functions::extract_array;
pub struct Round {
nearest: f64,
}
impl Round {
fn new(nearest: f64) -> Self {
Self { nearest }
}
pub const fn name() -> &'static str {
"prom_round"
}
fn input_type() -> Vec<DataType> {
vec![DataType::Float64]
}
pub fn return_type() -> DataType {
DataType::Float64
}
pub fn scalar_udf(nearest: f64) -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Immutable,
Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
)
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), 1);
let value_array = extract_array(&input[0])?;
if self.nearest == 0.0 {
let values = value_array.as_primitive::<Float64Type>();
let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
Ok(ColumnarValue::Array(Arc::new(result) as _))
} else {
let values = value_array.as_primitive::<Float64Type>();
let nearest = self.nearest;
let result =
compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
Ok(ColumnarValue::Array(Arc::new(result) as _))
}
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::Float64Array;
use super::*;
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
let round_udf = Round::scalar_udf(nearest);
let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
let result = round_udf.invoke_batch(&input, 1).unwrap();
let result_array = extract_array(&result).unwrap();
assert_eq!(result_array.len(), 1);
assert_eq!(
result_array.as_primitive::<Float64Type>().values(),
&expected
);
}
#[test]
fn test_round() {
test_round_f64(vec![123.456], 0.001, vec![123.456]);
test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
test_round_f64(vec![123.456], 0.1, vec![123.5]);
test_round_f64(vec![123.456], 0.0, vec![123.0]);
test_round_f64(vec![123.456], 1.0, vec![123.0]);
test_round_f64(vec![123.456], 10.0, vec![120.0]);
test_round_f64(vec![123.456], 100.0, vec![100.0]);
test_round_f64(vec![123.456], 105.0, vec![105.0]);
test_round_f64(vec![123.456], 1000.0, vec![0.0]);
}
}

View File

@@ -52,7 +52,7 @@ use promql::extension_plan::{
use promql::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
@@ -200,10 +200,9 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
}
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
}
.fail()?,
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
@@ -218,6 +217,48 @@ impl PromPlanner {
Ok(res)
}
async fn prom_subquery_expr_to_plan(
&mut self,
session_state: &SessionState,
subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
let SubqueryExpr {
expr, range, step, ..
} = subquery_expr;
let current_interval = self.ctx.interval;
if let Some(step) = step {
self.ctx.interval = step.as_millis() as _;
}
let current_start = self.ctx.start;
self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
let input = self.prom_expr_to_plan(expr, session_state).await?;
self.ctx.interval = current_interval;
self.ctx.start = current_start;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
input,
)
.context(DataFusionPlanningSnafu)?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
@@ -441,6 +482,7 @@ impl PromPlanner {
// if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
// under this case we only join on time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
modifier,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -1468,6 +1510,20 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"round" => {
let nearest = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
None => 0.0,
other => UnexpectedPlanExprSnafu {
desc: format!("expected f64 literal as t, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1674,7 +1730,7 @@ impl PromPlanner {
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join",
fn_name: "label_join"
}
);
@@ -2121,24 +2177,49 @@ impl PromPlanner {
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let mut left_tag_columns = if only_join_time_index {
vec![]
BTreeSet::new()
} else {
self.ctx
.tag_columns
.iter()
.map(Column::from_name)
.collect::<Vec<_>>()
.cloned()
.collect::<BTreeSet<_>>()
};
let mut right_tag_columns = left_tag_columns.clone();
// apply modifier
if let Some(modifier) = modifier {
// apply label modifier
if let Some(matching) = &modifier.matching {
match matching {
// keeps columns mentioned in `on`
LabelModifier::Include(on) => {
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
right_tag_columns =
right_tag_columns.intersection(&mask).cloned().collect();
}
// removes columns memtioned in `ignoring`
LabelModifier::Exclude(ignoring) => {
// doesn't check existence of label
for label in &ignoring.labels {
let _ = left_tag_columns.remove(label);
let _ = right_tag_columns.remove(label);
}
}
}
}
}
// push time index column if it exists
if let (Some(left_time_index_column), Some(right_time_index_column)) =
(left_time_index_column, right_time_index_column)
{
left_tag_columns.push(Column::from_name(left_time_index_column));
right_tag_columns.push(Column::from_name(right_time_index_column));
left_tag_columns.insert(left_time_index_column);
right_tag_columns.insert(right_time_index_column);
}
let right = LogicalPlanBuilder::from(right)
@@ -2154,7 +2235,16 @@ impl PromPlanner {
.join(
right,
JoinType::Inner,
(left_tag_columns, right_tag_columns),
(
left_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
right_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
),
None,
)
.context(DataFusionPlanningSnafu)?
@@ -3340,6 +3430,59 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn test_hash_join() {
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_sum".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["uri", "kubernetes_namespace", "kubernetes_pod_name"],
)
.await;
// Should be ok
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
assert_eq!(plan.to_string(), expected);
}
#[tokio::test]
async fn test_nested_histogram_quantile() {
let mut eval_stmt = EvalStmt {

View File

@@ -0,0 +1,81 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.45 | read |
| 1970-01-01T00:00:03 | 234.57 | write |
| 1970-01-01T00:00:04 | 345.68 | read |
| 1970-01-01T00:00:04 | 456.79 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.5 | read |
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
| 1970-01-01T00:00:04 | 456.8 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 120.0 | read |
| 1970-01-01T00:00:03 | 230.0 | write |
| 1970-01-01T00:00:04 | 350.0 | read |
| 1970-01-01T00:00:04 | 460.0 | write |
+---------------------+----------------------------+-------+
drop table cache_hit;
Affected Rows: 0

View File

@@ -0,0 +1,30 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
drop table cache_hit;

View File

@@ -638,3 +638,78 @@ drop table cache_miss;
Affected Rows: 0
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
Affected Rows: 4
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
drop table cache_hit_with_null_label;
Affected Rows: 0
drop table cache_miss_with_null_label;
Affected Rows: 0

View File

@@ -295,3 +295,45 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
drop table cache_hit;
drop table cache_miss;
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
drop table cache_hit_with_null_label;
drop table cache_miss_with_null_label;

View File

@@ -295,3 +295,40 @@ drop table histogram3_bucket;
Affected Rows: 0
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
Affected Rows: 0
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
Affected Rows: 7
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
+---------------------+---+-----+
| ts | s | val |
+---------------------+---+-----+
| 1970-01-01T00:48:20 | a | 5.0 |
| 1970-01-01T00:50:00 | a | 5.0 |
+---------------------+---+-----+
drop table histogram4_bucket;
Affected Rows: 0

View File

@@ -163,3 +163,27 @@ insert into histogram3_bucket values
tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));
drop table histogram3_bucket;
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
drop table histogram4_bucket;

View File

@@ -0,0 +1,65 @@
create table metric_total (
ts timestamp time index,
val double,
);
Affected Rows: 0
insert into metric_total values
(0, 1),
(10000, 2);
Affected Rows: 2
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+
drop table metric_total;
Affected Rows: 0

View File

@@ -0,0 +1,22 @@
create table metric_total (
ts timestamp time index,
val double,
);
insert into metric_total values
(0, 1),
(10000, 2);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
drop table metric_total;