Mirror of https://github.com/GreptimeTeam/greptimedb.git
Synced 2025-12-27 08:29:59 +00:00

Compare commits: recording_... → v0.12.0 (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 904d560175 | |
| | 765d1277ee | |
| | ccf42a9d97 | |
| | 71e2fb895f | |
| | c9671fd669 | |
| | b5efc75aab | |
@@ -319,6 +319,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -50,6 +50,9 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false

## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"

## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true
@@ -57,12 +57,10 @@ pub trait ClusterInfo {
}

/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
    /// The cluster id.
    // todo(hl): remove cluster_id as it is not assigned anywhere.
    pub cluster_id: ClusterId,
    /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
    pub role: Role,

@@ -232,8 +230,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
    }
}

-impl From<NodeInfoKey> for Vec<u8> {
-    fn from(key: NodeInfoKey) -> Self {
+impl From<&NodeInfoKey> for Vec<u8> {
+    fn from(key: &NodeInfoKey) -> Self {
        format!(
            "{}-{}-{}-{}",
            CLUSTER_NODE_INFO_PREFIX,

@@ -315,7 +313,7 @@ mod tests {
            node_id: 2,
        };

-        let key_bytes: Vec<u8> = key.into();
+        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(1, new_key.cluster_id);
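A minimal, self-contained sketch (toy types, not the actual GreptimeDB definitions) of what the `From<&NodeInfoKey>` change buys: serializing through a reference no longer consumes the key, which is why later call sites can write `(&key).into()` and keep using the key afterwards.

```rust
// Sketch only: `ToyKey` and the literal prefix stand in for NodeInfoKey and its constants.
struct ToyKey {
    cluster_id: u64,
    role: i32,
    node_id: u64,
}

impl From<&ToyKey> for Vec<u8> {
    fn from(key: &ToyKey) -> Self {
        format!(
            "__meta_cluster_node_info-{}-{}-{}",
            key.cluster_id, key.role, key.node_id
        )
        .into_bytes()
    }
}

fn main() {
    let key = ToyKey { cluster_id: 1, role: 0, node_id: 2 };
    let bytes: Vec<u8> = (&key).into();
    println!("{}", String::from_utf8_lossy(&bytes));
    // The key was only borrowed, so it can be serialized again without cloning.
    let _bytes_again: Vec<u8> = (&key).into();
}
```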
@@ -34,6 +34,7 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
+pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;
src/common/meta/src/node_expiry_listener.rs (new file, 152 lines)
@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Mutex;
use std::time::Duration;

use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};

use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent memory leak.
pub struct NodeExpiryListener {
    handle: Mutex<Option<JoinHandle<()>>>,
    max_idle_time: Duration,
    in_memory: ResettableKvBackendRef,
}

impl Drop for NodeExpiryListener {
    fn drop(&mut self) {
        self.stop();
    }
}

impl NodeExpiryListener {
    pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
        Self {
            handle: Mutex::new(None),
            max_idle_time,
            in_memory,
        }
    }

    async fn start(&self) {
        let mut handle = self.handle.lock().unwrap();
        if handle.is_none() {
            let in_memory = self.in_memory.clone();
            let max_idle_time = self.max_idle_time;
            let ticker_loop = tokio::spawn(async move {
                // Run clean task every minute.
                let mut interval = interval(Duration::from_secs(60));
                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
                loop {
                    interval.tick().await;
                    if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
                        error!(e; "Failed to clean expired node");
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    fn stop(&self) {
        if let Some(handle) = self.handle.lock().unwrap().take() {
            handle.abort();
            info!("Node expiry listener stopped")
        }
    }

    /// Cleans expired nodes from memory.
    async fn clean_expired_nodes(
        in_memory: &ResettableKvBackendRef,
        max_idle_time: Duration,
    ) -> error::Result<()> {
        let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
        for key in node_keys {
            let key_bytes: Vec<u8> = (&key).into();
            if let Err(e) = in_memory.delete(&key_bytes, false).await {
                warn!(e; "Failed to delete expired node: {:?}", key_bytes);
            } else {
                debug!("Deleted expired node key: {:?}", key);
            }
        }
        Ok(())
    }

    /// Lists expired nodes that have been inactive more than `max_idle_time`.
    async fn list_expired_nodes(
        in_memory: &ResettableKvBackendRef,
        max_idle_time: Duration,
    ) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
        let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
        let req = RangeRequest::new().with_prefix(prefix);
        let current_time_millis = common_time::util::current_time_millis();
        let resp = in_memory.range(req).await?;
        Ok(resp
            .kvs
            .into_iter()
            .filter_map(move |KeyValue { key, value }| {
                let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
                    warn!(e; "Unrecognized node info value");
                }) else {
                    return None;
                };
                if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
                {
                    NodeInfoKey::try_from(key)
                        .inspect_err(|e| {
                            warn!(e; "Unrecognized node info key: {:?}", info.peer);
                        })
                        .ok()
                        .inspect(|node_key| {
                            debug!("Found expired node: {:?}", node_key);
                        })
                } else {
                    None
                }
            }))
    }
}

#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
    fn name(&self) -> &str {
        "NodeExpiryListener"
    }

    async fn on_leader_start(&self) -> error::Result<()> {
        self.start().await;
        info!(
            "On leader start, node expiry listener started with max idle time: {:?}",
            self.max_idle_time
        );
        Ok(())
    }

    async fn on_leader_stop(&self) -> error::Result<()> {
        self.stop();
        info!("On leader stop, node expiry listener stopped");
        Ok(())
    }
}
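For reference, the expiry rule itself is a single comparison against `max_idle_time`. A self-contained sketch with hypothetical timestamps (not taken from a real cluster):

```rust
use std::time::Duration;

// Sketch only: the predicate used by list_expired_nodes, extracted for illustration.
fn is_expired(now_ms: i64, last_activity_ts: i64, max_idle_time: Duration) -> bool {
    (now_ms - last_activity_ts) > max_idle_time.as_millis() as i64
}

fn main() {
    let max_idle_time = Duration::from_secs(24 * 60 * 60); // the default `node_max_idle_time`
    let now_ms = 1_700_000_000_000i64;
    // A node that last reported 25 hours ago is past the 24-hour budget and gets removed.
    assert!(is_expired(now_ms, now_ms - 25 * 3_600 * 1_000, max_idle_time));
    // One that reported 5 minutes ago is kept.
    assert!(!is_expired(now_ms, now_ms - 5 * 60 * 1_000, max_idle_time));
    println!("expiry predicate behaves as expected");
}
```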
@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;

-pub use arrow;
+pub use arrow::{self, compute};
pub use error::{Error, Result};
@@ -103,6 +103,11 @@ impl HeartbeatTask {
            warn!("Heartbeat task started multiple times");
            return Ok(());
        }

        self.create_streams().await
    }

    async fn create_streams(&self) -> Result<(), Error> {
        info!("Start to establish the heartbeat connection to metasrv.");
        let (req_sender, resp_stream) = self
            .meta_client

@@ -231,6 +236,8 @@ impl HeartbeatTask {
                // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
                latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
            }

            info!("flownode heartbeat task stopped.");
        });
    }

@@ -274,7 +281,7 @@ impl HeartbeatTask {

        info!("Try to re-establish the heartbeat connection to metasrv.");

-        if self.start().await.is_ok() {
+        if self.create_streams().await.is_ok() {
            break;
        }
@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}

async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
-    let key = key.into();
+    let key = (&key).into();
    let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
    let put_req = PutRequest {
        key,
@@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
+use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;

@@ -151,6 +152,8 @@ pub struct MetasrvOptions {
    #[cfg(feature = "pg_kvbackend")]
    /// Lock id for meta kv election. Only effect when using pg_kvbackend.
    pub meta_election_lock_id: u64,
+    #[serde(with = "humantime_serde")]
+    pub node_max_idle_time: Duration,
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

@@ -192,6 +195,7 @@ impl Default for MetasrvOptions {
            meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
+            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
        }
    }
}

@@ -442,6 +446,10 @@ impl Metasrv {
        leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
        leadership_change_notifier
            .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
+        leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
+            self.options.node_max_idle_time,
+            self.in_memory.clone(),
+        )));
        if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
            leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
        }
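A stripped-down sketch of the registration pattern used here; the trait and notifier below are toy stand-ins rather than the `common_meta::leadership_notifier` API, but they show how a listener such as `NodeExpiryListener` gets started once this metasrv instance becomes leader.

```rust
use std::sync::Arc;

// Sketch only: toy trait and notifier, synchronous for brevity.
trait ToyLeadershipListener {
    fn name(&self) -> &str;
    fn on_leader_start(&self);
}

#[derive(Default)]
struct ToyNotifier {
    listeners: Vec<Arc<dyn ToyLeadershipListener>>,
}

impl ToyNotifier {
    fn add_listener(&mut self, listener: Arc<dyn ToyLeadershipListener>) {
        self.listeners.push(listener);
    }
    fn notify_leader_start(&self) {
        // Every registered listener gets the callback when leadership is gained.
        for listener in &self.listeners {
            println!("starting {}", listener.name());
            listener.on_leader_start();
        }
    }
}

struct ToyExpiryListener;
impl ToyLeadershipListener for ToyExpiryListener {
    fn name(&self) -> &str { "NodeExpiryListener" }
    fn on_leader_start(&self) { /* the real listener spawns its periodic clean task here */ }
}

fn main() {
    let mut notifier = ToyNotifier::default();
    notifier.add_listener(Arc::new(ToyExpiryListener));
    notifier.notify_leader_start();
}
```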
@@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv {
            };

            if pusher_id.is_none() {
-                pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
+                pusher_id =
+                    Some(register_pusher(&handler_group, header, tx.clone()).await);
            }
            if let Some(k) = &pusher_id {
                METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
            } else {
                METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
            }

            let res = handler_group
                .handle(req, ctx.clone())
                .await

@@ -173,13 +175,13 @@ async fn register_pusher(
    handler_group: &HeartbeatHandlerGroup,
    header: &RequestHeader,
    sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
-) -> Option<PusherId> {
+) -> PusherId {
    let role = header.role();
    let id = get_node_id(header);
    let pusher_id = PusherId::new(role, id);
    let pusher = Pusher::new(sender, header);
    handler_group.register_pusher(pusher_id, pusher).await;
-    Some(pusher_id)
+    pusher_id
}

#[cfg(test)]
@@ -583,7 +583,8 @@ impl HistogramFoldStream {
                .expect("field column should not be nullable");
            counters.push(counter);
        }
-        let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
+        // ignore invalid data
+        let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
        self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
        cursor += bucket_num;
        remaining_rows -= bucket_num;

@@ -672,7 +673,7 @@ impl HistogramFoldStream {
        if bucket.len() <= 1 {
            return Ok(f64::NAN);
        }
-        if *bucket.last().unwrap() != f64::INFINITY {
+        if bucket.last().unwrap().is_finite() {
            return Err(DataFusionError::Execution(
                "last bucket should be +Inf".to_string(),
            ));

@@ -692,8 +693,8 @@ impl HistogramFoldStream {
        }

        // check input value
-        debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
-        debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
+        debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
+        debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");

        let total = *counter.last().unwrap();
        let expected_pos = total * quantile;
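A small sketch of the error-to-NaN fallback this hunk introduces; `evaluate_row` below is a hypothetical stand-in that only checks the `+Inf` bucket rule, not the real quantile computation. The point is that an invalid histogram row now degrades to NaN instead of failing the whole query.

```rust
// Sketch only: simplified stand-in for HistogramFoldStream::evaluate_row.
fn evaluate_row(bucket: &[f64]) -> Result<f64, String> {
    // Mirrors the "last bucket should be +Inf" validation shown above.
    if bucket.last().copied().map_or(true, f64::is_finite) {
        return Err("last bucket should be +Inf".to_string());
    }
    Ok(bucket[0]) // placeholder for the actual interpolation
}

fn main() {
    // Unaligned/invalid input (no +Inf bucket) becomes NaN for that row only.
    let bad = evaluate_row(&[0.1, 1.0, 5.0]).unwrap_or(f64::NAN);
    assert!(bad.is_nan());
    // Valid input still yields a value.
    let ok = evaluate_row(&[0.1, 1.0, f64::INFINITY]).unwrap_or(f64::NAN);
    assert_eq!(ok, 0.1);
}
```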
@@ -21,6 +21,7 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
+mod round;
#[cfg(test)]
mod test_util;

@@ -39,6 +40,7 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
+pub use round::Round;

pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
    if let ColumnarValue::Array(array) = columnar_value {
src/promql/src/functions/round.rs (new file, 105 lines)
@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;

use crate::functions::extract_array;

pub struct Round {
    nearest: f64,
}

impl Round {
    fn new(nearest: f64) -> Self {
        Self { nearest }
    }

    pub const fn name() -> &'static str {
        "prom_round"
    }

    fn input_type() -> Vec<DataType> {
        vec![DataType::Float64]
    }

    pub fn return_type() -> DataType {
        DataType::Float64
    }

    pub fn scalar_udf(nearest: f64) -> ScalarUDF {
        create_udf(
            Self::name(),
            Self::input_type(),
            Self::return_type(),
            Volatility::Immutable,
            Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
        )
    }

    fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        assert_eq!(input.len(), 1);

        let value_array = extract_array(&input[0])?;

        if self.nearest == 0.0 {
            let values = value_array.as_primitive::<Float64Type>();
            let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
            Ok(ColumnarValue::Array(Arc::new(result) as _))
        } else {
            let values = value_array.as_primitive::<Float64Type>();
            let nearest = self.nearest;
            let result =
                compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
            Ok(ColumnarValue::Array(Arc::new(result) as _))
        }
    }
}

#[cfg(test)]
mod tests {
    use datatypes::arrow::array::Float64Array;

    use super::*;

    fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
        let round_udf = Round::scalar_udf(nearest);
        let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
        let result = round_udf.invoke_batch(&input, 1).unwrap();
        let result_array = extract_array(&result).unwrap();
        assert_eq!(result_array.len(), 1);
        assert_eq!(
            result_array.as_primitive::<Float64Type>().values(),
            &expected
        );
    }

    #[test]
    fn test_round() {
        test_round_f64(vec![123.456], 0.001, vec![123.456]);
        test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
        test_round_f64(vec![123.456], 0.1, vec![123.5]);
        test_round_f64(vec![123.456], 0.0, vec![123.0]);
        test_round_f64(vec![123.456], 1.0, vec![123.0]);
        test_round_f64(vec![123.456], 10.0, vec![120.0]);
        test_round_f64(vec![123.456], 100.0, vec![100.0]);
        test_round_f64(vec![123.456], 105.0, vec![105.0]);
        test_round_f64(vec![123.456], 1000.0, vec![0.0]);
    }
}
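The test expectations above (for example `123.46000000000001`) fall directly out of the `(a / nearest).round() * nearest` formula and f64 representation. A standalone sketch of just that formula, separate from the UDF plumbing:

```rust
// Sketch only: the "round to nearest multiple" rule used by prom_round.
fn round_to_nearest(a: f64, nearest: f64) -> f64 {
    if nearest == 0.0 {
        a.round()
    } else {
        (a / nearest).round() * nearest
    }
}

fn main() {
    // 123.456 / 0.01 rounds to 12346; 12346 * 0.01 is not exactly representable in f64,
    // hence the 123.46000000000001 expected in the unit test above.
    println!("{}", round_to_nearest(123.456, 0.01)); // 123.46000000000001
    println!("{}", round_to_nearest(123.456, 10.0)); // 120
    println!("{}", round_to_nearest(123.456, 0.0));  // 123 (nearest = 0 falls back to plain round)
}
```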
@@ -52,7 +52,7 @@ use promql::extension_plan::{
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
-    QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
+    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
@@ -200,10 +200,9 @@ impl PromPlanner {
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
-            PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
-                name: "Prom Subquery",
-            }
-            .fail()?,
+            PromExpr::Subquery(expr) => {
+                self.prom_subquery_expr_to_plan(session_state, expr).await?
+            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
@@ -218,6 +217,48 @@
        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
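A back-of-the-envelope sketch of the start-widening arithmetic above, using the `metric_total[50s:10s]` case from the subquery tests added later in this diff (all values in milliseconds; the timestamp enumeration is illustrative, not the planner's actual code path):

```rust
// Sketch only: how the planner widens the query start for a subquery.
fn main() {
    let step = 10_000i64;        // subquery step -> becomes ctx.interval
    let range = 50_000i64;       // subquery range
    let query_start = 10_000i64; // outer evaluation timestamp (t = 10s)

    // Mirrors `self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;`
    let inner_start = query_start - (range - step);
    let eval_points: Vec<i64> = (inner_start..=query_start).step_by(step as usize).collect();
    // Inner expression is evaluated at -30s, -20s, -10s, 0s and 10s; RangeManipulate then folds
    // the 50s window over these points, so sum_over_time sees the samples at 0s and 10s (1 + 2 = 3.0).
    println!("{eval_points:?}");
}
```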
@@ -441,6 +482,7 @@
            // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
            // under this case we only join on time index
            left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
+            modifier,
        )?;
        let join_plan_schema = join_plan.schema().clone();
@@ -1468,6 +1510,20 @@
                ScalarFunc::GeneratedExpr
            }
+            "round" => {
+                let nearest = match other_input_exprs.pop_front() {
+                    Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
+                    Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
+                    None => 0.0,
+                    other => UnexpectedPlanExprSnafu {
+                        desc: format!("expected f64 literal as t, but found {:?}", other),
+                    }
+                    .fail()?,
+                };
+
+                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
+            }

            _ => {
                if let Some(f) = session_state.scalar_functions().get(func.name) {
                    ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1674,7 +1730,7 @@ impl PromPlanner {
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
-                fn_name: "label_join",
+                fn_name: "label_join"
            }
        );
@@ -2121,24 +2177,49 @@
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
-            vec![]
+            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
-                .map(Column::from_name)
-                .collect::<Vec<_>>()
+                .cloned()
+                .collect::<BTreeSet<_>>()
        };
        let mut right_tag_columns = left_tag_columns.clone();

        // apply modifier
        if let Some(modifier) = modifier {
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // removes columns memtioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push time index column if it exists
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
-            left_tag_columns.push(Column::from_name(left_time_index_column));
-            right_tag_columns.push(Column::from_name(right_time_index_column));
+            left_tag_columns.insert(left_time_index_column);
+            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
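The Vec-to-BTreeSet switch makes the on()/ignoring() handling above a matter of plain set operations. A minimal sketch of those rules, using the label names from the `test_hash_join` case further down in this diff (not the planner's actual code):

```rust
use std::collections::BTreeSet;

fn main() {
    // Tag columns shared by both sides of the binary expression.
    let tags: BTreeSet<String> = ["uri", "kubernetes_namespace", "kubernetes_pod_name"]
        .into_iter()
        .map(String::from)
        .collect();

    // ignoring(kubernetes_pod_name, kubernetes_namespace): drop the listed labels.
    let mut join_keys = tags.clone();
    for label in ["kubernetes_pod_name", "kubernetes_namespace"] {
        join_keys.remove(label);
    }
    assert_eq!(join_keys, BTreeSet::from(["uri".to_string()]));

    // on(uri) expressed as an intersection yields the same set of join keys.
    let on_mask = BTreeSet::from(["uri".to_string()]);
    let join_keys_on: BTreeSet<String> = tags.intersection(&on_mask).cloned().collect();
    assert_eq!(join_keys, join_keys_on);

    // The planner then appends the time index column, giving the
    // greptime_timestamp + uri equi-join seen in the expected plan below.
    println!("join keys: {join_keys:?}");
}
```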
@@ -2154,7 +2235,16 @@
            .join(
                right,
                JoinType::Inner,
-                (left_tag_columns, right_tag_columns),
+                (
+                    left_tag_columns
+                        .into_iter()
+                        .map(Column::from_name)
+                        .collect::<Vec<_>>(),
+                    right_tag_columns
+                        .into_iter()
+                        .map(Column::from_name)
+                        .collect::<Vec<_>>(),
+                ),
                None,
            )
            .context(DataFusionPlanningSnafu)?
@@ -3340,6 +3430,59 @@ mod test {
        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        // Should be ok
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
        assert_eq!(plan.to_string(), expected);
    }

    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
tests/cases/standalone/common/promql/round_fn.result (new file, 81 lines)
@@ -0,0 +1,81 @@
create table cache_hit (
    ts timestamp time index,
    job string,
    greptime_value double,
    primary key (job)
);

Affected Rows: 0

insert into cache_hit values
    (3000, "read", 123.45),
    (3000, "write", 234.567),
    (4000, "read", 345.678),
    (4000, "write", 456.789);

Affected Rows: 4

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);

+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.45 | read |
| 1970-01-01T00:00:03 | 234.57 | write |
| 1970-01-01T00:00:04 | 345.68 | read |
| 1970-01-01T00:00:04 | 456.79 | write |
+---------------------+----------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);

+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.5 | read |
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
| 1970-01-01T00:00:04 | 456.8 | write |
+---------------------+----------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);

+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);

+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);

+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 120.0 | read |
| 1970-01-01T00:00:03 | 230.0 | write |
| 1970-01-01T00:00:04 | 350.0 | read |
| 1970-01-01T00:00:04 | 460.0 | write |
+---------------------+----------------------------+-------+

drop table cache_hit;

Affected Rows: 0
tests/cases/standalone/common/promql/round_fn.sql (new file, 30 lines)
@@ -0,0 +1,30 @@
create table cache_hit (
    ts timestamp time index,
    job string,
    greptime_value double,
    primary key (job)
);

insert into cache_hit values
    (3000, "read", 123.45),
    (3000, "write", 234.567),
    (4000, "read", 345.678),
    (4000, "write", 456.789);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);

drop table cache_hit;
@@ -638,3 +638,78 @@ drop table cache_miss;

Affected Rows: 0

create table cache_hit_with_null_label (
    ts timestamp time index,
    job string,
    null_label string null,
    greptime_value double,
    primary key (job, null_label)
);

Affected Rows: 0

create table cache_miss_with_null_label (
    ts timestamp time index,
    job string,
    null_label string null,
    greptime_value double,
    primary key (job, null_label)
);

Affected Rows: 0

insert into cache_hit_with_null_label values
    (3000, "read", null, 1.0),
    (3000, "write", null, 2.0),
    (4000, "read", null, 3.0),
    (4000, "write", null, 4.0);

Affected Rows: 4

insert into cache_miss_with_null_label values
    (3000, "read", null, 1.0),
    (3000, "write", null, 2.0),
    (4000, "read", null, 1.0),
    (4000, "write", null, 2.0);

Affected Rows: 4

-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);

++
++

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);

+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+

drop table cache_hit_with_null_label;

Affected Rows: 0

drop table cache_miss_with_null_label;

Affected Rows: 0
@@ -295,3 +295,45 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);

drop table cache_hit;

drop table cache_miss;

create table cache_hit_with_null_label (
    ts timestamp time index,
    job string,
    null_label string null,
    greptime_value double,
    primary key (job, null_label)
);

create table cache_miss_with_null_label (
    ts timestamp time index,
    job string,
    null_label string null,
    greptime_value double,
    primary key (job, null_label)
);

insert into cache_hit_with_null_label values
    (3000, "read", null, 1.0),
    (3000, "write", null, 2.0),
    (4000, "read", null, 3.0),
    (4000, "write", null, 4.0);

insert into cache_miss_with_null_label values
    (3000, "read", null, 1.0),
    (3000, "write", null, 2.0),
    (4000, "read", null, 1.0),
    (4000, "write", null, 2.0);

-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);

drop table cache_hit_with_null_label;

drop table cache_miss_with_null_label;
@@ -295,3 +295,40 @@ drop table histogram3_bucket;

Affected Rows: 0

-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

Affected Rows: 0

insert into histogram4_bucket values
    (2900000, "0.1", "a", 0),
    (2900000, "1", "a", 10),
    (2900000, "5", "a", 20),
    (2900000, "+Inf", "a", 150),
    (3000000, "0.1", "a", 50),
    (3000000, "1", "a", 70),
    (3000000, "5", "a", 120),
    -- INF here is missing
;

Affected Rows: 7

tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);

+---------------------+---+-----+
| ts | s | val |
+---------------------+---+-----+
| 1970-01-01T00:48:20 | a | 5.0 |
| 1970-01-01T00:50:00 | a | 5.0 |
+---------------------+---+-----+

drop table histogram4_bucket;

Affected Rows: 0
@@ -163,3 +163,27 @@ insert into histogram3_bucket values

tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));

drop table histogram3_bucket;

-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
    ts timestamp time index,
    le string,
    s string,
    val double,
    primary key (s, le),
);

insert into histogram4_bucket values
    (2900000, "0.1", "a", 0),
    (2900000, "1", "a", 10),
    (2900000, "5", "a", 20),
    (2900000, "+Inf", "a", 150),
    (3000000, "0.1", "a", 50),
    (3000000, "1", "a", 70),
    (3000000, "5", "a", 120),
    -- INF here is missing
;

tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);

drop table histogram4_bucket;
tests/cases/standalone/common/promql/subquery.result (new file, 65 lines)
@@ -0,0 +1,65 @@
create table metric_total (
    ts timestamp time index,
    val double,
);

Affected Rows: 0

insert into metric_total values
    (0, 1),
    (10000, 2);

Affected Rows: 2

tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);

+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+

tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);

+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+

tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);

+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+

tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);

+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+

tql eval (10, 10, '1s') rate(metric_total[20s:10s]);

+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+

tql eval (20, 20, '1s') rate(metric_total[20s:5s]);

+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+

drop table metric_total;

Affected Rows: 0
tests/cases/standalone/common/promql/subquery.sql (new file, 22 lines)
@@ -0,0 +1,22 @@
create table metric_total (
    ts timestamp time index,
    val double,
);

insert into metric_total values
    (0, 1),
    (10000, 2);

tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);

tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);

tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);

tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);

tql eval (10, 10, '1s') rate(metric_total[20s:10s]);

tql eval (20, 20, '1s') rate(metric_total[20s:5s]);

drop table metric_total;