Compare commits

..

6 Commits

Author SHA1 Message Date
Weny Xu
904d560175 feat(promql-planner): introduce vector matching binary operation (#5578)
* feat(promql-planner): support vector matching for binary operation

* test: add sqlness tests
2025-02-27 07:39:19 +00:00
Lei, HUANG
765d1277ee fix(metasrv): clean expired nodes in memory (#5592)
* fix/frontend-node-state: Refactor NodeInfoKey and Context Handling in Meta Server

 • Removed unused cluster_id from NodeInfoKey struct.
 • Updated HeartbeatHandlerGroup to return Context alongside HeartbeatResponse.
 • Added current_node_info to Context for tracking node information.
 • Implemented on_node_disconnect in Context to handle node disconnection events, specifically for Frontend roles.
 • Adjusted register_pusher function to return PusherId directly.
 • Updated tests to accommodate changes in Context structure.

* fix/frontend-node-state: Refactor Heartbeat Handler Context Management

Refactored the HeartbeatHandlerGroup::handle method to use a mutable reference for Context instead of passing it by value. This change simplifies the
context management by eliminating the need to return the context with the response. Updated the Metasrv implementation to align with this new context
handling approach, improving code clarity and reducing unnecessary context cloning.

* revert: clean cluster info on disconnect

* fix/frontend-node-state: Add Frontend Expiry Listener and Update NodeInfoKey Conversion

 • Introduced FrontendExpiryListener to manage the expiration of frontend nodes, including its integration with leadership change notifications.
 • Modified NodeInfoKey conversion to use references, enhancing efficiency and consistency across the codebase.
 • Updated collect_cluster_info_handler and metasrv to incorporate the new listener and conversion changes.
 • Added frontend_expiry module to the project structure for better organization and maintainability.

* chore: add config for node expiry

* add some doc

* fix: clippy

* fix/frontend-node-state:
 ### Refactor Node Expiry Handling
 - **Configuration Update**: Removed `node_expiry_tick` from `metasrv.example.toml` and `MetasrvOptions` in `metasrv.rs`.
 - **Module Renaming**: Renamed `frontend_expiry.rs` to `node_expiry_listener.rs` and updated references in `lib.rs`.
 - **Code Refactoring**: Replaced `FrontendExpiryListener` with `NodeExpiryListener` in `node_expiry_listener.rs` and `metasrv.rs`, removing the tick     interval and adjusting logic to use a fixed 60-second interval for node expiry checks.

* fix/frontend-node-state:
 Improve logging in `node_expiry_listener.rs`

 - Enhanced warning message to include peer information when an unrecognized node info key is encountered in `node_expiry_listener.rs`.

* docs: update config docs

* fix/frontend-node-state:
 **Refactor Context Handling in Heartbeat Services**

 - Updated `HeartbeatHandlerGroup` in `handler.rs` to pass `Context` by value instead of by mutable reference, allowing for more flexible context
 management.
 - Modified `Metasrv` implementation in `heartbeat.rs` to clone `Context` when passing to `handle` method, ensuring thread safety and consistency in
 asynchronous operations.
2025-02-27 06:16:36 +00:00
discord9
ccf42a9d97 fix: flow heartbeat retry (#5600)
* fix: flow heartbeat retry

* fix?: not sure if fixed

* chore: per review
2025-02-27 03:58:21 +00:00
Weny Xu
71e2fb895f feat: introduce prom_round fn (#5604)
* feat: introduce `prom_round` fn

* test: add sqlness tests
2025-02-27 03:30:15 +00:00
Ruihang Xia
c9671fd669 feat(promql): implement subquery (#5606)
* feat: initial implement for promql subquery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl and test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:28:04 +00:00
Ruihang Xia
b5efc75aab feat(promql): ignore invalid input in histogram plan (#5607)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:18:20 +00:00
40 changed files with 875 additions and 1426 deletions

View File

@@ -319,6 +319,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

View File

@@ -50,6 +50,9 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

View File

@@ -16,6 +16,7 @@
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
@@ -33,6 +34,7 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -317,8 +317,6 @@ impl StartCommand {
Arc::new(executor),
);
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flownode_builder = FlownodeBuilder::new(
opts,
@@ -326,7 +324,6 @@ impl StartCommand {
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
Arc::new(frontend_client),
)
.with_heartbeat_task(heartbeat_task);

View File

@@ -54,10 +54,7 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient,
FrontendInvoker,
};
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -536,16 +533,12 @@ impl StartCommand {
flow: opts.flow.clone(),
..Default::default()
};
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(
flow_builder

View File

@@ -57,12 +57,10 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -232,8 +230,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
}
}
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
@@ -315,7 +313,7 @@ mod tests {
node_id: 2,
};
let key_bytes: Vec<u8> = key.into();
let key_bytes: Vec<u8> = (&key).into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);

View File

@@ -343,7 +343,6 @@ pub enum FlowType {
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const STREAMING: &str = "streaming";
pub const FLOW_TYPE_KEY: &str = "flow_type";
}
impl Default for FlowType {
@@ -399,8 +398,7 @@ impl From<&CreateFlowData> for CreateRequest {
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
req.flow_options.insert("flow_type".to_string(), flow_type);
req
}
}
@@ -432,7 +430,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.collect::<Vec<_>>();
let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
options.insert("flow_type".to_string(), flow_type);
let flow_info = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),

View File

@@ -34,6 +34,7 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;

View File

@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::time::Duration;
use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent memory leak.
pub struct NodeExpiryListener {
handle: Mutex<Option<JoinHandle<()>>>,
max_idle_time: Duration,
in_memory: ResettableKvBackendRef,
}
impl Drop for NodeExpiryListener {
fn drop(&mut self) {
self.stop();
}
}
impl NodeExpiryListener {
pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
Self {
handle: Mutex::new(None),
max_idle_time,
in_memory,
}
}
async fn start(&self) {
let mut handle = self.handle.lock().unwrap();
if handle.is_none() {
let in_memory = self.in_memory.clone();
let max_idle_time = self.max_idle_time;
let ticker_loop = tokio::spawn(async move {
// Run clean task every minute.
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
error!(e; "Failed to clean expired node");
}
}
});
*handle = Some(ticker_loop);
}
}
fn stop(&self) {
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.abort();
info!("Node expiry listener stopped")
}
}
/// Cleans expired nodes from memory.
async fn clean_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<()> {
let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
for key in node_keys {
let key_bytes: Vec<u8> = (&key).into();
if let Err(e) = in_memory.delete(&key_bytes, false).await {
warn!(e; "Failed to delete expired node: {:?}", key_bytes);
} else {
debug!("Deleted expired node key: {:?}", key);
}
}
Ok(())
}
/// Lists expired nodes that have been inactive more than `max_idle_time`.
async fn list_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
let req = RangeRequest::new().with_prefix(prefix);
let current_time_millis = common_time::util::current_time_millis();
let resp = in_memory.range(req).await?;
Ok(resp
.kvs
.into_iter()
.filter_map(move |KeyValue { key, value }| {
let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
warn!(e; "Unrecognized node info value");
}) else {
return None;
};
if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
{
NodeInfoKey::try_from(key)
.inspect_err(|e| {
warn!(e; "Unrecognized node info key: {:?}", info.peer);
})
.ok()
.inspect(|node_key| {
debug!("Found expired node: {:?}", node_key);
})
} else {
None
}
}))
}
}
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
fn name(&self) -> &str {
"NodeExpiryListener"
}
async fn on_leader_start(&self) -> error::Result<()> {
self.start().await;
info!(
"On leader start, node expiry listener started with max idle time: {:?}",
self.max_idle_time
);
Ok(())
}
async fn on_leader_stop(&self) -> error::Result<()> {
self.stop();
info!("On leader stop, node expiry listener stopped");
Ok(())
}
}

View File

@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;
pub use arrow;
pub use arrow::{self, compute};
pub use error::{Error, Result};

View File

@@ -49,13 +49,12 @@ pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle};
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::recording_rules::RecordingRuleEngine;
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
mod flownode_impl;
@@ -172,8 +171,6 @@ pub struct FlowWorkerManager {
flush_lock: RwLock<()>,
/// receive a oneshot sender to send state size report
state_report_handler: RwLock<Option<StateReportHandler>>,
/// engine for recording rule
rule_engine: RecordingRuleEngine,
}
/// Building FlownodeManager
@@ -188,7 +185,6 @@ impl FlowWorkerManager {
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
rule_engine: RecordingRuleEngine,
) -> Self {
let srv_map = ManagedTableSource::new(
table_meta.table_info_manager().clone(),
@@ -211,7 +207,6 @@ impl FlowWorkerManager {
node_id,
flush_lock: RwLock::new(()),
state_report_handler: RwLock::new(None),
rule_engine,
}
}
@@ -220,6 +215,25 @@ impl FlowWorkerManager {
self
}
/// Create a flownode manager with one worker
pub fn new_with_workers<'s>(
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
num_workers: usize,
) -> (Self, Vec<Worker<'s>>) {
let mut zelf = Self::new(node_id, query_engine, table_meta);
let workers: Vec<_> = (0..num_workers)
.map(|_| {
let (handle, worker) = create_worker();
zelf.add_worker_handle(handle);
worker
})
.collect();
(zelf, workers)
}
/// add a worker handler to manager, meaning this corresponding worker is under it's manage
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(handle);
@@ -737,11 +751,7 @@ pub struct CreateFlowArgs {
/// Create&Remove flow
impl FlowWorkerManager {
/// remove a flow by it's id
#[allow(unreachable_code)]
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.remove_flow(flow_id).await;
for handle in self.worker_handles.iter() {
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
@@ -757,10 +767,8 @@ impl FlowWorkerManager {
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments, unreachable_code)]
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.create_flow(args).await;
let CreateFlowArgs {
flow_id,
sink_table_name,

View File

@@ -153,10 +153,7 @@ impl Flownode for FlowWorkerManager {
}
}
#[allow(unreachable_code, unused)]
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
return Ok(Default::default());
// using try_read to ensure two things:
// 1. flush wouldn't happen until inserts before it is inserted
// 2. inserts happening concurrently with flush wouldn't be block by flush

View File

@@ -16,7 +16,6 @@
use std::any::Any;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
@@ -157,15 +156,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
Arrow {
#[snafu(source)]
raw: ArrowError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
Datafusion {
#[snafu(source)]
@@ -240,7 +230,6 @@ impl ErrorExt for Error {
match self {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Arrow { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,

View File

@@ -238,7 +238,6 @@ mod test {
for (sql, current, expected) in &testcases {
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await

View File

@@ -14,6 +14,7 @@
//! Send heartbeat from flownode to metasrv
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
@@ -24,7 +25,7 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
@@ -65,6 +66,7 @@ pub struct HeartbeatTask {
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -87,11 +89,25 @@ impl HeartbeatTask {
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
}
pub async fn start(&self) -> Result<(), Error> {
if self
.running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Heartbeat task started multiple times");
return Ok(());
}
self.create_streams().await
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -114,6 +130,13 @@ impl HeartbeatTask {
pub fn shutdown(&self) {
info!("Close heartbeat task for flownode");
if self
.running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
}
fn new_heartbeat_request(
@@ -258,7 +281,7 @@ impl HeartbeatTask {
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.start().await.is_ok() {
if self.create_streams().await.is_ok() {
break;
}
}

View File

@@ -33,7 +33,6 @@ mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod recording_rules;
mod repr;
mod server;
mod transform;
@@ -44,5 +43,4 @@ mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use recording_rules::FrontendClient;
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};

View File

@@ -28,32 +28,6 @@ lazy_static! {
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_query_time",
"flow rule engine query time",
&["flow_id"],
vec![
0.0,
1.,
3.,
5.,
10.,
20.,
30.,
60.,
2. * 60.,
5. * 60.,
10. * 60.
]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_slow_query",
"flow rule engine slow query",
&["flow_id", "sql", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(

View File

@@ -1,744 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user
mod engine;
mod frontend_client;
use std::collections::HashSet;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{Column, DFSchema, TableReference};
use datafusion_expr::LogicalPlan;
use datafusion_physical_expr::PhysicalExprRef;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, Vector,
};
pub use engine::RecordingRuleEngine;
pub use frontend_client::FrontendClient;
use query::parser::QueryLanguageParser;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
use crate::Error;
/// Convert sql to datafusion logical plan
pub async fn sql_to_df_plan(
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sql: &str,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = if optimize {
apply_df_optimizer(plan).await?
} else {
plan
};
Ok(plan)
}
/// Find nearest lower bound for time `current` in given `plan` for the time window expr.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// return `Some("2021-07-01 00:00:00.000")`
/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
///
/// Time window expr is a expr that:
/// 1. ref only to a time index column
/// 2. is monotonic increasing
/// 3. show up in GROUP BY clause
///
/// note this plan should only contain one TableScan
pub async fn find_plan_time_window_bound(
plan: &LogicalPlan,
current: Timestamp,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
// TODO(discord9): find the expr that do time window
let catalog_man = engine.engine_state().catalog_manager();
let mut table_name = None;
// first find the table source in the logical plan
plan.apply(|plan| {
let LogicalPlan::TableScan(table_scan) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
table_name = Some(table_scan.table_name.clone());
Ok(TreeNodeRecursion::Stop)
})
.with_context(|_| DatafusionSnafu {
context: format!("Can't find table source in plan {plan:?}"),
})?;
let Some(table_name) = table_name else {
UnexpectedSnafu {
reason: format!("Can't find table source in plan {plan:?}"),
}
.fail()?
};
let current_schema = query_ctx.current_schema();
let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
let schema_name = table_name.schema().unwrap_or(&current_schema);
let table_name = table_name.table();
let Some(table_ref) = catalog_man
.table(catalog_name, schema_name, table_name, Some(&query_ctx))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
else {
UnexpectedSnafu {
reason: format!(
"Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
),
}
.fail()?
};
let schema = &table_ref.table_info().meta.schema;
let ts_index = schema.timestamp_column().context(UnexpectedSnafu {
reason: format!("Can't find timestamp column in table {table_name:?}"),
})?;
let ts_col_name = ts_index.name.clone();
let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
reason: format!(
"Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
),
})?.unit();
let ts_columns: HashSet<_> = HashSet::from_iter(vec![
format!("{catalog_name}.{schema_name}.{table_name}.{ts_col_name}"),
format!("{schema_name}.{table_name}.{ts_col_name}"),
format!("{table_name}.{ts_col_name}"),
format!("{ts_col_name}"),
]);
let ts_columns: HashSet<_> = ts_columns
.into_iter()
.map(Column::from_qualified_name)
.collect();
let ts_columns_ref: HashSet<&Column> = ts_columns.iter().collect();
// find the time window expr which refers to the time index column
let mut time_window_expr: Option<Expr> = None;
let find_time_window_expr = |plan: &LogicalPlan| {
let LogicalPlan::Aggregate(aggregate) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
for group_expr in &aggregate.group_expr {
let refs = group_expr.column_refs();
if refs.len() != 1 {
continue;
}
let ref_col = refs.iter().next().unwrap();
if ts_columns_ref.contains(ref_col) {
time_window_expr = Some(group_expr.clone());
break;
}
}
Ok(TreeNodeRecursion::Stop)
};
plan.apply(find_time_window_expr)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find time window expr in plan {plan:?}"),
})?;
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
ts_col_name.clone(),
ts_index.data_type.as_arrow_type(),
false,
)]));
let df_schema = DFSchema::from_field_specific_qualified_schema(
vec![Some(TableReference::bare(table_name))],
&arrow_schema,
)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
// cast current to ts_index's type
let new_current = current
.convert_to(expected_time_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
})?;
// if no time_window_expr is found, return None
if let Some(time_window_expr) = time_window_expr {
let lower_bound =
find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?;
let upper_bound =
find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?;
Ok((ts_col_name, lower_bound, upper_bound))
} else {
Ok((ts_col_name, None, None))
}
}
/// Find the lower bound of time window in given `expr` and `current` timestamp.
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// if return None, meaning this time window have no lower bound
fn find_expr_time_window_lower_bound(
expr: &Expr,
df_schema: &DFSchema,
current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
use std::cmp::Ordering;
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
if cur_time_window == current {
return Ok(Some(current));
}
// search to find the lower bound
let mut offset: i64 = 1;
let lower_bound;
let mut upper_bound = Some(current);
// first expontial probe to found a range for binary search
loop {
let Some(next_val) = current.value().checked_sub(offset) else {
// no lower bound
return Ok(None);
};
let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
let prev_time_window = eval_ts_to_ts(&phy_expr, df_schema, prev_time_probe)?;
match prev_time_window.cmp(&cur_time_window) {
Ordering::Less => {
lower_bound = Some(prev_time_probe);
break;
}
Ordering::Equal => {
upper_bound = Some(prev_time_probe);
}
Ordering::Greater => {
UnexpectedSnafu {
reason: format!(
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
),
}
.fail()?
}
}
let Some(new_offset) = offset.checked_mul(2) else {
// no lower bound
return Ok(None);
};
offset = new_offset;
}
// binary search for the exact lower bound
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
});
let input_time_unit = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.unit();
let mut low = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.value();
let mut high = upper_bound
.context(UnexpectedSnafu {
reason: "should have upper bound",
})?
.value();
while low < high {
let mid = (low + high) / 2;
let mid_probe = common_time::Timestamp::new(mid, input_time_unit);
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
match mid_time_window.cmp(&cur_time_window) {
Ordering::Less => low = mid + 1,
Ordering::Equal => high = mid,
Ordering::Greater => UnexpectedSnafu {
reason: format!("Binary search failed for time window expression {expr:?}"),
}
.fail()?,
}
}
let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit);
Ok(Some(final_lower_bound_for_time_window))
}
/// Find the upper bound for time window expression
fn find_expr_time_window_upper_bound(
expr: &Expr,
df_schema: &DFSchema,
current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
use std::cmp::Ordering;
let phy_planner = DefaultPhysicalPlanner::default();
let phy_expr: PhysicalExprRef = phy_planner
.create_physical_expr(expr, df_schema, &SessionContext::new().state())
.with_context(|_e| DatafusionSnafu {
context: format!(
"Failed to create physical expression from {expr:?} using {df_schema:?}"
),
})?;
let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
// search to find the lower bound
let mut offset: i64 = 1;
let mut lower_bound = Some(current);
let upper_bound;
// first expontial probe to found a range for binary search
loop {
let Some(next_val) = current.value().checked_add(offset) else {
// no upper bound if overflow
return Ok(None);
};
let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;
match next_time_window.cmp(&cur_time_window) {
Ordering::Less => {UnexpectedSnafu {
reason: format!(
"Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
),
}
.fail()?
}
Ordering::Equal => {
lower_bound = Some(next_time_probe);
}
Ordering::Greater => {
upper_bound = Some(next_time_probe);
break
}
}
let Some(new_offset) = offset.checked_mul(2) else {
// no upper bound if overflow
return Ok(None);
};
offset = new_offset;
}
// binary search for the exact upper bound
ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
});
let output_unit = upper_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.unit();
let mut low = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
.value();
let mut high = upper_bound
.context(UnexpectedSnafu {
reason: "should have upper bound",
})?
.value();
while low < high {
let mid = (low + high) / 2;
let mid_probe = common_time::Timestamp::new(mid, output_unit);
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
match mid_time_window.cmp(&cur_time_window) {
Ordering::Less => UnexpectedSnafu {
reason: format!("Binary search failed for time window expression {expr:?}"),
}
.fail()?,
Ordering::Equal => low = mid + 1,
Ordering::Greater => high = mid,
}
}
let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
Ok(Some(final_upper_bound_for_time_window))
}
fn eval_ts_to_ts(
phy: &PhysicalExprRef,
df_schema: &DFSchema,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let ts_vector = match input_value.unit() {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Millisecond => {
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
};
let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
.with_context(|_| ArrowSnafu {
context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
})?;
let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
})?;
let val = match eval_res {
datafusion_expr::ColumnarValue::Array(array) => {
let ty = array.data_type();
let ty = ConcreteDataType::from_arrow_type(ty);
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
ty.unit()
} else {
return UnexpectedSnafu {
reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
}
.fail();
};
match time_unit {
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0),
TimeUnit::Millisecond => {
TimestampMillisecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
}
}
datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert scalar {scalar:?} to value"),
})?,
};
if let Value::Timestamp(ts) = val {
Ok(ts)
} else {
UnexpectedSnafu {
reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
}
.fail()?
}
}
// TODO(discord9): a method to found out the precise time window
/// Find out the `Filter` Node corresponding to outermost `WHERE` and add a new filter expr to it
#[derive(Debug)]
pub struct AddFilterRewriter {
extra_filter: Expr,
is_rewritten: bool,
}
impl AddFilterRewriter {
fn new(filter: Expr) -> Self {
Self {
extra_filter: filter,
is_rewritten: false,
}
}
}
impl TreeNodeRewriter for AddFilterRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
match node {
LogicalPlan::Filter(mut filter) if !filter.having => {
filter.predicate = filter.predicate.and(self.extra_filter.clone());
self.is_rewritten = true;
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
}
LogicalPlan::TableScan(_) => {
// add a new filter
let filter =
datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(node))?;
self.is_rewritten = true;
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
}
_ => Ok(Transformed::no(node)),
}
}
}
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
let unparser = Unparser::default();
let sql = unparser
.plan_to_sql(plan)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to unparse logical plan {plan:?}"),
})?;
Ok(sql.to_string())
}
#[cfg(test)]
mod test {
use datafusion_common::tree_node::TreeNode;
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::{sql_to_df_plan, *};
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
(
"SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number"
),
(
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4))"
),
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
)
];
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for (before, after) in testcases {
let sql = before;
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32)));
let plan = plan.rewrite(&mut add_filter).unwrap().data;
let new_sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(after, new_sql);
}
}
#[tokio::test]
async fn test_plan_time_window_lower_bound() {
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let testcases = [
// same alias is not same column
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
),
// complex time window index
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
Timestamp::new(23, TimeUnit::Millisecond),
("ts".to_string(), None, None),
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
),
// time index
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// on spot
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(0, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// different time unit
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23_000_000, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other fields
(
"SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other pks
(
"SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
),
];
for (sql, current, expected, unparsed) in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
.await
.unwrap();
let real =
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
.await
.unwrap();
assert_eq!(expected, real);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let (col_name, lower, upper) = real;
let new_sql = if lower.is_some() {
let to_df_literal = |value| {
let value = Value::from(value);
value.try_to_scalar_value(&value.data_type()).unwrap()
};
let lower = to_df_literal(lower.unwrap());
let upper = to_df_literal(upper.unwrap());
let expr = col(&col_name)
.gt_eq(lit(lower))
.and(col(&col_name).lt_eq(lit(upper)));
let mut add_filter = AddFilterRewriter::new(expr);
let plan = plan.rewrite(&mut add_filter).unwrap().data;
df_plan_to_sql(&plan).unwrap()
} else {
sql.to_string()
};
assert_eq!(unparsed, new_sql);
}
}
}

View File

@@ -1,407 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use common_meta::ddl::create_flow::FlowType;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion_common::tree_node::TreeNode;
use datatypes::value::Value;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{oneshot, RwLock};
use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter};
use crate::adapter::{CreateFlowArgs, FlowId};
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
/// The default rule engine query timeout is 10 minutes
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// TODO(discord9): determine how to configure refresh rate
pub struct RecordingRuleEngine {
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
engine: QueryEngineRef,
}
impl RecordingRuleEngine {
pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
Self {
tasks: Default::default(),
shutdown_txs: Default::default(),
frontend_client,
engine,
}
}
}
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
impl RecordingRuleEngine {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids: _,
create_if_not_exists,
or_replace,
expire_after,
comment: _,
sql,
flow_options,
query_ctx,
} = args;
// or replace logic
{
let is_exist = self.tasks.read().await.contains_key(&flow_id);
match (create_if_not_exists, or_replace, is_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
}
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(),
UnexpectedSnafu {
reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}")
}
);
let Some(query_ctx) = query_ctx else {
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
.fail()?
};
let (tx, rx) = oneshot::channel();
let task = RecordingRuleTask::new(
flow_id,
&sql,
expire_after,
sink_table_name,
Arc::new(query_ctx),
rx,
);
let task_inner = task.clone();
let engine = self.engine.clone();
let frontend = self.frontend_client.clone();
// TODO(discord9): also save handle & use time wheel or what for better
let _handle = common_runtime::spawn_global(async move {
match task_inner.start_executing(engine, frontend).await {
Ok(()) => info!("Flow {} shutdown", task_inner.flow_id),
Err(err) => common_telemetry::error!(
"Flow {} encounter unrecoverable error: {err:?}",
task_inner.flow_id
),
}
});
// TODO(discord9): deal with replace logic
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
drop(replaced_old_task_opt);
self.shutdown_txs.write().await.insert(flow_id, tx);
Ok(Some(flow_id))
}
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
reason: format!("Can't found shutdown tx for flow {flow_id}"),
}
.fail()?
};
if tx.send(()).is_err() {
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
flow_id: FlowId,
query: String,
/// in seconds
expire_after: Option<i64>,
sink_table_name: [String; 3],
state: Arc<RwLock<RecordingRuleState>>,
}
impl RecordingRuleTask {
pub fn new(
flow_id: FlowId,
query: &str,
expire_after: Option<i64>,
sink_table_name: [String; 3],
query_ctx: QueryContextRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
flow_id,
query: query.to_string(),
expire_after,
sink_table_name,
state: Arc::new(RwLock::new(RecordingRuleState::new(query_ctx, shutdown_rx))),
}
}
}
impl RecordingRuleTask {
/// This should be called in a new tokio task
pub async fn start_executing(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) -> Result<(), Error> {
// only first query don't need upper bound
let mut is_first = true;
loop {
// FIXME(discord9): test if need upper bound also works
let new_query = self
.gen_query_with_time_window(engine.clone(), false)
.await?;
let insert_into = format!(
"INSERT INTO {}.{}.{} {}",
self.sink_table_name[0],
self.sink_table_name[1],
self.sink_table_name[2],
new_query
);
if is_first {
is_first = false;
}
let instant = Instant::now();
let flow_id = self.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.expire_after, peer_addr, &insert_into
);
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let res = db_client.database.sql(&insert_into).await;
drop(timer);
let elapsed = instant.elapsed();
if let Ok(res1) = &res {
debug!(
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
elapsed
);
} else if let Err(res) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &insert_into
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &insert_into
);
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.await
.after_query_exec(elapsed, res.is_ok());
let sleep_until = {
let mut state = self.state.write().await;
match state.shutdown_rx.try_recv() {
Ok(()) => break Ok(()),
Err(TryRecvError::Closed) => {
warn!("Unexpected shutdown flow {flow_id}, shutdown anyway");
break Ok(());
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(None)
};
tokio::time::sleep_until(sleep_until).await;
}
}
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
need_upper_bound: bool,
) -> Result<String, Error> {
let query_ctx = self.state.read().await.query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let low_bound = self
.expire_after
.map(|e| since_the_epoch.as_secs() - e as u64);
let Some(low_bound) = low_bound else {
return Ok(self.query.clone());
};
let low_bound = Timestamp::new_second(low_bound as i64);
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
let (col_name, lower, upper) =
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
.await?;
let new_sql = {
let to_df_literal = |value| -> Result<_, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
};
let lower = lower.map(to_df_literal).transpose()?;
let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
if need_upper_bound {
Some(u)
} else {
None
}
});
let expr = {
use datafusion_expr::{col, lit};
match (lower, upper) {
(Some(l), Some(u)) => col(&col_name)
.gt_eq(lit(l))
.and(col(&col_name).lt_eq(lit(u))),
(Some(l), None) => col(&col_name).gt_eq(lit(l)),
(None, Some(u)) => col(&col_name).lt(lit(u)),
// no time window, direct return
(None, None) => return Ok(self.query.clone()),
}
};
let mut add_filter = AddFilterRewriter::new(expr);
// make a not optimized plan for clearer unparse
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?;
let plan = plan
.clone()
.rewrite(&mut add_filter)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {plan:?}"),
})?
.data;
df_plan_to_sql(&plan)?
};
Ok(new_sql)
}
}
#[derive(Debug)]
pub struct RecordingRuleState {
query_ctx: QueryContextRef,
/// last query complete time
last_update_time: Instant,
/// last time query duration
last_query_duration: Duration,
exec_state: ExecState,
shutdown_rx: oneshot::Receiver<()>,
}
impl RecordingRuleState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
Self {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
exec_state: ExecState::Idle,
shutdown_rx,
}
}
/// called after last query is done
/// `is_succ` indicate whether the last query is successful
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
self.exec_state = ExecState::Idle;
self.last_query_duration = elapsed;
self.last_update_time = Instant::now();
}
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
self.last_update_time + next_duration
}
}
#[derive(Debug, Clone)]
enum ExecState {
Idle,
Executing,
}

View File

@@ -1,150 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
}
fn client_from_urls(addrs: Vec<String>) -> Client {
Client::with_manager_and_urls(default_channel_mgr(), addrs)
}
/// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)]
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
/// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer,
},
}
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
pub database: Database,
pub peer: Peer,
}
impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
}
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cluster_id = meta_client.id().0;
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let resp = cluster_client
.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
}

View File

@@ -57,7 +57,6 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::recording_rules::{FrontendClient, RecordingRuleEngine};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
@@ -246,7 +245,6 @@ impl FlownodeInstance {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
if let Some(task) = &self.heartbeat_task {
info!("Close heartbeat task for flownode");
task.shutdown();
}
@@ -273,8 +271,6 @@ pub struct FlownodeBuilder {
heartbeat_task: Option<HeartbeatTask>,
/// receive a oneshot sender to send state size report
state_report_handler: Option<StateReportHandler>,
/// Client to send sql to frontend
frontend_client: Arc<FrontendClient>,
}
impl FlownodeBuilder {
@@ -285,7 +281,6 @@ impl FlownodeBuilder {
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
frontend_client: Arc<FrontendClient>,
) -> Self {
Self {
opts,
@@ -295,7 +290,6 @@ impl FlownodeBuilder {
flow_metadata_manager,
heartbeat_task: None,
state_report_handler: None,
frontend_client,
}
}
@@ -453,10 +447,7 @@ impl FlownodeBuilder {
let node_id = self.opts.node_id.map(|id| id as u32);
let rule_engine =
RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
for worker_id in 0..num_workers {
let (tx, rx) = oneshot::channel();

View File

@@ -86,8 +86,7 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
let schema = vec![
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();

View File

@@ -112,7 +112,6 @@ impl MetaClientBuilder {
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
}
pub fn enable_heartbeat(self) -> Self {

View File

@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = key.into();
let key = (&key).into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,

View File

@@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
@@ -151,6 +152,8 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -192,6 +195,7 @@ impl Default for MetasrvOptions {
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
}
}
}
@@ -442,6 +446,10 @@ impl Metasrv {
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
self.options.node_max_idle_time,
self.in_memory.clone(),
)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}

View File

@@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv {
};
if pusher_id.is_none() {
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
pusher_id =
Some(register_pusher(&handler_group, header, tx.clone()).await);
}
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await
@@ -173,13 +175,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> Option<PusherId> {
) -> PusherId {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(pusher_id, pusher).await;
Some(pusher_id)
pusher_id
}
#[cfg(test)]

View File

@@ -68,7 +68,6 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
#[allow(unused)]
table_flownode_set_cache: TableFlownodeSetCacheRef,
}
@@ -336,11 +335,9 @@ impl Inserter {
let InstantAndNormalInsertRequests {
normal_requests,
instant_requests: _,
instant_requests,
} = requests;
// TODO(discord9): mirror some
/*
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
@@ -350,7 +347,7 @@ impl Inserter {
.chain(instant_requests.requests.iter()),
)
.await?;
flow_mirror_task.detach(self.node_manager.clone())?;*/
flow_mirror_task.detach(self.node_manager.clone())?;
// Write requests to datanode and wait for response
let write_tasks = self
@@ -820,14 +817,12 @@ struct CreateAlterTableResult {
table_infos: HashMap<TableId, Arc<TableInfo>>,
}
#[allow(unused)]
struct FlowMirrorTask {
requests: HashMap<Peer, RegionInsertRequests>,
num_rows: usize,
}
impl FlowMirrorTask {
#[allow(unused)]
async fn new(
cache: &TableFlownodeSetCacheRef,
requests: impl Iterator<Item = &RegionInsertRequest>,
@@ -901,7 +896,6 @@ impl FlowMirrorTask {
})
}
#[allow(unused)]
fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
for (peer, inserts) in self.requests {

View File

@@ -583,7 +583,8 @@ impl HistogramFoldStream {
.expect("field column should not be nullable");
counters.push(counter);
}
let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
@@ -672,7 +673,7 @@ impl HistogramFoldStream {
if bucket.len() <= 1 {
return Ok(f64::NAN);
}
if *bucket.last().unwrap() != f64::INFINITY {
if bucket.last().unwrap().is_finite() {
return Err(DataFusionError::Execution(
"last bucket should be +Inf".to_string(),
));
@@ -692,8 +693,8 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
let total = *counter.last().unwrap();
let expected_pos = total * quantile;

View File

@@ -21,6 +21,7 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
mod round;
#[cfg(test)]
mod test_util;
@@ -39,6 +40,7 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub use round::Round;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {

View File

@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;
use crate::functions::extract_array;
pub struct Round {
nearest: f64,
}
impl Round {
fn new(nearest: f64) -> Self {
Self { nearest }
}
pub const fn name() -> &'static str {
"prom_round"
}
fn input_type() -> Vec<DataType> {
vec![DataType::Float64]
}
pub fn return_type() -> DataType {
DataType::Float64
}
pub fn scalar_udf(nearest: f64) -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Immutable,
Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
)
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), 1);
let value_array = extract_array(&input[0])?;
if self.nearest == 0.0 {
let values = value_array.as_primitive::<Float64Type>();
let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
Ok(ColumnarValue::Array(Arc::new(result) as _))
} else {
let values = value_array.as_primitive::<Float64Type>();
let nearest = self.nearest;
let result =
compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
Ok(ColumnarValue::Array(Arc::new(result) as _))
}
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::Float64Array;
use super::*;
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
let round_udf = Round::scalar_udf(nearest);
let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
let result = round_udf.invoke_batch(&input, 1).unwrap();
let result_array = extract_array(&result).unwrap();
assert_eq!(result_array.len(), 1);
assert_eq!(
result_array.as_primitive::<Float64Type>().values(),
&expected
);
}
#[test]
fn test_round() {
test_round_f64(vec![123.456], 0.001, vec![123.456]);
test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
test_round_f64(vec![123.456], 0.1, vec![123.5]);
test_round_f64(vec![123.456], 0.0, vec![123.0]);
test_round_f64(vec![123.456], 1.0, vec![123.0]);
test_round_f64(vec![123.456], 10.0, vec![120.0]);
test_round_f64(vec![123.456], 100.0, vec![100.0]);
test_round_f64(vec![123.456], 105.0, vec![105.0]);
test_round_f64(vec![123.456], 1000.0, vec![0.0]);
}
}

View File

@@ -52,7 +52,7 @@ use promql::extension_plan::{
use promql::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
@@ -200,10 +200,9 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
}
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
}
.fail()?,
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
@@ -218,6 +217,48 @@ impl PromPlanner {
Ok(res)
}
async fn prom_subquery_expr_to_plan(
&mut self,
session_state: &SessionState,
subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
let SubqueryExpr {
expr, range, step, ..
} = subquery_expr;
let current_interval = self.ctx.interval;
if let Some(step) = step {
self.ctx.interval = step.as_millis() as _;
}
let current_start = self.ctx.start;
self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
let input = self.prom_expr_to_plan(expr, session_state).await?;
self.ctx.interval = current_interval;
self.ctx.start = current_start;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let manipulate = RangeManipulate::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
range_ms,
self.ctx
.time_index_column
.clone()
.expect("time index should be set in `setup_context`"),
self.ctx.field_columns.clone(),
input,
)
.context(DataFusionPlanningSnafu)?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(manipulate),
}))
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
@@ -441,6 +482,7 @@ impl PromPlanner {
// if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
// under this case we only join on time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
modifier,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -1468,6 +1510,20 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"round" => {
let nearest = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
None => 0.0,
other => UnexpectedPlanExprSnafu {
desc: format!("expected f64 literal as t, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1674,7 +1730,7 @@ impl PromPlanner {
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join",
fn_name: "label_join"
}
);
@@ -2121,24 +2177,49 @@ impl PromPlanner {
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let mut left_tag_columns = if only_join_time_index {
vec![]
BTreeSet::new()
} else {
self.ctx
.tag_columns
.iter()
.map(Column::from_name)
.collect::<Vec<_>>()
.cloned()
.collect::<BTreeSet<_>>()
};
let mut right_tag_columns = left_tag_columns.clone();
// apply modifier
if let Some(modifier) = modifier {
// apply label modifier
if let Some(matching) = &modifier.matching {
match matching {
// keeps columns mentioned in `on`
LabelModifier::Include(on) => {
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
right_tag_columns =
right_tag_columns.intersection(&mask).cloned().collect();
}
// removes columns memtioned in `ignoring`
LabelModifier::Exclude(ignoring) => {
// doesn't check existence of label
for label in &ignoring.labels {
let _ = left_tag_columns.remove(label);
let _ = right_tag_columns.remove(label);
}
}
}
}
}
// push time index column if it exists
if let (Some(left_time_index_column), Some(right_time_index_column)) =
(left_time_index_column, right_time_index_column)
{
left_tag_columns.push(Column::from_name(left_time_index_column));
right_tag_columns.push(Column::from_name(right_time_index_column));
left_tag_columns.insert(left_time_index_column);
right_tag_columns.insert(right_time_index_column);
}
let right = LogicalPlanBuilder::from(right)
@@ -2154,7 +2235,16 @@ impl PromPlanner {
.join(
right,
JoinType::Inner,
(left_tag_columns, right_tag_columns),
(
left_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
right_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
),
None,
)
.context(DataFusionPlanningSnafu)?
@@ -3340,6 +3430,59 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn test_hash_join() {
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_sum".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["uri", "kubernetes_namespace", "kubernetes_pod_name"],
)
.await;
// Should be ok
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
assert_eq!(plan.to_string(), expected);
}
#[tokio::test]
async fn test_nested_histogram_quantile() {
let mut eval_stmt = EvalStmt {

View File

@@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient};
use flow::FlownodeBuilder;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
@@ -164,15 +164,12 @@ impl GreptimeDbStandaloneBuilder {
Some(procedure_manager.clone()),
);
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(flow_builder.build().await.unwrap());

View File

@@ -0,0 +1,81 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.45 | read |
| 1970-01-01T00:00:03 | 234.57 | write |
| 1970-01-01T00:00:04 | 345.68 | read |
| 1970-01-01T00:00:04 | 456.79 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.5 | read |
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
| 1970-01-01T00:00:04 | 456.8 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 120.0 | read |
| 1970-01-01T00:00:03 | 230.0 | write |
| 1970-01-01T00:00:04 | 350.0 | read |
| 1970-01-01T00:00:04 | 460.0 | write |
+---------------------+----------------------------+-------+
drop table cache_hit;
Affected Rows: 0

View File

@@ -0,0 +1,30 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
drop table cache_hit;

View File

@@ -638,3 +638,78 @@ drop table cache_miss;
Affected Rows: 0
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
Affected Rows: 4
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
drop table cache_hit_with_null_label;
Affected Rows: 0
drop table cache_miss_with_null_label;
Affected Rows: 0

View File

@@ -295,3 +295,45 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
drop table cache_hit;
drop table cache_miss;
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
drop table cache_hit_with_null_label;
drop table cache_miss_with_null_label;

View File

@@ -295,3 +295,40 @@ drop table histogram3_bucket;
Affected Rows: 0
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
Affected Rows: 0
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
Affected Rows: 7
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
+---------------------+---+-----+
| ts | s | val |
+---------------------+---+-----+
| 1970-01-01T00:48:20 | a | 5.0 |
| 1970-01-01T00:50:00 | a | 5.0 |
+---------------------+---+-----+
drop table histogram4_bucket;
Affected Rows: 0

View File

@@ -163,3 +163,27 @@ insert into histogram3_bucket values
tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));
drop table histogram3_bucket;
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
drop table histogram4_bucket;

View File

@@ -0,0 +1,65 @@
create table metric_total (
ts timestamp time index,
val double,
);
Affected Rows: 0
insert into metric_total values
(0, 1),
(10000, 2);
Affected Rows: 2
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+
drop table metric_total;
Affected Rows: 0

View File

@@ -0,0 +1,22 @@
create table metric_total (
ts timestamp time index,
val double,
);
insert into metric_total values
(0, 1),
(10000, 2);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
drop table metric_total;