mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
Compare commits
6 Commits
feat/bulk-
...
feat/bulk-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51454c93d9 | ||
|
|
5dd6f92b60 | ||
|
|
11e4a8abb3 | ||
|
|
a9cbcbb0c8 | ||
|
|
ebd815d288 | ||
|
|
403958f5ba |
@@ -196,12 +196,22 @@ impl Database {
|
||||
|
||||
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
|
||||
/// is `max_retries * GRPC_CONN_TIMEOUT`
|
||||
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
|
||||
pub async fn handle_with_retry(
|
||||
&self,
|
||||
request: Request,
|
||||
max_retries: u32,
|
||||
hints: &[(&str, &str)],
|
||||
) -> Result<u32> {
|
||||
let mut client = make_database_client(&self.client)?.inner;
|
||||
let mut retries = 0;
|
||||
|
||||
let request = self.to_rpc_request(request);
|
||||
|
||||
loop {
|
||||
let raw_response = client.handle(request.clone()).await;
|
||||
let mut tonic_request = tonic::Request::new(request.clone());
|
||||
let metadata = tonic_request.metadata_mut();
|
||||
Self::put_hints(metadata, hints)?;
|
||||
let raw_response = client.handle(tonic_request).await;
|
||||
match (raw_response, retries < max_retries) {
|
||||
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
|
||||
(Err(err), true) => {
|
||||
|
||||
@@ -14,8 +14,8 @@
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) mod hll;
|
||||
mod uddsketch;
|
||||
pub mod hll;
|
||||
pub mod uddsketch;
|
||||
|
||||
pub(crate) struct ApproximateFunction;
|
||||
|
||||
|
||||
@@ -409,7 +409,7 @@ impl FrontendClient {
|
||||
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||
|
||||
db.database
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES, &[("query_parallelism", "1")])
|
||||
.await
|
||||
.with_context(|_| InvalidRequestSnafu {
|
||||
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
|
||||
|
||||
@@ -59,6 +59,11 @@ pub struct TaskState {
|
||||
pub(crate) min_run_interval: Option<u64>,
|
||||
/// max filter number per query
|
||||
pub(crate) max_filter_num: Option<usize>,
|
||||
/// Current filter count, will grow when query succeeds(capped by `max_filter_num`),
|
||||
/// and reset to 1 when query fails.
|
||||
///
|
||||
/// This is useful for controlling resource usage
|
||||
pub(crate) cur_filter_cnt: usize,
|
||||
}
|
||||
impl TaskState {
|
||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||
@@ -72,6 +77,7 @@ impl TaskState {
|
||||
task_handle: None,
|
||||
min_run_interval: None,
|
||||
max_filter_num: None,
|
||||
cur_filter_cnt: 1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,34 +101,39 @@ impl TaskState {
|
||||
/// TODO: Make this behavior configurable.
|
||||
pub fn get_next_start_query_time(
|
||||
&self,
|
||||
_flow_id: FlowId,
|
||||
_time_window_size: &Option<Duration>,
|
||||
flow_id: FlowId,
|
||||
time_window_size: &Option<Duration>,
|
||||
max_timeout: Option<Duration>,
|
||||
) -> Instant {
|
||||
let next_duration = max_timeout
|
||||
.unwrap_or(self.last_query_duration)
|
||||
.min(self.last_query_duration)
|
||||
.max(
|
||||
self.min_run_interval
|
||||
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
|
||||
let lower = self.min_run_interval
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or(MIN_REFRESH_DURATION),
|
||||
);
|
||||
.unwrap_or(MIN_REFRESH_DURATION).max(time_window_size.unwrap_or_default());
|
||||
let next_duration = self.last_query_duration.max(lower);
|
||||
let next_duration = if let Some(max_timeout) = max_timeout {
|
||||
next_duration.min(max_timeout)
|
||||
} else {
|
||||
next_duration
|
||||
};
|
||||
|
||||
// if have dirty time window, execute immediately to clean dirty time window
|
||||
/*if self.dirty_time_windows.windows.is_empty() {
|
||||
let cur_dirty_window_size = self.dirty_time_windows.window_size();
|
||||
let max_query_update_range = time_window_size.clone().unwrap_or_default().mul_f64(
|
||||
self.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM) as f64,
|
||||
);
|
||||
if cur_dirty_window_size < max_query_update_range {
|
||||
self.last_update_time + next_duration
|
||||
} else {
|
||||
// if dirty time windows can't be clean up in one query, execute immediately to faster
|
||||
// clean up dirty time windows
|
||||
debug!(
|
||||
"Flow id = {}, still have {} dirty time window({:?}), execute immediately",
|
||||
"Flow id = {}, still have too many{} dirty time window({:?}), execute immediately",
|
||||
flow_id,
|
||||
self.dirty_time_windows.windows.len(),
|
||||
self.dirty_time_windows.windows
|
||||
);
|
||||
Instant::now()
|
||||
}*/
|
||||
|
||||
// wait for next duration anyway
|
||||
self.last_update_time + next_duration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +146,17 @@ pub struct DirtyTimeWindows {
|
||||
windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
}
|
||||
|
||||
/// Time windows that are being worked on, which are not dirty but are currently being processed
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct WorkingTimeWindows {
|
||||
/// windows's `start -> end` and non-overlapping
|
||||
/// `end` is exclusive(and optional)
|
||||
pub windows: BTreeMap<Timestamp, Option<Timestamp>>,
|
||||
/// Filter expression for the time windows
|
||||
/// This is used to filter the data in the time windows.
|
||||
pub filter: Option<datafusion_expr::Expr>,
|
||||
}
|
||||
|
||||
impl DirtyTimeWindows {
|
||||
/// Time window merge distance
|
||||
///
|
||||
@@ -156,10 +178,28 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn window_size(&self) -> Duration {
|
||||
let mut ret = Duration::from_secs(0);
|
||||
for (start, end) in &self.windows {
|
||||
if let Some(end) = end {
|
||||
if let Some(duration) = end.sub(start) {
|
||||
ret += duration.to_std().unwrap_or_default();
|
||||
}
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
|
||||
self.windows.insert(start, end);
|
||||
}
|
||||
|
||||
pub fn add_windows(&mut self, windows: BTreeMap<Timestamp, Option<Timestamp>>) {
|
||||
for (start, end) in windows {
|
||||
self.windows.insert(start, end);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clean all dirty time windows, useful when can't found time window expr
|
||||
pub fn clean(&mut self) {
|
||||
self.windows.clear();
|
||||
@@ -178,7 +218,7 @@ impl DirtyTimeWindows {
|
||||
window_cnt: usize,
|
||||
flow_id: FlowId,
|
||||
task_ctx: Option<&BatchingTask>,
|
||||
) -> Result<Option<datafusion_expr::Expr>, Error> {
|
||||
) -> Result<WorkingTimeWindows, Error> {
|
||||
debug!(
|
||||
"expire_lower_bound: {:?}, window_size: {:?}",
|
||||
expire_lower_bound.map(|t| t.to_iso8601_string()),
|
||||
@@ -301,7 +341,7 @@ impl DirtyTimeWindows {
|
||||
.observe(stalled_time_range.num_seconds() as f64);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in to_be_query.into_iter() {
|
||||
for (start, end) in to_be_query.clone().into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
@@ -333,7 +373,12 @@ impl DirtyTimeWindows {
|
||||
expr_lst.push(expr);
|
||||
}
|
||||
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
|
||||
Ok(expr)
|
||||
|
||||
let working = WorkingTimeWindows {
|
||||
windows: to_be_query,
|
||||
filter: expr,
|
||||
};
|
||||
Ok(working)
|
||||
}
|
||||
|
||||
fn align_time_window(
|
||||
@@ -629,7 +674,8 @@ mod test {
|
||||
0,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
.unwrap()
|
||||
.filter;
|
||||
|
||||
let unparser = datafusion::sql::unparser::Unparser::default();
|
||||
let to_sql = filter_expr
|
||||
|
||||
@@ -46,7 +46,7 @@ use tokio::time::Instant;
|
||||
|
||||
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
|
||||
use crate::batching_mode::frontend_client::FrontendClient;
|
||||
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
|
||||
use crate::batching_mode::state::{DirtyTimeWindows, TaskState, WorkingTimeWindows};
|
||||
use crate::batching_mode::time_window::TimeWindowExpr;
|
||||
use crate::batching_mode::utils::{
|
||||
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
|
||||
@@ -61,9 +61,9 @@ use crate::error::{
|
||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||
METRIC_FLOW_ROWS,
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT,
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS,
|
||||
};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
@@ -110,6 +110,13 @@ enum QueryType {
|
||||
Sql,
|
||||
}
|
||||
|
||||
/// A plan with working time windows, used to track the time windows that are currently being processed
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PlanWithWindow {
|
||||
pub plan: Option<LogicalPlan>,
|
||||
pub working_windows: WorkingTimeWindows,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BatchingTask {
|
||||
pub config: Arc<TaskConfig>,
|
||||
@@ -218,30 +225,29 @@ impl BatchingTask {
|
||||
engine: &QueryEngineRef,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
if let Some(new_query) = self.gen_insert_plan(engine).await? {
|
||||
if let Some(new_query) = self.gen_insert_plan(engine).await?.plan {
|
||||
debug!("Generate new query: {}", new_query);
|
||||
self.execute_logical_plan(frontend_client, &new_query).await
|
||||
self.execute_logical_plan(frontend_client, &new_query)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
debug!("Generate no query");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn gen_insert_plan(
|
||||
&self,
|
||||
engine: &QueryEngineRef,
|
||||
) -> Result<Option<LogicalPlan>, Error> {
|
||||
pub async fn gen_insert_plan(&self, engine: &QueryEngineRef) -> Result<PlanWithWindow, Error> {
|
||||
let (table, df_schema) = get_table_info_df_schema(
|
||||
self.config.catalog_manager.clone(),
|
||||
self.config.sink_table_name.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_query = self
|
||||
let new_query_info = self
|
||||
.gen_query_with_time_window(engine.clone(), &table.meta.schema)
|
||||
.await?;
|
||||
|
||||
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
|
||||
let insert_into = if let Some(new_query) = new_query_info.plan {
|
||||
// first check if all columns in input query exists in sink table
|
||||
// since insert into ref to names in record batch generate by given query
|
||||
let table_columns = df_schema
|
||||
@@ -272,12 +278,15 @@ impl BatchingTask {
|
||||
Arc::new(new_query),
|
||||
))
|
||||
} else {
|
||||
return Ok(None);
|
||||
return Ok(new_query_info);
|
||||
};
|
||||
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
|
||||
context: "Failed to recompute schema",
|
||||
})?;
|
||||
Ok(Some(insert_into))
|
||||
Ok(PlanWithWindow {
|
||||
plan: Some(insert_into),
|
||||
working_windows: new_query_info.working_windows,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn create_table(
|
||||
@@ -297,7 +306,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
frontend_client: &Arc<FrontendClient>,
|
||||
plan: &LogicalPlan,
|
||||
) -> Result<Option<(u32, Duration)>, Error> {
|
||||
) -> Result<(u32, Duration), Error> {
|
||||
let instant = Instant::now();
|
||||
let flow_id = self.config.flow_id;
|
||||
|
||||
@@ -410,7 +419,7 @@ impl BatchingTask {
|
||||
|
||||
let res = res?;
|
||||
|
||||
Ok(Some((res, elapsed)))
|
||||
Ok((res, elapsed))
|
||||
}
|
||||
|
||||
/// start executing query in a loop, break when receive shutdown signal
|
||||
@@ -443,7 +452,7 @@ impl BatchingTask {
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
|
||||
let new_query = match self.gen_insert_plan(&engine).await {
|
||||
let new_query_info = match self.gen_insert_plan(&engine).await {
|
||||
Ok(new_query) => new_query,
|
||||
Err(err) => {
|
||||
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
|
||||
@@ -453,8 +462,10 @@ impl BatchingTask {
|
||||
}
|
||||
};
|
||||
|
||||
let res = if let Some(new_query) = &new_query {
|
||||
self.execute_logical_plan(&frontend_client, new_query).await
|
||||
let res = if let Some(new_query) = &new_query_info.plan {
|
||||
self.execute_logical_plan(&frontend_client, new_query)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
};
|
||||
@@ -463,7 +474,17 @@ impl BatchingTask {
|
||||
// normal execute, sleep for some time before doing next query
|
||||
Ok(Some(_)) => {
|
||||
let sleep_until = {
|
||||
let state = self.state.write().unwrap();
|
||||
let mut state = self.state.write().unwrap();
|
||||
|
||||
// double cur_filter_cnt
|
||||
state.cur_filter_cnt = state.cur_filter_cnt.saturating_mul(2).min(
|
||||
state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
|
||||
);
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.set(state.cur_filter_cnt as i64);
|
||||
|
||||
state.get_next_start_query_time(
|
||||
self.config.flow_id,
|
||||
@@ -491,7 +512,7 @@ impl BatchingTask {
|
||||
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.inc();
|
||||
match new_query {
|
||||
match new_query_info.plan {
|
||||
Some(query) => {
|
||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||
}
|
||||
@@ -499,6 +520,17 @@ impl BatchingTask {
|
||||
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
|
||||
}
|
||||
}
|
||||
{
|
||||
// return working windows to dirty windows, and reset current filter cnt so next time we generate query only generate a small query
|
||||
let mut state = self.state.write().unwrap();
|
||||
state
|
||||
.dirty_time_windows
|
||||
.add_windows(new_query_info.working_windows.windows);
|
||||
state.cur_filter_cnt = 1;
|
||||
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
|
||||
.with_label_values(&[&flow_id_str])
|
||||
.set(state.cur_filter_cnt as i64);
|
||||
}
|
||||
// also sleep for a little while before try again to prevent flooding logs
|
||||
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||
}
|
||||
@@ -527,7 +559,7 @@ impl BatchingTask {
|
||||
&self,
|
||||
engine: QueryEngineRef,
|
||||
sink_table_schema: &Arc<Schema>,
|
||||
) -> Result<Option<(LogicalPlan, usize)>, Error> {
|
||||
) -> Result<PlanWithWindow, Error> {
|
||||
let query_ctx = self.state.read().unwrap().query_ctx.clone();
|
||||
let start = SystemTime::now();
|
||||
let since_the_epoch = start
|
||||
@@ -540,7 +572,6 @@ impl BatchingTask {
|
||||
.unwrap_or(u64::MIN);
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
let schema_len = self.config.output_schema.fields().len();
|
||||
|
||||
let expire_time_window_bound = self
|
||||
.config
|
||||
@@ -573,10 +604,12 @@ impl BatchingTask {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})?
|
||||
.data;
|
||||
let schema_len = plan.schema().fields().len();
|
||||
|
||||
// since no time window lower/upper bound is found, just return the original query(with auto columns)
|
||||
return Ok(Some((plan, schema_len)));
|
||||
return Ok(PlanWithWindow {
|
||||
plan: Some(plan),
|
||||
working_windows: WorkingTimeWindows::default(),
|
||||
});
|
||||
};
|
||||
|
||||
debug!(
|
||||
@@ -598,16 +631,14 @@ impl BatchingTask {
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = {
|
||||
let working_windows = {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let max_window_cnt = state
|
||||
.max_filter_num
|
||||
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||
let cur_wnd_cnt = state.cur_filter_cnt;
|
||||
state.dirty_time_windows.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
max_window_cnt,
|
||||
cur_wnd_cnt,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?
|
||||
@@ -616,7 +647,9 @@ impl BatchingTask {
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id,
|
||||
expr.as_ref()
|
||||
working_windows
|
||||
.filter
|
||||
.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
@@ -624,13 +657,16 @@ impl BatchingTask {
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
let Some(expr) = &working_windows.filter else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
|
||||
return Ok(None);
|
||||
return Ok(PlanWithWindow {
|
||||
plan: None,
|
||||
working_windows,
|
||||
});
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
let mut add_filter = AddFilterRewriter::new(expr.clone());
|
||||
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
|
||||
|
||||
let plan =
|
||||
@@ -646,7 +682,10 @@ impl BatchingTask {
|
||||
// only apply optimize after complex rewrite is done
|
||||
let new_plan = apply_df_optimizer(rewrite).await?;
|
||||
|
||||
Ok(Some((new_plan, schema_len)))
|
||||
Ok(PlanWithWindow {
|
||||
plan: Some(new_plan),
|
||||
working_windows,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -96,6 +96,12 @@ lazy_static! {
|
||||
&["flow_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"greptime_flow_batching_current_window_count",
|
||||
"flow batching engine current query window count per flow id",
|
||||
&["flow_id"]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_flow_batching_engine_guess_fe_load",
|
||||
|
||||
@@ -962,19 +962,22 @@ impl StreamContext {
|
||||
}
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
write!(f, "{{")?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"partition_count={} ({} memtable ranges, {} file {} ranges)",
|
||||
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
|
||||
self.ranges.len(),
|
||||
num_mem_ranges,
|
||||
self.input.num_files(),
|
||||
num_file_ranges,
|
||||
)?;
|
||||
if let Some(selector) = &self.input.series_row_selector {
|
||||
write!(f, ", selector={}", selector)?;
|
||||
write!(f, ", \"selector\":\"{}\"", selector)?;
|
||||
}
|
||||
if let Some(distribution) = &self.input.distribution {
|
||||
write!(f, ", distribution={}", distribution)?;
|
||||
write!(f, ", \"distribution\":\"{}\"", distribution)?;
|
||||
}
|
||||
|
||||
if verbose {
|
||||
@@ -991,14 +994,15 @@ impl StreamContext {
|
||||
|
||||
impl fmt::Debug for FileWrapper<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let (start, end) = self.file.time_range();
|
||||
write!(
|
||||
f,
|
||||
"[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
|
||||
r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
|
||||
self.file.file_id(),
|
||||
self.file.time_range().0.value(),
|
||||
self.file.time_range().0.unit(),
|
||||
self.file.time_range().1.value(),
|
||||
self.file.time_range().1.unit(),
|
||||
start.value(),
|
||||
start.unit(),
|
||||
end.value(),
|
||||
end.unit(),
|
||||
self.file.num_rows(),
|
||||
self.file.size(),
|
||||
self.file.index_size()
|
||||
@@ -1014,25 +1018,22 @@ impl StreamContext {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let output_schema = self.input.mapper.output_schema();
|
||||
if !output_schema.is_empty() {
|
||||
write!(f, ", projection=")?;
|
||||
f.debug_list()
|
||||
.entries(output_schema.column_schemas().iter().map(|col| &col.name))
|
||||
.finish()?;
|
||||
let names: Vec<_> = output_schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.map(|col| &col.name)
|
||||
.collect();
|
||||
write!(f, ", \"projection\": {:?}", names)?;
|
||||
}
|
||||
if let Some(predicate) = &self.input.predicate.predicate() {
|
||||
if !predicate.exprs().is_empty() {
|
||||
write!(f, ", filters=[")?;
|
||||
for (i, expr) in predicate.exprs().iter().enumerate() {
|
||||
if i == predicate.exprs().len() - 1 {
|
||||
write!(f, "{}]", expr)?;
|
||||
} else {
|
||||
write!(f, "{}, ", expr)?;
|
||||
}
|
||||
}
|
||||
let exprs: Vec<_> =
|
||||
predicate.exprs().iter().map(|e| e.to_string()).collect();
|
||||
write!(f, ", \"filters\": {:?}", exprs)?;
|
||||
}
|
||||
}
|
||||
if !self.input.files.is_empty() {
|
||||
write!(f, ", files=")?;
|
||||
write!(f, ", \"files\": ")?;
|
||||
f.debug_list()
|
||||
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
|
||||
.finish()?;
|
||||
|
||||
@@ -142,36 +142,36 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
|
||||
write!(
|
||||
f,
|
||||
"{{prepare_scan_cost={prepare_scan_cost:?}, \
|
||||
build_reader_cost={build_reader_cost:?}, \
|
||||
scan_cost={scan_cost:?}, \
|
||||
convert_cost={convert_cost:?}, \
|
||||
yield_cost={yield_cost:?}, \
|
||||
total_cost={total_cost:?}, \
|
||||
num_rows={num_rows}, \
|
||||
num_batches={num_batches}, \
|
||||
num_mem_ranges={num_mem_ranges}, \
|
||||
num_file_ranges={num_file_ranges}, \
|
||||
build_parts_cost={build_parts_cost:?}, \
|
||||
rg_total={rg_total}, \
|
||||
rg_fulltext_filtered={rg_fulltext_filtered}, \
|
||||
rg_inverted_filtered={rg_inverted_filtered}, \
|
||||
rg_minmax_filtered={rg_minmax_filtered}, \
|
||||
rg_bloom_filtered={rg_bloom_filtered}, \
|
||||
rows_before_filter={rows_before_filter}, \
|
||||
rows_fulltext_filtered={rows_fulltext_filtered}, \
|
||||
rows_inverted_filtered={rows_inverted_filtered}, \
|
||||
rows_bloom_filtered={rows_bloom_filtered}, \
|
||||
rows_precise_filtered={rows_precise_filtered}, \
|
||||
num_sst_record_batches={num_sst_record_batches}, \
|
||||
num_sst_batches={num_sst_batches}, \
|
||||
num_sst_rows={num_sst_rows}, \
|
||||
first_poll={first_poll:?}, \
|
||||
num_series_send_timeout={num_series_send_timeout}, \
|
||||
num_distributor_rows={num_distributor_rows}, \
|
||||
num_distributor_batches={num_distributor_batches}, \
|
||||
distributor_scan_cost={distributor_scan_cost:?}, \
|
||||
distributor_yield_cost={distributor_yield_cost:?}}},"
|
||||
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
|
||||
\"build_reader_cost\":\"{build_reader_cost:?}\", \
|
||||
\"scan_cost\":\"{scan_cost:?}\", \
|
||||
\"convert_cost\":\"{convert_cost:?}\", \
|
||||
\"yield_cost\":\"{yield_cost:?}\", \
|
||||
\"total_cost\":\"{total_cost:?}\", \
|
||||
\"num_rows\":{num_rows}, \
|
||||
\"num_batches\":{num_batches}, \
|
||||
\"num_mem_ranges\":{num_mem_ranges}, \
|
||||
\"num_file_ranges\":{num_file_ranges}, \
|
||||
\"build_parts_cost\":\"{build_parts_cost:?}\", \
|
||||
\"rg_total\":{rg_total}, \
|
||||
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
|
||||
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
|
||||
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
|
||||
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
|
||||
\"rows_before_filter\":{rows_before_filter}, \
|
||||
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
|
||||
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
|
||||
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
|
||||
\"rows_precise_filtered\":{rows_precise_filtered}, \
|
||||
\"num_sst_record_batches\":{num_sst_record_batches}, \
|
||||
\"num_sst_batches\":{num_sst_batches}, \
|
||||
\"num_sst_rows\":{num_sst_rows}, \
|
||||
\"first_poll\":\"{first_poll:?}\", \
|
||||
\"num_series_send_timeout\":{num_series_send_timeout}, \
|
||||
\"num_distributor_rows\":{num_distributor_rows}, \
|
||||
\"num_distributor_batches\":{num_distributor_batches}, \
|
||||
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
|
||||
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -390,10 +390,11 @@ impl PartitionMetricsList {
|
||||
/// Format verbose metrics for each partition for explain.
|
||||
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let list = self.0.lock().unwrap();
|
||||
write!(f, ", metrics_per_partition: ")?;
|
||||
write!(f, ", \"metrics_per_partition\": ")?;
|
||||
f.debug_list()
|
||||
.entries(list.iter().filter_map(|p| p.as_ref()))
|
||||
.finish()
|
||||
.finish()?;
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,7 +489,11 @@ impl PartitionMetrics {
|
||||
impl fmt::Debug for PartitionMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let metrics = self.0.metrics.lock().unwrap();
|
||||
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
|
||||
write!(
|
||||
f,
|
||||
r#"{{"partition":{}, "metrics":{:?}}}"#,
|
||||
self.0.partition, metrics
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -478,6 +478,21 @@ impl QueryEngine for DatafusionQueryEngine {
|
||||
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
|
||||
let mut state = self.state.session_state();
|
||||
state.config_mut().set_extension(query_ctx.clone());
|
||||
// note that hints in "x-greptime-hints" is automatically parsed
|
||||
// and set to query context's extension, so we can get it from query context.
|
||||
if let Some(parallelism) = query_ctx.extension("query_parallelism") {
|
||||
if let Ok(n) = parallelism.parse::<u64>() {
|
||||
if n > 0 {
|
||||
let new_cfg = state.config().clone().with_target_partitions(n as usize);
|
||||
*state.config_mut() = new_cfg;
|
||||
}
|
||||
} else {
|
||||
common_telemetry::warn!(
|
||||
"Failed to parse query_parallelism: {}, using default value",
|
||||
parallelism
|
||||
);
|
||||
}
|
||||
}
|
||||
QueryEngineContext::new(state, query_ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -148,12 +149,13 @@ struct PlanRewriter {
|
||||
level: usize,
|
||||
/// Simulated stack for the `rewrite` recursion
|
||||
stack: Vec<(LogicalPlan, usize)>,
|
||||
/// Stages to be expanded
|
||||
/// Stages to be expanded, will be added as parent node of merge scan one by one
|
||||
stage: Vec<LogicalPlan>,
|
||||
status: RewriterStatus,
|
||||
/// Partition columns of the table in current pass
|
||||
partition_cols: Option<Vec<String>>,
|
||||
column_requirements: HashSet<Column>,
|
||||
expand_on_next_call: bool,
|
||||
}
|
||||
|
||||
impl PlanRewriter {
|
||||
@@ -174,6 +176,10 @@ impl PlanRewriter {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if self.expand_on_next_call {
|
||||
self.expand_on_next_call = false;
|
||||
return true;
|
||||
}
|
||||
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
|
||||
Commutativity::Commutative => {}
|
||||
Commutativity::PartialCommutative => {
|
||||
@@ -190,12 +196,20 @@ impl PlanRewriter {
|
||||
self.stage.push(plan)
|
||||
}
|
||||
}
|
||||
Commutativity::TransformedCommutative(transformer) => {
|
||||
Commutativity::TransformedCommutative {
|
||||
transformer,
|
||||
expand_on_parent,
|
||||
} => {
|
||||
if let Some(transformer) = transformer
|
||||
&& let Some(plan) = transformer(plan)
|
||||
&& let Some(changed_plan) = transformer(plan)
|
||||
{
|
||||
self.update_column_requirements(&plan);
|
||||
self.stage.push(plan)
|
||||
debug!("PlanRewriter: transformed plan: {changed_plan:#?} from {plan}");
|
||||
if let Some(last_stage) = changed_plan.last() {
|
||||
// update the column requirements from the last stage
|
||||
self.update_column_requirements(last_stage);
|
||||
}
|
||||
self.stage.extend(changed_plan.into_iter().rev());
|
||||
self.expand_on_next_call = expand_on_parent;
|
||||
}
|
||||
}
|
||||
Commutativity::NonCommutative
|
||||
@@ -391,10 +405,21 @@ impl TreeNodeRewriter for PlanRewriter {
|
||||
return Ok(Transformed::yes(node));
|
||||
};
|
||||
|
||||
let parent = parent.clone();
|
||||
|
||||
// TODO(ruihang): avoid this clone
|
||||
if self.should_expand(&parent.clone()) {
|
||||
if self.should_expand(&parent) {
|
||||
// TODO(ruihang): does this work for nodes with multiple children?;
|
||||
let node = self.expand(node)?;
|
||||
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
|
||||
let node = self.expand(node);
|
||||
debug!(
|
||||
"PlanRewriter: expanded plan: {}",
|
||||
match &node {
|
||||
Ok(n) => n.to_string(),
|
||||
Err(e) => format!("Error expanding plan: {e}"),
|
||||
}
|
||||
);
|
||||
let node = node?;
|
||||
self.pop_stack();
|
||||
return Ok(Transformed::yes(node));
|
||||
}
|
||||
|
||||
@@ -15,7 +15,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use common_function::aggrs::approximate::hll::{HllState, HLL_NAME};
|
||||
use common_function::aggrs::approximate::uddsketch::{UddSketchState, UDDSKETCH_STATE_NAME};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::functions_aggregate::sum::sum_udaf;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::{Expr, LogicalPlan, Projection, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
};
|
||||
@@ -23,12 +28,190 @@ use promql::extension_plan::{
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
|
||||
/// generate the upper aggregation plan that will execute on the frontend.
|
||||
/// Basically a logical plan resembling the following:
|
||||
/// Projection:
|
||||
/// Aggregate:
|
||||
///
|
||||
/// from Aggregate
|
||||
///
|
||||
/// The upper Projection exists sole to make sure parent plan can recognize the output
|
||||
/// of the upper aggregation plan.
|
||||
pub fn step_aggr_to_upper_aggr(
|
||||
aggr_plan: &LogicalPlan,
|
||||
) -> datafusion_common::Result<Vec<LogicalPlan>> {
|
||||
let LogicalPlan::Aggregate(input_aggr) = aggr_plan else {
|
||||
return Err(datafusion_common::DataFusionError::Plan(
|
||||
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
|
||||
));
|
||||
};
|
||||
if !is_all_aggr_exprs_steppable(&input_aggr.aggr_expr) {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Some aggregate expressions are not steppable".to_string(),
|
||||
));
|
||||
}
|
||||
let mut upper_aggr_expr = vec![];
|
||||
for aggr_expr in &input_aggr.aggr_expr {
|
||||
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Aggregate function not found".to_string(),
|
||||
));
|
||||
};
|
||||
let col_name = aggr_expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
let upper_func = match aggr_func.func.name() {
|
||||
"sum" | "min" | "max" | "last_value" | "first_value" => {
|
||||
// aggr_calc(aggr_merge(input_column))) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
"count" => {
|
||||
// sum(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = sum_udaf();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
UDDSKETCH_STATE_NAME => {
|
||||
// udd_merge(bucket_size, error_rate input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
|
||||
new_aggr_func.args[2] = input_column.clone();
|
||||
new_aggr_func
|
||||
}
|
||||
HLL_NAME => {
|
||||
// hll_merge(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
_ => {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
|
||||
"Aggregate function {} is not supported for Step aggregation",
|
||||
aggr_func.func.name()
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// deal with nested alias case
|
||||
let mut new_aggr_expr = aggr_expr.clone();
|
||||
{
|
||||
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
|
||||
*new_aggr_func = upper_func;
|
||||
}
|
||||
|
||||
upper_aggr_expr.push(new_aggr_expr);
|
||||
}
|
||||
let mut new_aggr = input_aggr.clone();
|
||||
// use lower aggregate plan as input, this will be replace by merge scan plan later
|
||||
new_aggr.input = Arc::new(LogicalPlan::Aggregate(input_aggr.clone()));
|
||||
|
||||
new_aggr.aggr_expr = upper_aggr_expr;
|
||||
|
||||
// group by expr also need to be all ref by column to avoid duplicated computing
|
||||
let mut new_group_expr = new_aggr.group_expr.clone();
|
||||
for expr in &mut new_group_expr {
|
||||
if let Expr::Column(_) = expr {
|
||||
// already a column, no need to change
|
||||
continue;
|
||||
}
|
||||
let col_name = expr.qualified_name();
|
||||
let input_column = Expr::Column(datafusion_common::Column::new(col_name.0, col_name.1));
|
||||
*expr = input_column;
|
||||
}
|
||||
new_aggr.group_expr = new_group_expr.clone();
|
||||
|
||||
let mut new_projection_exprs = new_group_expr;
|
||||
// the upper aggr expr need to be aliased to the input aggr expr's name,
|
||||
// so that the parent plan can recognize it.
|
||||
for (lower_aggr_expr, upper_aggr_expr) in
|
||||
input_aggr.aggr_expr.iter().zip(new_aggr.aggr_expr.iter())
|
||||
{
|
||||
let lower_col_name = lower_aggr_expr.qualified_name();
|
||||
let (table, col_name) = upper_aggr_expr.qualified_name();
|
||||
let aggr_out_column = Column::new(table, col_name);
|
||||
let aliased_output_aggr_expr =
|
||||
Expr::Column(aggr_out_column).alias_qualified(lower_col_name.0, lower_col_name.1);
|
||||
new_projection_exprs.push(aliased_output_aggr_expr);
|
||||
}
|
||||
let upper_aggr_plan = LogicalPlan::Aggregate(new_aggr);
|
||||
debug!("Before recompute schema: {upper_aggr_plan:?}");
|
||||
let upper_aggr_plan = upper_aggr_plan.recompute_schema()?;
|
||||
debug!("After recompute schema: {upper_aggr_plan:?}");
|
||||
// create a projection on top of the new aggregate plan
|
||||
let new_projection =
|
||||
Projection::try_new(new_projection_exprs, Arc::new(upper_aggr_plan.clone()))?;
|
||||
let projection = LogicalPlan::Projection(new_projection);
|
||||
// return the new logical plan
|
||||
Ok(vec![projection, upper_aggr_plan])
|
||||
}
|
||||
|
||||
/// Check if the given aggregate expression is steppable.
|
||||
/// As in if it can be split into multiple steps:
|
||||
/// i.e. on datanode first call `state(input)` then
|
||||
/// on frontend call `calc(merge(state))` to get the final result.
|
||||
///
|
||||
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
|
||||
let step_action = HashSet::from([
|
||||
"sum",
|
||||
"count",
|
||||
"min",
|
||||
"max",
|
||||
"first_value",
|
||||
"last_value",
|
||||
UDDSKETCH_STATE_NAME,
|
||||
HLL_NAME,
|
||||
]);
|
||||
aggr_exprs.iter().all(|expr| {
|
||||
if let Some(aggr_func) = get_aggr_func(expr) {
|
||||
if aggr_func.distinct {
|
||||
// Distinct aggregate functions are not steppable(yet).
|
||||
return false;
|
||||
}
|
||||
step_action.contains(aggr_func.func.name())
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &mut alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Commutativity {
|
||||
Commutative,
|
||||
PartialCommutative,
|
||||
ConditionalCommutative(Option<Transformer>),
|
||||
TransformedCommutative(Option<Transformer>),
|
||||
TransformedCommutative {
|
||||
/// Return plans from parent to child order
|
||||
transformer: Option<StageTransformer>,
|
||||
/// whether the transformer changes the child to parent
|
||||
expand_on_parent: bool,
|
||||
},
|
||||
NonCommutative,
|
||||
Unimplemented,
|
||||
/// For unrelated plans like DDL
|
||||
@@ -55,7 +238,21 @@ impl Categorizer {
|
||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Aggregate(aggr) => {
|
||||
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
|
||||
let matches_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
|
||||
if !matches_partition && is_all_steppable {
|
||||
debug!("Plan is steppable: {plan}");
|
||||
return Commutativity::TransformedCommutative {
|
||||
transformer: Some(Arc::new(|plan: &LogicalPlan| {
|
||||
debug!("Before Step optimize: {plan}");
|
||||
let ret = step_aggr_to_upper_aggr(plan);
|
||||
debug!("After Step Optimize: {ret:?}");
|
||||
ret.ok()
|
||||
})),
|
||||
expand_on_parent: true,
|
||||
};
|
||||
}
|
||||
if !matches_partition {
|
||||
return Commutativity::NonCommutative;
|
||||
}
|
||||
for expr in &aggr.aggr_expr {
|
||||
@@ -169,7 +366,8 @@ impl Categorizer {
|
||||
| Expr::Negative(_)
|
||||
| Expr::Between(_)
|
||||
| Expr::Exists(_)
|
||||
| Expr::InList(_) => Commutativity::Commutative,
|
||||
| Expr::InList(_)
|
||||
| Expr::Case(_) => Commutativity::Commutative,
|
||||
Expr::ScalarFunction(_udf) => Commutativity::Commutative,
|
||||
Expr::AggregateFunction(_udaf) => Commutativity::Commutative,
|
||||
|
||||
@@ -177,7 +375,6 @@ impl Categorizer {
|
||||
| Expr::SimilarTo(_)
|
||||
| Expr::IsUnknown(_)
|
||||
| Expr::IsNotUnknown(_)
|
||||
| Expr::Case(_)
|
||||
| Expr::Cast(_)
|
||||
| Expr::TryCast(_)
|
||||
| Expr::WindowFunction(_)
|
||||
@@ -217,6 +414,8 @@ impl Categorizer {
|
||||
|
||||
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
|
||||
|
||||
pub type StageTransformer = Arc<dyn Fn(&LogicalPlan) -> Option<Vec<LogicalPlan>>>;
|
||||
|
||||
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
|
||||
Some(plan.clone())
|
||||
}
|
||||
|
||||
@@ -14,10 +14,12 @@
|
||||
|
||||
// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
|
||||
pub const HINTS_KEY: &str = "x-greptime-hints";
|
||||
/// Deprecated, use `HINTS_KEY` instead. Notes if "x-greptime-hints" is set, keys with this prefix will be ignored.
|
||||
pub const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
|
||||
|
||||
pub const READ_PREFERENCE_HINT: &str = "read_preference";
|
||||
|
||||
/// Deprecated, use `HINTS_KEY` instead.
|
||||
pub const HINT_KEYS: [&str; 7] = [
|
||||
"x-greptime-hint-auto_create_table",
|
||||
"x-greptime-hint-ttl",
|
||||
|
||||
@@ -44,7 +44,7 @@ explain analyze SELECT count(*) FROM system_metrics;
|
||||
|_|_|_AggregateExec: mode=Final, gby=[], aggr=[count(system_REDACTED
|
||||
|_|_|_CoalescePartitionsExec REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[count(system_REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
|
||||
59
tests/cases/distributed/explain/how_alias_done.sql
Normal file
59
tests/cases/distributed/explain/how_alias_done.sql
Normal file
@@ -0,0 +1,59 @@
|
||||
CREATE TABLE IF NOT EXISTS base_table (
|
||||
"time" TIMESTAMP(3) NOT NULL,
|
||||
env STRING NULL,
|
||||
service_name STRING NULL,
|
||||
city STRING NULL,
|
||||
page STRING NULL,
|
||||
lcp BIGINT NULL,
|
||||
fmp BIGINT NULL,
|
||||
fcp BIGINT NULL,
|
||||
fp BIGINT NULL,
|
||||
tti BIGINT NULL,
|
||||
fid BIGINT NULL,
|
||||
shard_key BIGINT NULL,
|
||||
TIME INDEX ("time"),
|
||||
PRIMARY KEY (env, service_name)
|
||||
) PARTITION ON COLUMNS (shard_key) (
|
||||
shard_key < 4,
|
||||
shard_key >= 4
|
||||
AND shard_key < 8,
|
||||
shard_key >= 8
|
||||
AND shard_key < 12,
|
||||
shard_key >= 12
|
||||
AND shard_key < 16,
|
||||
shard_key >= 16
|
||||
AND shard_key < 20,
|
||||
shard_key >= 20
|
||||
AND shard_key < 24,
|
||||
shard_key >= 24
|
||||
AND shard_key < 28,
|
||||
shard_key >= 28
|
||||
AND shard_key < 32,
|
||||
shard_key >= 32
|
||||
AND shard_key < 36,
|
||||
shard_key >= 36
|
||||
AND shard_key < 40,
|
||||
shard_key >= 40
|
||||
AND shard_key < 44,
|
||||
shard_key >= 44
|
||||
AND shard_key < 48,
|
||||
shard_key >= 48
|
||||
AND shard_key < 52,
|
||||
shard_key >= 52
|
||||
AND shard_key < 56,
|
||||
shard_key >= 56
|
||||
AND shard_key < 60,
|
||||
shard_key >= 60
|
||||
) ENGINE = mito WITH(
|
||||
'append_mode' = 'true',
|
||||
'compaction.twcs.max_output_file_size' = "2GB",
|
||||
'compaction.twcs.time_window' = "1h",
|
||||
'compaction.type' = "twcs",
|
||||
);
|
||||
|
||||
EXPLAIN SELECT count(*) from base_table where time >= now();
|
||||
|
||||
-- ERROR: Internal error: 1003
|
||||
EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now();
|
||||
|
||||
DROP TABLE base_table;
|
||||
@@ -111,7 +111,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
|_|_|_WindowedSortExec: expr=test_pk.t__temp__0@2 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=test_pk.t__temp__0@2 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as t, t@1 as test_pk.t__temp__0] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -133,7 +133,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMI
|
||||
|_|_|_SortPreservingMergeExec: [t@1 DESC], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
409
tests/cases/distributed/explain/step_aggr.result
Normal file
409
tests/cases/distributed/explain/step_aggr.result
Normal file
@@ -0,0 +1,409 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: count(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: sum(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: min(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: max(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: hll(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
DROP TABLE integers;
|
||||
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
@@ -0,0 +1,80 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
90
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
90
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
@@ -0,0 +1,90 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING PRIMARY KEY,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-B', 7, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
|
||||
-- count
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
EXPLAIN
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
EXPLAIN ANALYZE
|
||||
SELECT
|
||||
date_bin('1 hour' :: INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour' :: INTERVAL, ts);
|
||||
|
||||
DROP TABLE integers;
|
||||
604
tests/cases/distributed/explain/try_analyze.sql
Normal file
604
tests/cases/distributed/explain/try_analyze.sql
Normal file
@@ -0,0 +1,604 @@
|
||||
-- base table, massive row insert to this table
|
||||
CREATE TABLE IF NOT EXISTS base_table (
|
||||
"time" TIMESTAMP(3) NOT NULL,
|
||||
env STRING NULL,
|
||||
service_name STRING NULL,
|
||||
city STRING NULL,
|
||||
page STRING NULL,
|
||||
lcp BIGINT NULL,
|
||||
fmp BIGINT NULL,
|
||||
fcp BIGINT NULL,
|
||||
fp BIGINT NULL,
|
||||
tti BIGINT NULL,
|
||||
fid BIGINT NULL,
|
||||
shard_key BIGINT NULL,
|
||||
TIME INDEX ("time"),
|
||||
PRIMARY KEY (env, service_name)
|
||||
) PARTITION ON COLUMNS (shard_key) (
|
||||
shard_key < 4,
|
||||
shard_key >= 4
|
||||
AND shard_key < 8,
|
||||
shard_key >= 8
|
||||
AND shard_key < 12,
|
||||
shard_key >= 12
|
||||
AND shard_key < 16,
|
||||
shard_key >= 16
|
||||
AND shard_key < 20,
|
||||
shard_key >= 20
|
||||
AND shard_key < 24,
|
||||
shard_key >= 24
|
||||
AND shard_key < 28,
|
||||
shard_key >= 28
|
||||
AND shard_key < 32,
|
||||
shard_key >= 32
|
||||
AND shard_key < 36,
|
||||
shard_key >= 36
|
||||
AND shard_key < 40,
|
||||
shard_key >= 40
|
||||
AND shard_key < 44,
|
||||
shard_key >= 44
|
||||
AND shard_key < 48,
|
||||
shard_key >= 48
|
||||
AND shard_key < 52,
|
||||
shard_key >= 52
|
||||
AND shard_key < 56,
|
||||
shard_key >= 56
|
||||
AND shard_key < 60,
|
||||
shard_key >= 60
|
||||
) ENGINE = mito WITH(
|
||||
'append_mode' = 'true',
|
||||
'compaction.twcs.max_output_file_size' = "2GB",
|
||||
'compaction.twcs.time_window' = "1h",
|
||||
'compaction.type' = "twcs",
|
||||
);
|
||||
|
||||
EXPLAIN SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
|
||||
EXPLAIN ANALYZE SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
EXPLAIN SELECT
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS lcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_lcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN lcp > 0
|
||||
AND lcp < 3000000 THEN lcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_lcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fmp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fmp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fmp > 0
|
||||
AND fmp < 3000000 THEN fmp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fmp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fcp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fcp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fcp > 0
|
||||
AND fcp < 3000000 THEN fcp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fcp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fp_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fp,
|
||||
min(
|
||||
CASE
|
||||
WHEN fp > 0
|
||||
AND fp < 3000000 THEN fp
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fp,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS tti_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_tti,
|
||||
min(
|
||||
CASE
|
||||
WHEN tti > 0
|
||||
AND tti < 3000000 THEN tti
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_tti,
|
||||
uddsketch_state(
|
||||
128,
|
||||
0.01,
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS fid_state,
|
||||
max(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS max_fid,
|
||||
min(
|
||||
CASE
|
||||
WHEN fid > 0
|
||||
AND fid < 3000000 THEN fid
|
||||
ELSE NULL
|
||||
END
|
||||
) AS min_fid,
|
||||
max(shard_key) AS shard_key,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0)
|
||||
FROM
|
||||
base_table
|
||||
WHERE
|
||||
(
|
||||
(
|
||||
lcp > 0
|
||||
AND lcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fmp > 0
|
||||
AND fmp < 3000000
|
||||
)
|
||||
OR (
|
||||
fcp > 0
|
||||
AND fcp < 3000000
|
||||
)
|
||||
OR (
|
||||
fp > 0
|
||||
AND fp < 3000000
|
||||
)
|
||||
OR (
|
||||
tti > 0
|
||||
AND tti < 3000000
|
||||
)
|
||||
OR (
|
||||
fid > 0
|
||||
AND fid < 3000000
|
||||
)
|
||||
) AND time >= now()
|
||||
GROUP BY
|
||||
env,
|
||||
service_name,
|
||||
city,
|
||||
page,
|
||||
date_bin('60 seconds' :: INTERVAL, time) :: TIMESTAMP(0);
|
||||
|
||||
EXPLAIN SELECT count(*) from base_table where time >= now();
|
||||
|
||||
-- ERROR: Internal error: 1003
|
||||
EXPLAIN ANALYZE SELECT count(*) from base_table where time >= now();
|
||||
|
||||
DROP TABLE base_table;
|
||||
@@ -72,7 +72,7 @@ ORDER BY
|
||||
| 1_| 0_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time, peak_current@1 as peak_current] REDACTED
|
||||
|_|_|_SortPreservingMergeExec: [collect_time_utc@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[collect_time_utc@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -121,7 +121,7 @@ ORDER BY
|
||||
| 1_| 0_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time_0, peak_current@1 as peak_current] REDACTED
|
||||
|_|_|_SortPreservingMergeExec: [collect_time_utc@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[collect_time_utc@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -173,7 +173,7 @@ ORDER BY
|
||||
|_|_|_SortPreservingMergeExec: [collect_time@0 DESC] REDACTED
|
||||
|_|_|_WindowedSortExec: expr=collect_time@0 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_PartSortExec: expr=collect_time@0 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -226,7 +226,7 @@ ORDER BY
|
||||
|_|_|_WindowedSortExec: expr=collect_time@1 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_PartSortExec: expr=collect_time@1 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_ProjectionExec: expr=[collect_time_utc@1 as collect_time_utc, collect_time@0 as collect_time, peak_current@2 as peak_current] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -23,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -45,7 +45,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -66,7 +66,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -89,7 +89,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries", "projection": ["i", "j", "k"], "filters": ["j >= TimestampMillisecond(-300000, None)", "j <= TimestampMillisecond(310000, None)"], "REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -120,11 +120,11 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -151,7 +151,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_ProjectionExec: expr=[j@0 as j, prom_rate(j_range,i,test.j,Int64(10000))@1 as prom_rate(j_range,i,j,Int64(10000)), k@2 as k, l@3 as l] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
@@ -160,7 +160,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -98,7 +98,7 @@ EXPLAIN ANALYZE SELECT DISTINCT a FROM test ORDER BY a;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -32,14 +32,14 @@ select sum(val) from t group by host;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[sum(t.val)] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_ProjectionExec: expr=[sum(t.val)@1 as sum(t.val)] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[host@0 as host], aggr=[sum(t.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[sum(t.val)] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -62,9 +62,9 @@ select sum(val) from t;
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
@@ -90,9 +90,9 @@ select sum(val) from t group by idc;
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val, idc@3 as idc] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -302,13 +302,13 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
|
||||
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
|
||||
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
|
||||
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED
|
||||
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED
|
||||
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -70,7 +70,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -103,7 +103,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -136,7 +136,7 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=REDACTED fetch=4 REDACTED
|
||||
|_|_|_FilterExec: i@0 > 2 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -170,7 +170,7 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=4 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=4 REDACTED
|
||||
|_|_|_FilterExec: i@0 > 2 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -203,7 +203,7 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=4 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=4 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":2, "mem_ranges":1, "files":1, "file_ranges":1} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -285,7 +285,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -307,7 +307,7 @@ EXPLAIN ANALYZE VERBOSE SELECT * FROM test_pk ORDER BY t LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges), projection=["pk", "i", "t"], REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":4, "mem_ranges":1, "REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -340,7 +340,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 DESC], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -374,7 +374,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -396,7 +396,7 @@ EXPLAIN ANALYZE VERBOSE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges), projection=["pk", "i", "t"], filters=[pk > Int32(7)], REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":4, "mem_ranges":1, "REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -78,7 +78,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
|
||||
|_|_|_CoalescePartitionsExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 10_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -89,7 +89,7 @@ explain analyze select * from demo where idc='idc1';
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
+-+-+-+
|
||||
@@ -115,7 +115,7 @@ explain analyze SELECT * FROM demo where host in ('test1');
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -62,7 +62,7 @@ EXPLAIN ANALYZE SELECT * FROM skipping_table WHERE id = 'id2' ORDER BY `name`;
|
||||
|_|_|_SortExec: expr=[name@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: id@1 = id2 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=3 (0 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":3, "mem_ranges":0, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
@@ -84,7 +84,7 @@ EXPLAIN ANALYZE SELECT * FROM skipping_table WHERE id = 'id5' ORDER BY `name`;
|
||||
|_|_|_SortExec: expr=[name@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: id@1 = id5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=3 (0 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":3, "mem_ranges":0, "files":3, "file_ranges":3} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -21,7 +21,7 @@ tql analyze (1, 3, '1s') t1{ a = "a" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
+-+-+-+
|
||||
@@ -41,7 +41,7 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 6_|
|
||||
+-+-+-+
|
||||
@@ -61,7 +61,7 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -47,7 +47,7 @@ TQL analyze (0, 10, '1s') sum by(job) (irate(cpu_usage{job="fire"}[5s])) / 1e9;
|
||||
|_|_|_FilterExec: job@0 = fire AND ts@2 >= -305000000000 AND ts@2 <= 310000000000 REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -35,7 +35,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -86,7 +86,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|
||||
@@ -94,7 +94,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -146,10 +146,10 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 ASC, l@3 ASC, j@1 ASC] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SortPreservingMergeExec: [k@2 ASC, l@3 ASC, j@1 ASC] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -46,7 +46,7 @@ explain analyze
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[last_value(t.host) ORDER BY [t.ts ASC NULLS LAST], last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST], last_value(t.val) ORDER BY [t.ts ASC NULLS LAST]] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), selector=LastRow REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "selector":"LastRow" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -104,7 +104,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts, t@1 as t] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
@@ -126,7 +126,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMI
|
||||
|_|_|_WindowedSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED limit=5 REDACTED
|
||||
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -72,7 +72,7 @@ ORDER BY
|
||||
| 1_| 0_|_SortPreservingMergeExec: [collect_time@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[collect_time@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time, peak_current@1 as peak_current] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -120,7 +120,7 @@ ORDER BY
|
||||
| 1_| 0_|_SortPreservingMergeExec: [collect_time_0@0 ASC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[collect_time_0@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time_0, peak_current@1 as peak_current] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -172,7 +172,7 @@ ORDER BY
|
||||
|_|_|_WindowedSortExec: expr=true_collect_time@0 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_PartSortExec: expr=true_collect_time@0 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_ProjectionExec: expr=[collect_time@0 as true_collect_time, collect_time_utc@1 as collect_time, peak_current@2 as peak_current] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
@@ -224,7 +224,7 @@ ORDER BY
|
||||
|_|_|_WindowedSortExec: expr=true_collect_time@1 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_PartSortExec: expr=true_collect_time@1 DESC num_ranges=REDACTED REDACTED
|
||||
|_|_|_ProjectionExec: expr=[collect_time_utc@1 as collect_time, collect_time@0 as true_collect_time, peak_current@2 as peak_current] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 8_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -23,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -45,7 +45,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -66,7 +66,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -89,7 +89,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, {"partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries", "projection": ["i", "j", "k"], "filters": ["j >= TimestampMillisecond(-300000, None)", "j <= TimestampMillisecond(310000, None)"], "REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -120,11 +120,11 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -150,7 +150,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: prom_rate(j_range,i,j,Int64(10000))@1 IS NOT NULL REDACTED
|
||||
@@ -158,7 +158,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
Reference in New Issue
Block a user