Compare commits

..

4 Commits

Author SHA1 Message Date
discord9
08161812ae metrics: better bucket&longer timeout 2025-02-27 17:19:34 +08:00
discord9
33d1ba242f fix: timeout 2025-02-27 17:19:34 +08:00
discord9
40ce94f3bf fix: heartbeat&expire_after unit 2025-02-27 17:19:34 +08:00
discord9
4628119a42 feat: time window in df plan
WIP

test: found out time window expr

chore: pub

tests: also unparsed

tests: rm dup code

feat: frontend client for recording rule

fix: bound edgecase

WIP

WIP

feat: rule engine

feat: add init options& tmp rerounte to rule

fix: dist client get

fix: also not handle mirror write in flownode

chore: clippy
2025-02-27 17:19:34 +08:00
40 changed files with 1426 additions and 875 deletions

View File

@@ -319,7 +319,6 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

View File

@@ -50,9 +50,6 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true

View File

@@ -16,7 +16,6 @@
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendClient, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -317,6 +317,8 @@ impl StartCommand {
Arc::new(executor),
);
let frontend_client = FrontendClient::from_meta_client(meta_client.clone());
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flownode_builder = FlownodeBuilder::new(
opts,
@@ -324,6 +326,7 @@ impl StartCommand {
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
Arc::new(frontend_client),
)
.with_heartbeat_task(heartbeat_task);

View File

@@ -54,7 +54,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendClient,
FrontendInvoker,
};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -533,12 +536,16 @@ impl StartCommand {
flow: opts.flow.clone(),
..Default::default()
};
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(
flow_builder

View File

@@ -57,10 +57,12 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`, it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -230,8 +232,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
}
}
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
@@ -313,7 +315,7 @@ mod tests {
node_id: 2,
};
let key_bytes: Vec<u8> = (&key).into();
let key_bytes: Vec<u8> = key.into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);

View File

@@ -343,6 +343,7 @@ pub enum FlowType {
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const STREAMING: &str = "streaming";
pub const FLOW_TYPE_KEY: &str = "flow_type";
}
impl Default for FlowType {
@@ -398,7 +399,8 @@ impl From<&CreateFlowData> for CreateRequest {
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options.insert("flow_type".to_string(), flow_type);
req.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
req
}
}
@@ -430,7 +432,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.collect::<Vec<_>>();
let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert("flow_type".to_string(), flow_type);
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
let flow_info = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),

View File

@@ -34,7 +34,6 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;

View File

@@ -1,152 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::time::Duration;
use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent memory leak.
///
/// The periodic check runs in a spawned tokio task; see `start` and `stop`.
pub struct NodeExpiryListener {
// Join handle of the spawned cleanup task; `None` while no task is registered.
handle: Mutex<Option<JoinHandle<()>>>,
// A node is treated as expired once `now - last_activity_ts` exceeds this duration.
max_idle_time: Duration,
// Key-value backend that is scanned for node info and from which expired keys are deleted.
in_memory: ResettableKvBackendRef,
}
// Dropping the listener stops it, so the spawned cleanup task is aborted
// together with the listener instead of being left running.
impl Drop for NodeExpiryListener {
fn drop(&mut self) {
// `stop` does nothing when no task handle is currently stored.
self.stop();
}
}
impl NodeExpiryListener {
    /// Builds a listener that treats nodes idle for more than `max_idle_time` as
    /// expired. No background task is spawned until the listener is started.
    pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
        let handle = Mutex::new(None);
        Self {
            handle,
            max_idle_time,
            in_memory,
        }
    }

    /// Spawns the periodic cleanup task unless a task handle is already stored.
    async fn start(&self) {
        let mut slot = self.handle.lock().unwrap();
        if slot.is_some() {
            return;
        }

        let backend = self.in_memory.clone();
        let idle_limit = self.max_idle_time;
        let cleaner = tokio::spawn(async move {
            // Run clean task every minute.
            let mut ticker = interval(Duration::from_secs(60));
            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                ticker.tick().await;
                if let Err(e) = Self::clean_expired_nodes(&backend, idle_limit).await {
                    error!(e; "Failed to clean expired node");
                }
            }
        });
        *slot = Some(cleaner);
    }

    /// Aborts the cleanup task if one is stored, leaving the slot empty.
    fn stop(&self) {
        let running = self.handle.lock().unwrap().take();
        if let Some(task) = running {
            task.abort();
            info!("Node expiry listener stopped")
        }
    }

    /// Cleans expired nodes from memory.
    ///
    /// A failed deletion of a single key is only logged; the remaining keys are
    /// still processed.
    async fn clean_expired_nodes(
        in_memory: &ResettableKvBackendRef,
        max_idle_time: Duration,
    ) -> error::Result<()> {
        let expired = Self::list_expired_nodes(in_memory, max_idle_time).await?;
        for node_key in expired {
            let raw_key: Vec<u8> = (&node_key).into();
            match in_memory.delete(&raw_key, false).await {
                Ok(_) => {
                    debug!("Deleted expired node key: {:?}", node_key);
                }
                Err(e) => {
                    warn!(e; "Failed to delete expired node: {:?}", raw_key);
                }
            }
        }
        Ok(())
    }

    /// Lists expired nodes that have been inactive more than `max_idle_time`.
    ///
    /// Entries whose value or key cannot be decoded are logged and skipped.
    async fn list_expired_nodes(
        in_memory: &ResettableKvBackendRef,
        max_idle_time: Duration,
    ) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
        let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
        let req = RangeRequest::new().with_prefix(prefix);
        let current_time_millis = common_time::util::current_time_millis();
        let resp = in_memory.range(req).await?;
        let idle_limit_millis = max_idle_time.as_millis() as i64;

        let expired = resp
            .kvs
            .into_iter()
            .filter_map(move |KeyValue { key, value }| {
                let info = NodeInfo::try_from(value)
                    .inspect_err(|e| {
                        warn!(e; "Unrecognized node info value");
                    })
                    .ok()?;

                let idle_millis = current_time_millis - info.last_activity_ts;
                if idle_millis <= idle_limit_millis {
                    return None;
                }

                let node_key = NodeInfoKey::try_from(key)
                    .inspect_err(|e| {
                        warn!(e; "Unrecognized node info key: {:?}", info.peer);
                    })
                    .ok()?;
                debug!("Found expired node: {:?}", node_key);
                Some(node_key)
            });
        Ok(expired)
    }
}
// Leadership hooks: the cleanup task is started in `on_leader_start` and
// stopped in `on_leader_stop`. Both hooks always return `Ok(())`.
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
// Name reported for this listener.
fn name(&self) -> &str {
"NodeExpiryListener"
}
// Starts the periodic cleanup task; `start` is a no-op if a task is already stored.
async fn on_leader_start(&self) -> error::Result<()> {
self.start().await;
info!(
"On leader start, node expiry listener started with max idle time: {:?}",
self.max_idle_time
);
Ok(())
}
// Aborts the cleanup task if one is stored.
async fn on_leader_stop(&self) -> error::Result<()> {
self.stop();
info!("On leader stop, node expiry listener stopped");
Ok(())
}
}

View File

@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;
pub use arrow::{self, compute};
pub use arrow;
pub use error::{Error, Result};

View File

@@ -49,12 +49,13 @@ pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
pub(crate) use crate::adapter::worker::{create_worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::recording_rules::RecordingRuleEngine;
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
mod flownode_impl;
@@ -171,6 +172,8 @@ pub struct FlowWorkerManager {
flush_lock: RwLock<()>,
/// receive a oneshot sender to send state size report
state_report_handler: RwLock<Option<StateReportHandler>>,
/// engine for recording rule
rule_engine: RecordingRuleEngine,
}
/// Building FlownodeManager
@@ -185,6 +188,7 @@ impl FlowWorkerManager {
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
rule_engine: RecordingRuleEngine,
) -> Self {
let srv_map = ManagedTableSource::new(
table_meta.table_info_manager().clone(),
@@ -207,6 +211,7 @@ impl FlowWorkerManager {
node_id,
flush_lock: RwLock::new(()),
state_report_handler: RwLock::new(None),
rule_engine,
}
}
@@ -215,25 +220,6 @@ impl FlowWorkerManager {
self
}
/// Create a flownode manager with one worker
pub fn new_with_workers<'s>(
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
num_workers: usize,
) -> (Self, Vec<Worker<'s>>) {
let mut zelf = Self::new(node_id, query_engine, table_meta);
let workers: Vec<_> = (0..num_workers)
.map(|_| {
let (handle, worker) = create_worker();
zelf.add_worker_handle(handle);
worker
})
.collect();
(zelf, workers)
}
/// add a worker handler to manager, meaning this corresponding worker is under it's manage
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(handle);
@@ -751,7 +737,11 @@ pub struct CreateFlowArgs {
/// Create&Remove flow
impl FlowWorkerManager {
/// remove a flow by it's id
#[allow(unreachable_code)]
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.remove_flow(flow_id).await;
for handle in self.worker_handles.iter() {
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
@@ -767,8 +757,10 @@ impl FlowWorkerManager {
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments, unreachable_code)]
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
// TODO(discord9): reroute some back to streaming engine later
return self.rule_engine.create_flow(args).await;
let CreateFlowArgs {
flow_id,
sink_table_name,

View File

@@ -153,7 +153,10 @@ impl Flownode for FlowWorkerManager {
}
}
#[allow(unreachable_code, unused)]
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
return Ok(Default::default());
// using try_read to ensure two things:
// 1. flush wouldn't happen until inserts before it is inserted
// 2. inserts happening concurrently with flush wouldn't be block by flush

View File

@@ -16,6 +16,7 @@
use std::any::Any;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
@@ -156,6 +157,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
Arrow {
#[snafu(source)]
raw: ArrowError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
Datafusion {
#[snafu(source)]
@@ -230,6 +240,7 @@ impl ErrorExt for Error {
match self {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Arrow { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,

View File

@@ -238,6 +238,7 @@ mod test {
for (sql, current, expected) in &testcases {
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await

View File

@@ -14,7 +14,6 @@
//! Send heartbeat from flownode to metasrv
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
@@ -25,7 +24,7 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, error, info};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
@@ -66,7 +65,6 @@ pub struct HeartbeatTask {
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -89,25 +87,11 @@ impl HeartbeatTask {
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
}
pub async fn start(&self) -> Result<(), Error> {
if self
.running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Heartbeat task started multiple times");
return Ok(());
}
self.create_streams().await
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -130,13 +114,6 @@ impl HeartbeatTask {
pub fn shutdown(&self) {
info!("Close heartbeat task for flownode");
if self
.running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
}
fn new_heartbeat_request(
@@ -281,7 +258,7 @@ impl HeartbeatTask {
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.create_streams().await.is_ok() {
if self.start().await.is_ok() {
break;
}
}

View File

@@ -33,6 +33,7 @@ mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod recording_rules;
mod repr;
mod server;
mod transform;
@@ -43,4 +44,5 @@ mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use recording_rules::FrontendClient;
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};

View File

@@ -28,6 +28,32 @@ lazy_static! {
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_query_time",
"flow rule engine query time",
&["flow_id"],
vec![
0.0,
1.,
3.,
5.,
10.,
20.,
30.,
60.,
2. * 60.,
5. * 60.,
10. * 60.
]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_slow_query",
"flow rule engine slow query",
&["flow_id", "sql", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(

View File

@@ -0,0 +1,744 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Run flow as recording rule which is time-window-aware normal query triggered every tick set by user
mod engine;
mod frontend_client;
use std::collections::HashSet;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{Column, DFSchema, TableReference};
use datafusion_expr::LogicalPlan;
use datafusion_physical_expr::PhysicalExprRef;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, Vector,
};
pub use engine::RecordingRuleEngine;
pub use frontend_client::FrontendClient;
use query::parser::QueryLanguageParser;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, UnexpectedSnafu};
use crate::Error;
/// Parse `sql` and plan it into a datafusion [`LogicalPlan`].
///
/// When `optimize` is `true` the plan is additionally passed through
/// `apply_df_optimizer` before being returned; otherwise the plan produced by
/// the engine's planner is returned as is.
///
/// Parser and planner failures are boxed and reported as external errors.
pub async fn sql_to_df_plan(
    query_ctx: QueryContextRef,
    engine: QueryEngineRef,
    sql: &str,
    optimize: bool,
) -> Result<LogicalPlan, Error> {
    let statement = QueryLanguageParser::parse_sql(sql, &query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    let planner = engine.planner();
    let unoptimized = planner
        .plan(&statement, query_ctx)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    if !optimize {
        return Ok(unoptimized);
    }

    let optimized = apply_df_optimizer(unoptimized).await?;
    Ok(optimized)
}
/// Find the bounds of the time window that contains `current`, using the time window expr found in `plan`.
///
/// Returns `(time index column name, lower bound, upper bound)`.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// the lower bound is `Some("2021-07-01 00:00:00.000")`.
/// If no time window expr is found in `plan`, both bounds are `None`.
///
/// Returns an error if `plan` contains no `TableScan`, if the scanned table can't be found in the catalog,
/// if the table has no timestamp (time index) column, or if `current` can't be converted to the
/// time index column's unit.
///
/// Time window expr is a expr that:
/// 1. ref only to a time index column
/// 2. is monotonic increasing
/// 3. show up in GROUP BY clause
///
/// note this plan should only contain one TableScan
/// (only the first `TableScan` and the first `Aggregate` visited are inspected)
pub async fn find_plan_time_window_bound(
plan: &LogicalPlan,
current: Timestamp,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
// TODO(discord9): find the expr that do time window
let catalog_man = engine.engine_state().catalog_manager();
let mut table_name = None;
// first find the table source in the logical plan
// (the traversal stops at the first `TableScan` it visits)
plan.apply(|plan| {
let LogicalPlan::TableScan(table_scan) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
table_name = Some(table_scan.table_name.clone());
Ok(TreeNodeRecursion::Stop)
})
.with_context(|_| DatafusionSnafu {
context: format!("Can't find table source in plan {plan:?}"),
})?;
let Some(table_name) = table_name else {
UnexpectedSnafu {
reason: format!("Can't find table source in plan {plan:?}"),
}
.fail()?
};
// fill in missing catalog/schema qualifiers from the query context
let current_schema = query_ctx.current_schema();
let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
let schema_name = table_name.schema().unwrap_or(&current_schema);
let table_name = table_name.table();
let Some(table_ref) = catalog_man
.table(catalog_name, schema_name, table_name, Some(&query_ctx))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
else {
UnexpectedSnafu {
reason: format!(
"Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
),
}
.fail()?
};
let schema = &table_ref.table_info().meta.schema;
let ts_index = schema.timestamp_column().context(UnexpectedSnafu {
reason: format!("Can't find timestamp column in table {table_name:?}"),
})?;
let ts_col_name = ts_index.name.clone();
let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
reason: format!(
"Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
),
})?.unit();
// every spelling of the time index column that is accepted as a match:
// fully qualified, schema qualified, table qualified and bare
let ts_columns: HashSet<_> = HashSet::from_iter(vec![
format!("{catalog_name}.{schema_name}.{table_name}.{ts_col_name}"),
format!("{schema_name}.{table_name}.{ts_col_name}"),
format!("{table_name}.{ts_col_name}"),
format!("{ts_col_name}"),
]);
let ts_columns: HashSet<_> = ts_columns
.into_iter()
.map(Column::from_qualified_name)
.collect();
let ts_columns_ref: HashSet<&Column> = ts_columns.iter().collect();
// find the time window expr which refers to the time index column
let mut time_window_expr: Option<Expr> = None;
let find_time_window_expr = |plan: &LogicalPlan| {
let LogicalPlan::Aggregate(aggregate) = plan else {
return Ok(TreeNodeRecursion::Continue);
};
// pick the first GROUP BY expr whose only referenced column is the time index
for group_expr in &aggregate.group_expr {
let refs = group_expr.column_refs();
if refs.len() != 1 {
continue;
}
let ref_col = refs.iter().next().unwrap();
if ts_columns_ref.contains(ref_col) {
time_window_expr = Some(group_expr.clone());
break;
}
}
// stop at the first `Aggregate` visited, whether or not a time window expr was found in it
Ok(TreeNodeRecursion::Stop)
};
plan.apply(find_time_window_expr)
.with_context(|_| DatafusionSnafu {
context: format!("Can't find time window expr in plan {plan:?}"),
})?;
// single-column schema (only the time index, qualified by the bare table name)
// used to evaluate the time window expr on probe timestamps
let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
ts_col_name.clone(),
ts_index.data_type.as_arrow_type(),
false,
)]));
let df_schema = DFSchema::from_field_specific_qualified_schema(
vec![Some(TableReference::bare(table_name))],
&arrow_schema,
)
.with_context(|_e| DatafusionSnafu {
context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
})?;
// cast current to ts_index's type
let new_current = current
.convert_to(expected_time_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
})?;
// if no time_window_expr is found, return None
if let Some(time_window_expr) = time_window_expr {
let lower_bound =
find_expr_time_window_lower_bound(&time_window_expr, &df_schema, new_current)?;
let upper_bound =
find_expr_time_window_upper_bound(&time_window_expr, &df_schema, new_current)?;
Ok((ts_col_name, lower_bound, upper_bound))
} else {
Ok((ts_col_name, None, None))
}
}
/// Find the lower bound of time window in given `expr` and `current` timestamp.
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// The bound is the smallest timestamp (in `current`'s unit) for which `expr` evaluates to the
/// same value as it does for `current`. It is located by probing backwards from `current` with
/// an exponentially growing offset and then binary searching the bracketed range.
///
/// if return None, meaning this time window have no lower bound
/// (the backwards probe overflowed `i64` before a smaller time window was found)
///
/// # Errors
///
/// Returns an error if `expr` can't be turned into a physical expression, can't be evaluated
/// to a timestamp, or is observed to be non-monotonic.
fn find_expr_time_window_lower_bound(
    expr: &Expr,
    df_schema: &DFSchema,
    current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
    use std::cmp::Ordering;

    let phy_planner = DefaultPhysicalPlanner::default();
    let phy_expr: PhysicalExprRef = phy_planner
        .create_physical_expr(expr, df_schema, &SessionContext::new().state())
        .with_context(|_e| DatafusionSnafu {
            context: format!(
                "Failed to create physical expression from {expr:?} using {df_schema:?}"
            ),
        })?;

    let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;
    // `current` already sits exactly on the window start
    if cur_time_window == current {
        return Ok(Some(current));
    }

    // search to find the lower bound
    let mut offset: i64 = 1;
    let lower_bound;
    let mut upper_bound = Some(current);
    // first exponential probe to find a range for binary search:
    // `lower_bound` ends up in an earlier window, `upper_bound` stays in the current window
    loop {
        let Some(next_val) = current.value().checked_sub(offset) else {
            // no lower bound
            return Ok(None);
        };

        let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
        let prev_time_window = eval_ts_to_ts(&phy_expr, df_schema, prev_time_probe)?;

        match prev_time_window.cmp(&cur_time_window) {
            Ordering::Less => {
                lower_bound = Some(prev_time_probe);
                break;
            }
            Ordering::Equal => {
                upper_bound = Some(prev_time_probe);
            }
            Ordering::Greater => {
                UnexpectedSnafu {
                    reason: format!(
                        "Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
                    ),
                }
                .fail()?
            }
        }

        let Some(new_offset) = offset.checked_mul(2) else {
            // no lower bound
            return Ok(None);
        };
        offset = new_offset;
    }

    // binary search for the exact lower bound
    ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
        reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
    });

    let input_time_unit = lower_bound
        .context(UnexpectedSnafu {
            reason: "should have lower bound",
        })?
        .unit();
    let mut low = lower_bound
        .context(UnexpectedSnafu {
            reason: "should have lower bound",
        })?
        .value();
    let mut high = upper_bound
        .context(UnexpectedSnafu {
            reason: "should have upper bound",
        })?
        .value();

    while low < high {
        // Midpoint rounded towards `low`, so `low <= mid < high` always holds.
        // `(low + high) / 2` truncates towards zero: for negative (pre-epoch) values it can
        // yield `mid == high`, which makes the `Equal` arm below spin forever, and the sum
        // itself may overflow. `high - low` never exceeds the probe offset, so it can't overflow.
        let mid = low + (high - low) / 2;
        let mid_probe = common_time::Timestamp::new(mid, input_time_unit);
        let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;

        match mid_time_window.cmp(&cur_time_window) {
            Ordering::Less => low = mid + 1,
            Ordering::Equal => high = mid,
            Ordering::Greater => UnexpectedSnafu {
                reason: format!("Binary search failed for time window expression {expr:?}"),
            }
            .fail()?,
        }
    }

    let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit);

    Ok(Some(final_lower_bound_for_time_window))
}
/// Find the upper bound for time window expression
///
/// The bound is the smallest timestamp (in `current`'s unit) for which `expr` evaluates to a
/// value greater than it does for `current`, i.e. the exclusive end of the current time window.
/// It is located by probing forwards from `current` with an exponentially growing offset and
/// then binary searching the bracketed range.
///
/// Returns `None` if the forward probe overflows `i64` before a later time window is found,
/// meaning this time window has no upper bound.
///
/// # Errors
///
/// Returns an error if `expr` can't be turned into a physical expression, can't be evaluated
/// to a timestamp, or is observed to be non-monotonic.
fn find_expr_time_window_upper_bound(
    expr: &Expr,
    df_schema: &DFSchema,
    current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
    use std::cmp::Ordering;

    let phy_planner = DefaultPhysicalPlanner::default();
    let phy_expr: PhysicalExprRef = phy_planner
        .create_physical_expr(expr, df_schema, &SessionContext::new().state())
        .with_context(|_e| DatafusionSnafu {
            context: format!(
                "Failed to create physical expression from {expr:?} using {df_schema:?}"
            ),
        })?;

    let cur_time_window = eval_ts_to_ts(&phy_expr, df_schema, current)?;

    // search to find the upper bound
    let mut offset: i64 = 1;
    let mut lower_bound = Some(current);
    let upper_bound;
    // first exponential probe to find a range for binary search:
    // `lower_bound` stays in the current window, `upper_bound` ends up in a later window
    loop {
        let Some(next_val) = current.value().checked_add(offset) else {
            // no upper bound if overflow
            return Ok(None);
        };

        let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
        let next_time_window = eval_ts_to_ts(&phy_expr, df_schema, next_time_probe)?;

        match next_time_window.cmp(&cur_time_window) {
            Ordering::Less => {
                UnexpectedSnafu {
                    reason: format!(
                        "Unsupported time window expression, expect monotonic increasing for time window expression {expr:?}"
                    ),
                }
                .fail()?
            }
            Ordering::Equal => {
                lower_bound = Some(next_time_probe);
            }
            Ordering::Greater => {
                upper_bound = Some(next_time_probe);
                break;
            }
        }

        let Some(new_offset) = offset.checked_mul(2) else {
            // no upper bound if overflow
            return Ok(None);
        };
        offset = new_offset;
    }

    // binary search for the exact upper bound
    ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
        reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
    });

    let output_unit = upper_bound
        .context(UnexpectedSnafu {
            reason: "should have upper bound",
        })?
        .unit();
    let mut low = lower_bound
        .context(UnexpectedSnafu {
            reason: "should have lower bound",
        })?
        .value();
    let mut high = upper_bound
        .context(UnexpectedSnafu {
            reason: "should have upper bound",
        })?
        .value();

    while low < high {
        // Midpoint rounded towards `low`, so `low <= mid < high` always holds.
        // `(low + high) / 2` truncates towards zero: for negative (pre-epoch) values it can
        // yield `mid == high`, which makes the `Greater` arm below spin forever, and the sum
        // itself may overflow. `high - low` never exceeds the probe offset, so it can't overflow.
        let mid = low + (high - low) / 2;
        let mid_probe = common_time::Timestamp::new(mid, output_unit);
        let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;

        match mid_time_window.cmp(&cur_time_window) {
            Ordering::Less => UnexpectedSnafu {
                reason: format!("Binary search failed for time window expression {expr:?}"),
            }
            .fail()?,
            Ordering::Equal => low = mid + 1,
            Ordering::Greater => high = mid,
        }
    }

    let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);

    Ok(Some(final_upper_bound_for_time_window))
}
fn eval_ts_to_ts(
phy: &PhysicalExprRef,
df_schema: &DFSchema,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let ts_vector = match input_value.unit() {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Millisecond => {
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
};
let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
.with_context(|_| ArrowSnafu {
context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
})?;
let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
})?;
let val = match eval_res {
datafusion_expr::ColumnarValue::Array(array) => {
let ty = array.data_type();
let ty = ConcreteDataType::from_arrow_type(ty);
let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
ty.unit()
} else {
return UnexpectedSnafu {
reason: format!("Physical expression {phy:?} evaluated to non-timestamp type"),
}
.fail();
};
match time_unit {
TimeUnit::Second => TimestampSecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0),
TimeUnit::Millisecond => {
TimestampMillisecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::try_from_arrow_array(array.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to create vector from arrow array {array:?}"),
})?
.get(0)
}
}
}
datafusion_expr::ColumnarValue::Scalar(scalar) => Value::try_from(scalar.clone())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert scalar {scalar:?} to value"),
})?,
};
if let Value::Timestamp(ts) = val {
Ok(ts)
} else {
UnexpectedSnafu {
reason: format!("Expected timestamp in expression {phy:?} but got {val:?}"),
}
.fail()?
}
}
// TODO(discord9): a method to find out the precise time window
/// Plan rewriter that injects one extra filter expression into a logical plan.
///
/// While walking the plan top-down, the first non-`HAVING` `Filter` node that is found gets
/// `extra_filter` AND-ed onto its predicate; if a `TableScan` is reached first, a new `Filter`
/// node is placed on top of it instead. Only one node is ever rewritten.
#[derive(Debug)]
pub struct AddFilterRewriter {
// the expression to add to the plan
extra_filter: Expr,
// set once the filter has been injected, so that later nodes are left untouched
is_rewritten: bool,
}
impl AddFilterRewriter {
/// Creates a rewriter that will add `filter` to a plan; nothing is rewritten yet.
fn new(filter: Expr) -> Self {
Self {
extra_filter: filter,
is_rewritten: false,
}
}
}
impl TreeNodeRewriter for AddFilterRewriter {
    type Node = LogicalPlan;

    /// Top-down visit: injects the extra filter into the first eligible node and leaves every
    /// node visited afterwards unchanged.
    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        // the filter is only ever added once
        if self.is_rewritten {
            return Ok(Transformed::no(node));
        }

        let rewritten = match node {
            // an existing `WHERE` filter: extend its predicate
            LogicalPlan::Filter(mut filter) if !filter.having => {
                filter.predicate = filter.predicate.and(self.extra_filter.clone());
                LogicalPlan::Filter(filter)
            }
            // no filter seen before reaching the scan: put a new filter on top of it
            scan @ LogicalPlan::TableScan(_) => {
                let wrapped =
                    datafusion_expr::Filter::try_new(self.extra_filter.clone(), Arc::new(scan))?;
                LogicalPlan::Filter(wrapped)
            }
            other => return Ok(Transformed::no(other)),
        };

        self.is_rewritten = true;
        Ok(Transformed::yes(rewritten))
    }
}
/// Unparses a DataFusion logical plan back into a SQL string.
///
/// # Errors
/// Returns a `Datafusion` error carrying the debug form of the plan when unparsing fails.
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
    let statement = Unparser::default()
        .plan_to_sql(plan)
        .with_context(|_| DatafusionSnafu {
            context: format!("Failed to unparse logical plan {plan:?}"),
        })?;
    Ok(statement.to_string())
}
#[cfg(test)]
mod test {
use datafusion_common::tree_node::TreeNode;
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::{sql_to_df_plan, *};
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
(
"SELECT number FROM numbers_with_ts GROUP BY number","SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (number > 4) GROUP BY numbers_with_ts.number"
),
(
"SELECT number FROM numbers_with_ts WHERE number < 2 OR number >10",
"SELECT numbers_with_ts.number FROM numbers_with_ts WHERE (((numbers_with_ts.number < 2) OR (numbers_with_ts.number > 10)) AND (number > 4))"
),
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window",
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE (number > 4) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
)
];
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for (before, after) in testcases {
let sql = before;
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let mut add_filter = AddFilterRewriter::new(col("number").gt(lit(4u32)));
let plan = plan.rewrite(&mut add_filter).unwrap().data;
let new_sql = df_plan_to_sql(&plan).unwrap();
assert_eq!(after, new_sql);
}
}
/// Checks `find_plan_time_window_bound` against a table of queries, and checks the SQL that
/// results from adding the found bounds as a filter with `AddFilterRewriter`.
///
/// Each test case is `(sql, current timestamp, expected (column name, lower, upper), expected
/// unparsed SQL after the bounds are added as a filter)`.
#[tokio::test]
async fn test_plan_time_window_lower_bound() {
use datafusion_expr::{col, lit};
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let testcases = [
// same alias is not same column
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
),
// complex time window index
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
Timestamp::new(23, TimeUnit::Millisecond),
("ts".to_string(), None, None),
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
),
// time index
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// on spot
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(0, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// different time unit
(
"SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23_000_000, TimeUnit::Nanosecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other fields
(
"SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
),
// time index with other pks
(
"SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
Timestamp::new(23, TimeUnit::Millisecond),
(
"ts".to_string(),
Some(Timestamp::new(0, TimeUnit::Millisecond)),
Some(Timestamp::new(300000, TimeUnit::Millisecond)),
),
"SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
),
];
for (sql, current, expected, unparsed) in testcases {
// step 1: the bounds are searched on a plan built with the last argument set to `true`
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
.await
.unwrap();
let real =
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
.await
.unwrap();
assert_eq!(expected, real);
// step 2: the filter is added to a plan built with the last argument set to `false`
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await
.unwrap();
let (col_name, lower, upper) = real;
// when no lower bound was found the query is expected to stay exactly as written
let new_sql = if lower.is_some() {
let to_df_literal = |value| {
let value = Value::from(value);
value.try_to_scalar_value(&value.data_type()).unwrap()
};
let lower = to_df_literal(lower.unwrap());
// NOTE(review): a case with a lower bound but no upper bound would panic on this unwrap
let upper = to_df_literal(upper.unwrap());
let expr = col(&col_name)
.gt_eq(lit(lower))
.and(col(&col_name).lt_eq(lit(upper)));
let mut add_filter = AddFilterRewriter::new(expr);
let plan = plan.rewrite(&mut add_filter).unwrap().data;
df_plan_to_sql(&plan).unwrap()
} else {
sql.to_string()
};
assert_eq!(unparsed, new_sql);
}
}
}

View File

@@ -0,0 +1,407 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use common_meta::ddl::create_flow::FlowType;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion_common::tree_node::TreeNode;
use datatypes::value::Value;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{oneshot, RwLock};
use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter};
use crate::adapter::{CreateFlowArgs, FlowId};
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
use crate::Error;
/// The default rule engine query timeout: 10 minutes.
///
/// TODO(discord9): make those constants configurable
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Slow query threshold: 1 minute. A query whose execution takes at least this long is
/// reported with a warn log and recorded in the slow query metric once it completes.
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// TODO(discord9): determine how to configure refresh rate
pub struct RecordingRuleEngine {
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
engine: QueryEngineRef,
}
impl RecordingRuleEngine {
pub fn new(frontend_client: Arc<FrontendClient>, engine: QueryEngineRef) -> Self {
Self {
tasks: Default::default(),
shutdown_txs: Default::default(),
frontend_client,
engine,
}
}
}
/// Minimum delay (5 seconds) between the completion of one query and the start of the next,
/// applied as a lower bound when the next start time is computed.
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
impl RecordingRuleEngine {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids: _,
create_if_not_exists,
or_replace,
expire_after,
comment: _,
sql,
flow_options,
query_ctx,
} = args;
// or replace logic
{
let is_exist = self.tasks.read().await.contains_key(&flow_id);
match (create_if_not_exists, or_replace, is_exist) {
// if replace, ignore that old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
}
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
flow_type == Some(&FlowType::RecordingRule.to_string()) || flow_type.is_none(),
UnexpectedSnafu {
reason: format!("Flow type is not RecordingRule nor None, got {flow_type:?}")
}
);
let Some(query_ctx) = query_ctx else {
UnexpectedSnafu {
reason: "Query context is None".to_string(),
}
.fail()?
};
let (tx, rx) = oneshot::channel();
let task = RecordingRuleTask::new(
flow_id,
&sql,
expire_after,
sink_table_name,
Arc::new(query_ctx),
rx,
);
let task_inner = task.clone();
let engine = self.engine.clone();
let frontend = self.frontend_client.clone();
// TODO(discord9): also save handle & use time wheel or what for better
let _handle = common_runtime::spawn_global(async move {
match task_inner.start_executing(engine, frontend).await {
Ok(()) => info!("Flow {} shutdown", task_inner.flow_id),
Err(err) => common_telemetry::error!(
"Flow {} encounter unrecoverable error: {err:?}",
task_inner.flow_id
),
}
});
// TODO(discord9): deal with replace logic
let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
drop(replaced_old_task_opt);
self.shutdown_txs.write().await.insert(flow_id, tx);
Ok(Some(flow_id))
}
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
reason: format!("Can't found shutdown tx for flow {flow_id}"),
}
.fail()?
};
if tx.send(()).is_err() {
warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
}
Ok(())
}
}
/// A single recording rule flow: the query to run, where to write its result, and the
/// state shared between clones of the task.
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
    flow_id: FlowId,
    /// The flow's SQL query.
    query: String,
    /// in seconds
    expire_after: Option<i64>,
    /// The three name parts of the sink table, joined with `.` in the generated insert.
    sink_table_name: [String; 3],
    /// Mutable execution state, shared by all clones of this task.
    state: Arc<RwLock<RecordingRuleState>>,
}

impl RecordingRuleTask {
    /// Creates a task for `flow_id`; `shutdown_rx` is the receiving end of the signal that
    /// stops the task.
    pub fn new(
        flow_id: FlowId,
        query: &str,
        expire_after: Option<i64>,
        sink_table_name: [String; 3],
        query_ctx: QueryContextRef,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> Self {
        let state = RecordingRuleState::new(query_ctx, shutdown_rx);
        Self {
            flow_id,
            query: query.to_owned(),
            expire_after,
            sink_table_name,
            state: Arc::new(RwLock::new(state)),
        }
    }
}
impl RecordingRuleTask {
/// This should be called in a new tokio task
pub async fn start_executing(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) -> Result<(), Error> {
// only first query don't need upper bound
let mut is_first = true;
loop {
// FIXME(discord9): test if need upper bound also works
let new_query = self
.gen_query_with_time_window(engine.clone(), false)
.await?;
let insert_into = format!(
"INSERT INTO {}.{}.{} {}",
self.sink_table_name[0],
self.sink_table_name[1],
self.sink_table_name[2],
new_query
);
if is_first {
is_first = false;
}
let instant = Instant::now();
let flow_id = self.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.expire_after, peer_addr, &insert_into
);
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let res = db_client.database.sql(&insert_into).await;
drop(timer);
let elapsed = instant.elapsed();
if let Ok(res1) = &res {
debug!(
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
elapsed
);
} else if let Err(res) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &insert_into
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &insert_into
);
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.await
.after_query_exec(elapsed, res.is_ok());
let sleep_until = {
let mut state = self.state.write().await;
match state.shutdown_rx.try_recv() {
Ok(()) => break Ok(()),
Err(TryRecvError::Closed) => {
warn!("Unexpected shutdown flow {flow_id}, shutdown anyway");
break Ok(());
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(None)
};
tokio::time::sleep_until(sleep_until).await;
}
}
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
need_upper_bound: bool,
) -> Result<String, Error> {
let query_ctx = self.state.read().await.query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let low_bound = self
.expire_after
.map(|e| since_the_epoch.as_secs() - e as u64);
let Some(low_bound) = low_bound else {
return Ok(self.query.clone());
};
let low_bound = Timestamp::new_second(low_bound as i64);
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
let (col_name, lower, upper) =
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
.await?;
let new_sql = {
let to_df_literal = |value| -> Result<_, Error> {
let value = Value::from(value);
let value = value
.try_to_scalar_value(&value.data_type())
.with_context(|_| DatatypesSnafu {
extra: format!("Failed to convert to scalar value: {}", value),
})?;
Ok(value)
};
let lower = lower.map(to_df_literal).transpose()?;
let upper = upper.map(to_df_literal).transpose()?.and_then(|u| {
if need_upper_bound {
Some(u)
} else {
None
}
});
let expr = {
use datafusion_expr::{col, lit};
match (lower, upper) {
(Some(l), Some(u)) => col(&col_name)
.gt_eq(lit(l))
.and(col(&col_name).lt_eq(lit(u))),
(Some(l), None) => col(&col_name).gt_eq(lit(l)),
(None, Some(u)) => col(&col_name).lt(lit(u)),
// no time window, direct return
(None, None) => return Ok(self.query.clone()),
}
};
let mut add_filter = AddFilterRewriter::new(expr);
// make a not optimized plan for clearer unparse
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, false).await?;
let plan = plan
.clone()
.rewrite(&mut add_filter)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan {plan:?}"),
})?
.data;
df_plan_to_sql(&plan)?
};
Ok(new_sql)
}
}
/// Mutable execution state of a recording rule task.
#[derive(Debug)]
pub struct RecordingRuleState {
    query_ctx: QueryContextRef,
    /// last query complete time
    last_update_time: Instant,
    /// last time query duration
    last_query_duration: Duration,
    exec_state: ExecState,
    /// Receiving end of the task's shutdown signal.
    shutdown_rx: oneshot::Receiver<()>,
}

impl RecordingRuleState {
    /// Creates the state of a task that has not run any query yet.
    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
        let created_at = Instant::now();
        Self {
            query_ctx,
            last_update_time: created_at,
            last_query_duration: Duration::ZERO,
            exec_state: ExecState::Idle,
            shutdown_rx,
        }
    }

    /// called after last query is done
    /// `is_succ` indicate whether the last query is successful
    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
        self.last_update_time = Instant::now();
        self.last_query_duration = elapsed;
        self.exec_state = ExecState::Idle;
    }

    /// Returns the instant at which the next query should start.
    ///
    /// The delay, counted from the completion of the last query, is the duration of the last
    /// query, capped by `max_timeout` when given and never shorter than
    /// `MIN_REFRESH_DURATION`.
    pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
        let capped = match max_timeout {
            Some(timeout) => timeout.min(self.last_query_duration),
            None => self.last_query_duration,
        };
        self.last_update_time + capped.max(MIN_REFRESH_DURATION)
    }
}
/// Execution state of a recording rule task.
#[derive(Debug, Clone)]
enum ExecState {
/// No query is running; set on creation and after each query finishes.
Idle,
/// A query is running.
/// NOTE(review): nothing in the visible code sets this variant — confirm it is still needed.
Executing,
}

View File

@@ -0,0 +1,150 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
/// Builds a channel manager whose timeout is the default rule engine query timeout.
fn default_channel_mgr() -> ChannelManager {
    ChannelManager::with_config(ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT))
}
/// Creates a client for `addrs` on top of a fresh default channel manager.
fn client_from_urls(addrs: Vec<String>) -> Client {
    let channel_manager = default_channel_mgr();
    Client::with_manager_and_urls(channel_manager, addrs)
}
/// A simple frontend client able to execute SQL using the gRPC protocol.
#[derive(Debug)]
pub enum FrontendClient {
/// Cluster mode: frontends are looked up through the meta client.
Distributed {
meta_client: Arc<MetaClient>,
},
/// Standalone mode: a fixed database client for one static address.
Standalone {
/// For the sake of simplicity gRPC is still used even in standalone mode.
/// Note that the client here should be lazy, so that the connection is only made
/// after the frontend has booted.
/// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer,
},
}
/// A database client bundled with the peer it was created for.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
// client used to execute SQL
pub database: Database,
// the peer (id and address) the client was created for
pub peer: Peer,
}
impl DatabaseWithPeer {
/// Bundles `database` with its `peer`.
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
}
impl FrontendClient {
    /// Creates a client for cluster mode that discovers frontends through `meta_client`.
    pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
        Self::Distributed { meta_client }
    }

    /// Creates a client for standalone mode that talks to the fixed gRPC address `addr`,
    /// using the default catalog and schema.
    pub fn from_static_grpc_addr(addr: String) -> Self {
        let client = client_from_urls(vec![addr.clone()]);
        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
        let peer = Peer { id: 0, addr };
        Self::Standalone {
            database_client: DatabaseWithPeer::new(database, peer),
        }
    }
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cluster_id = meta_client.id().0;
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let resp = cluster_client
.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
}

View File

@@ -57,6 +57,7 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::recording_rules::{FrontendClient, RecordingRuleEngine};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowWorkerManager, FlownodeOptions};
@@ -245,6 +246,7 @@ impl FlownodeInstance {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
if let Some(task) = &self.heartbeat_task {
info!("Close heartbeat task for flownode");
task.shutdown();
}
@@ -271,6 +273,8 @@ pub struct FlownodeBuilder {
heartbeat_task: Option<HeartbeatTask>,
/// receive a oneshot sender to send state size report
state_report_handler: Option<StateReportHandler>,
/// Client to send sql to frontend
frontend_client: Arc<FrontendClient>,
}
impl FlownodeBuilder {
@@ -281,6 +285,7 @@ impl FlownodeBuilder {
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
flow_metadata_manager: FlowMetadataManagerRef,
frontend_client: Arc<FrontendClient>,
) -> Self {
Self {
opts,
@@ -290,6 +295,7 @@ impl FlownodeBuilder {
flow_metadata_manager,
heartbeat_task: None,
state_report_handler: None,
frontend_client,
}
}
@@ -447,7 +453,10 @@ impl FlownodeBuilder {
let node_id = self.opts.node_id.map(|id| id as u32);
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
let rule_engine =
RecordingRuleEngine::new(self.frontend_client.clone(), query_engine.clone());
let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta, rule_engine);
for worker_id in 0..num_workers {
let (tx, rx) = oneshot::channel();

View File

@@ -86,7 +86,8 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
let schema = vec![
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();

View File

@@ -112,6 +112,7 @@ impl MetaClientBuilder {
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
}
pub fn enable_heartbeat(self) -> Self {

View File

@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = (&key).into();
let key = key.into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,

View File

@@ -32,7 +32,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
@@ -152,8 +151,6 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -195,7 +192,6 @@ impl Default for MetasrvOptions {
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
}
}
}
@@ -446,10 +442,6 @@ impl Metasrv {
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
self.options.node_max_idle_time,
self.in_memory.clone(),
)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}

View File

@@ -68,15 +68,13 @@ impl heartbeat_server::Heartbeat for Metasrv {
};
if pusher_id.is_none() {
pusher_id =
Some(register_pusher(&handler_group, header, tx.clone()).await);
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
}
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await
@@ -175,13 +173,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> PusherId {
) -> Option<PusherId> {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(pusher_id, pusher).await;
pusher_id
Some(pusher_id)
}
#[cfg(test)]

View File

@@ -68,6 +68,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
#[allow(unused)]
table_flownode_set_cache: TableFlownodeSetCacheRef,
}
@@ -335,9 +336,11 @@ impl Inserter {
let InstantAndNormalInsertRequests {
normal_requests,
instant_requests,
instant_requests: _,
} = requests;
// TODO(discord9): mirror some
/*
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
@@ -347,7 +350,7 @@ impl Inserter {
.chain(instant_requests.requests.iter()),
)
.await?;
flow_mirror_task.detach(self.node_manager.clone())?;
flow_mirror_task.detach(self.node_manager.clone())?;*/
// Write requests to datanode and wait for response
let write_tasks = self
@@ -817,12 +820,14 @@ struct CreateAlterTableResult {
table_infos: HashMap<TableId, Arc<TableInfo>>,
}
#[allow(unused)]
struct FlowMirrorTask {
requests: HashMap<Peer, RegionInsertRequests>,
num_rows: usize,
}
impl FlowMirrorTask {
#[allow(unused)]
async fn new(
cache: &TableFlownodeSetCacheRef,
requests: impl Iterator<Item = &RegionInsertRequest>,
@@ -896,6 +901,7 @@ impl FlowMirrorTask {
})
}
#[allow(unused)]
fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
for (peer, inserts) in self.requests {

View File

@@ -583,8 +583,7 @@ impl HistogramFoldStream {
.expect("field column should not be nullable");
counters.push(counter);
}
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
@@ -673,7 +672,7 @@ impl HistogramFoldStream {
if bucket.len() <= 1 {
return Ok(f64::NAN);
}
if bucket.last().unwrap().is_finite() {
if *bucket.last().unwrap() != f64::INFINITY {
return Err(DataFusionError::Execution(
"last bucket should be +Inf".to_string(),
));
@@ -693,8 +692,8 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
let total = *counter.last().unwrap();
let expected_pos = total * quantile;

View File

@@ -21,7 +21,6 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
mod round;
#[cfg(test)]
mod test_util;
@@ -40,7 +39,6 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub use round::Round;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {

View File

@@ -1,105 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;
use crate::functions::extract_array;
/// PromQL `round()` exposed as a DataFusion scalar UDF named `prom_round`.
///
/// Every input sample is rounded to the closest multiple of `nearest`. Ties are
/// resolved by rounding up (toward positive infinity), as documented for
/// Prometheus' `round()`.
pub struct Round {
    /// Multiple to round to. `0.0` means "round to the nearest integer".
    nearest: f64,
}

impl Round {
    fn new(nearest: f64) -> Self {
        Self { nearest }
    }

    /// Name under which the UDF is created.
    pub const fn name() -> &'static str {
        "prom_round"
    }

    /// The UDF takes a single `Float64` argument.
    fn input_type() -> Vec<DataType> {
        vec![DataType::Float64]
    }

    /// The UDF always produces `Float64`.
    pub fn return_type() -> DataType {
        DataType::Float64
    }

    /// Builds the scalar UDF that rounds its input to multiples of `nearest`.
    ///
    /// Pass `0.0` to round to the nearest integer.
    pub fn scalar_udf(nearest: f64) -> ScalarUDF {
        create_udf(
            Self::name(),
            Self::input_type(),
            Self::return_type(),
            Volatility::Immutable,
            Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
        )
    }

    /// Rounds `value` to the nearest integer, resolving ties by rounding up.
    ///
    /// `f64::round` breaks ties away from zero (`-2.5` -> `-3.0`), which differs
    /// from PromQL for negative ties (`-2.5` -> `-2.0`). Working from `floor`
    /// keeps the fractional part exact, so values just below a tie are not
    /// pushed over it. NaN and infinities are returned unchanged.
    fn round_half_up(value: f64) -> f64 {
        let floor = value.floor();
        if value - floor >= 0.5 {
            floor + 1.0
        } else {
            floor
        }
    }

    /// Applies the rounding to the single input column.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not contain exactly one argument.
    fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        assert_eq!(input.len(), 1);

        let value_array = extract_array(&input[0])?;
        let values = value_array.as_primitive::<Float64Type>();
        let nearest = self.nearest;

        let result = if nearest == 0.0 {
            // No multiple given: plain integer rounding.
            compute::unary::<_, _, Float64Type>(values, Self::round_half_up)
        } else {
            // Scale into units of `nearest`, round, then scale back.
            compute::unary::<_, _, Float64Type>(values, |a| {
                Self::round_half_up(a / nearest) * nearest
            })
        };

        Ok(ColumnarValue::Array(Arc::new(result) as _))
    }
}
#[cfg(test)]
mod tests {
    use datatypes::arrow::array::Float64Array;

    use super::*;

    /// Feeds `samples` through `prom_round` configured with `nearest` and
    /// checks the single-row output against `expected`.
    fn assert_rounds_to(samples: Vec<f64>, nearest: f64, expected: Vec<f64>) {
        let args = vec![ColumnarValue::Array(Arc::new(Float64Array::from(samples)))];
        let udf = Round::scalar_udf(nearest);
        let output = udf.invoke_batch(&args, 1).unwrap();
        let rounded = extract_array(&output).unwrap();

        assert_eq!(rounded.len(), 1);
        assert_eq!(rounded.as_primitive::<Float64Type>().values(), &expected);
    }

    #[test]
    fn test_round() {
        // (nearest, expected result of rounding 123.456)
        let cases = [
            (0.001, 123.456),
            (0.01, 123.46000000000001),
            (0.1, 123.5),
            (0.0, 123.0),
            (1.0, 123.0),
            (10.0, 120.0),
            (100.0, 100.0),
            (105.0, 105.0),
            (1000.0, 0.0),
        ];

        for &(nearest, expected) in cases.iter() {
            assert_rounds_to(vec![123.456], nearest, vec![expected]);
        }
    }
}

View File

@@ -52,7 +52,7 @@ use promql::extension_plan::{
use promql::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
@@ -200,9 +200,10 @@ impl PromPlanner {
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
}
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Subquery",
}
.fail()?,
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
@@ -217,48 +218,6 @@ impl PromPlanner {
Ok(res)
}
/// Plans a PromQL subquery (`expr[range:step]`) as a `RangeManipulate` over
/// the plan of the inner expression.
///
/// While the inner expression is planned, the planner context is temporarily
/// adjusted: the interval becomes the subquery step (when one is given) and
/// the start is moved back by `range - interval`. Both values are restored
/// afterwards, whether or not planning the inner expression succeeded.
///
/// # Errors
///
/// Returns `ZeroRangeSelector` when `range` is zero, any error produced while
/// planning the inner expression, and a planning error when the
/// `RangeManipulate` node cannot be built.
///
/// # Panics
///
/// Panics if the time index column has not been set in the context.
async fn prom_subquery_expr_to_plan(
    &mut self,
    session_state: &SessionState,
    subquery_expr: &SubqueryExpr,
) -> Result<LogicalPlan> {
    let SubqueryExpr {
        expr, range, step, ..
    } = subquery_expr;

    // Validate first: a zero range is rejected before any planning work is
    // done and before the context is touched.
    ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);

    let current_interval = self.ctx.interval;
    let current_start = self.ctx.start;
    if let Some(step) = step {
        self.ctx.interval = step.as_millis() as _;
    }
    self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;

    let input = self.prom_expr_to_plan(expr, session_state).await;

    // Restore the outer evaluation window before propagating a failure so the
    // context is never left with the subquery's temporary values.
    self.ctx.interval = current_interval;
    self.ctx.start = current_start;
    let input = input?;

    let range_ms = range.as_millis() as _;
    self.ctx.range = Some(range_ms);

    let manipulate = RangeManipulate::new(
        self.ctx.start,
        self.ctx.end,
        self.ctx.interval,
        range_ms,
        self.ctx
            .time_index_column
            .clone()
            .expect("time index should be set in `setup_context`"),
        self.ctx.field_columns.clone(),
        input,
    )
    .context(DataFusionPlanningSnafu)?;

    Ok(LogicalPlan::Extension(Extension {
        node: Arc::new(manipulate),
    }))
}
async fn prom_aggr_expr_to_plan(
&mut self,
session_state: &SessionState,
@@ -482,7 +441,6 @@ impl PromPlanner {
// if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
// under this case we only join on time index
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
modifier,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -1510,20 +1468,6 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"round" => {
let nearest = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
None => 0.0,
other => UnexpectedPlanExprSnafu {
desc: format!("expected f64 literal as t, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1730,7 +1674,7 @@ impl PromPlanner {
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join"
fn_name: "label_join",
}
);
@@ -2177,49 +2121,24 @@ impl PromPlanner {
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let mut left_tag_columns = if only_join_time_index {
BTreeSet::new()
vec![]
} else {
self.ctx
.tag_columns
.iter()
.cloned()
.collect::<BTreeSet<_>>()
.map(Column::from_name)
.collect::<Vec<_>>()
};
let mut right_tag_columns = left_tag_columns.clone();
// apply modifier
if let Some(modifier) = modifier {
// apply label modifier
if let Some(matching) = &modifier.matching {
match matching {
// keeps columns mentioned in `on`
LabelModifier::Include(on) => {
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
right_tag_columns =
right_tag_columns.intersection(&mask).cloned().collect();
}
// removes columns mentioned in `ignoring`
LabelModifier::Exclude(ignoring) => {
// doesn't check existence of label
for label in &ignoring.labels {
let _ = left_tag_columns.remove(label);
let _ = right_tag_columns.remove(label);
}
}
}
}
}
// push time index column if it exists
if let (Some(left_time_index_column), Some(right_time_index_column)) =
(left_time_index_column, right_time_index_column)
{
left_tag_columns.insert(left_time_index_column);
right_tag_columns.insert(right_time_index_column);
left_tag_columns.push(Column::from_name(left_time_index_column));
right_tag_columns.push(Column::from_name(right_time_index_column));
}
let right = LogicalPlanBuilder::from(right)
@@ -2235,16 +2154,7 @@ impl PromPlanner {
.join(
right,
JoinType::Inner,
(
left_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
right_tag_columns
.into_iter()
.map(Column::from_name)
.collect::<Vec<_>>(),
),
(left_tag_columns, right_tag_columns),
None,
)
.context(DataFusionPlanningSnafu)?
@@ -3430,59 +3340,6 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
// Plans a binary division between two metrics that uses
// `ignoring(kubernetes_pod_name, kubernetes_namespace)` and pins the resulting
// logical plan: the inner join is expected to match only on the time index
// and the remaining `uri` tag.
#[tokio::test]
async fn test_hash_join() {
// Placeholder expression; replaced by the parsed query below.
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
// Both metric tables share the same three tag columns.
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_sum".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["uri", "kubernetes_namespace", "kubernetes_pod_name"],
)
.await;
// Should be ok
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
// The ignored labels must not appear among the join keys of the expected plan.
let expected = r#"Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value
Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri
SubqueryAlias: http_server_requests_seconds_sum
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_sum.uri DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_sum.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_sum.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_sum.uri = Utf8("/accounts/login") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_sum
SubqueryAlias: http_server_requests_seconds_count
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false]
PromSeriesDivide: tags=["uri", "kubernetes_namespace", "kubernetes_pod_name"]
Sort: http_server_requests_seconds_count.uri DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_namespace DESC NULLS LAST, http_server_requests_seconds_count.kubernetes_pod_name DESC NULLS LAST, http_server_requests_seconds_count.greptime_timestamp DESC NULLS LAST
Filter: http_server_requests_seconds_count.uri = Utf8("/accounts/login") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)
TableScan: http_server_requests_seconds_count"#;
assert_eq!(plan.to_string(), expected);
}
#[tokio::test]
async fn test_nested_histogram_quantile() {
let mut eval_stmt = EvalStmt {

View File

@@ -40,7 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::FlownodeBuilder;
use flow::{FlownodeBuilder, FrontendClient};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
@@ -164,12 +164,15 @@ impl GreptimeDbStandaloneBuilder {
Some(procedure_manager.clone()),
);
let fe_server_addr = opts.frontend_options().grpc.bind_addr.clone();
let frontend_client = FrontendClient::from_static_grpc_addr(fe_server_addr);
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client),
);
let flownode = Arc::new(flow_builder.build().await.unwrap());

View File

@@ -1,81 +0,0 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
Affected Rows: 0
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.45 | read |
| 1970-01-01T00:00:03 | 234.57 | write |
| 1970-01-01T00:00:04 | 345.68 | read |
| 1970-01-01T00:00:04 | 456.79 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.5 | read |
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
| 1970-01-01T00:00:04 | 456.8 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 123.0 | read |
| 1970-01-01T00:00:03 | 235.0 | write |
| 1970-01-01T00:00:04 | 346.0 | read |
| 1970-01-01T00:00:04 | 457.0 | write |
+---------------------+----------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
+---------------------+----------------------------+-------+
| ts | prom_round(greptime_value) | job |
+---------------------+----------------------------+-------+
| 1970-01-01T00:00:03 | 120.0 | read |
| 1970-01-01T00:00:03 | 230.0 | write |
| 1970-01-01T00:00:04 | 350.0 | read |
| 1970-01-01T00:00:04 | 460.0 | write |
+---------------------+----------------------------+-------+
drop table cache_hit;
Affected Rows: 0

View File

@@ -1,30 +0,0 @@
create table cache_hit (
ts timestamp time index,
job string,
greptime_value double,
primary key (job)
);
insert into cache_hit values
(3000, "read", 123.45),
(3000, "write", 234.567),
(4000, "read", 345.678),
(4000, "write", 456.789);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
drop table cache_hit;

View File

@@ -638,78 +638,3 @@ drop table cache_miss;
Affected Rows: 0
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
Affected Rows: 0
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
Affected Rows: 4
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
++
++
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
drop table cache_hit_with_null_label;
Affected Rows: 0
drop table cache_miss_with_null_label;
Affected Rows: 0

View File

@@ -295,45 +295,3 @@ tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
drop table cache_hit;
drop table cache_miss;
create table cache_hit_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
create table cache_miss_with_null_label (
ts timestamp time index,
job string,
null_label string null,
greptime_value double,
primary key (job, null_label)
);
insert into cache_hit_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 3.0),
(4000, "write", null, 4.0);
insert into cache_miss_with_null_label values
(3000, "read", null, 1.0),
(3000, "write", null, 2.0),
(4000, "read", null, 1.0),
(4000, "write", null, 2.0);
-- SQLNESS SORT_RESULT 3 1
-- null!=null, so it will returns the empty set.
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / on(job) (cache_miss_with_null_label + on(job) cache_hit_with_null_label);
drop table cache_hit_with_null_label;
drop table cache_miss_with_null_label;

View File

@@ -295,40 +295,3 @@ drop table histogram3_bucket;
Affected Rows: 0
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
Affected Rows: 0
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
Affected Rows: 7
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
+---------------------+---+-----+
| ts | s | val |
+---------------------+---+-----+
| 1970-01-01T00:48:20 | a | 5.0 |
| 1970-01-01T00:50:00 | a | 5.0 |
+---------------------+---+-----+
drop table histogram4_bucket;
Affected Rows: 0

View File

@@ -163,27 +163,3 @@ insert into histogram3_bucket values
tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));
drop table histogram3_bucket;
-- test with invalid data (unaligned buckets)
create table histogram4_bucket (
ts timestamp time index,
le string,
s string,
val double,
primary key (s, le),
);
insert into histogram4_bucket values
(2900000, "0.1", "a", 0),
(2900000, "1", "a", 10),
(2900000, "5", "a", 20),
(2900000, "+Inf", "a", 150),
(3000000, "0.1", "a", 50),
(3000000, "1", "a", 70),
(3000000, "5", "a", 120),
-- INF here is missing
;
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
drop table histogram4_bucket;

View File

@@ -1,65 +0,0 @@
create table metric_total (
ts timestamp time index,
val double,
);
Affected Rows: 0
insert into metric_total values
(0, 1),
(10000, 2);
Affected Rows: 2
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 3.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:00:10 | 4.0 |
+---------------------+----------------------------------+
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:00 | 10.0 |
+---------------------+----------------------------------+
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
+---------------------+----------------------------------+
| ts | prom_sum_over_time(ts_range,val) |
+---------------------+----------------------------------+
| 1970-01-01T00:05:59 | 2.0 |
+---------------------+----------------------------------+
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:10 | 0.1 |
+---------------------+----------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
+---------------------+----------------------------+
| ts | prom_rate(ts_range,val,ts) |
+---------------------+----------------------------+
| 1970-01-01T00:00:20 | 0.06666666666666667 |
+---------------------+----------------------------+
drop table metric_total;
Affected Rows: 0

View File

@@ -1,22 +0,0 @@
create table metric_total (
ts timestamp time index,
val double,
);
insert into metric_total values
(0, 1),
(10000, 2);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (10, 10, '1s') sum_over_time(metric_total[50s:5s]);
tql eval (300, 300, '1s') sum_over_time(metric_total[50s:10s]);
tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
drop table metric_total;