perf: acclerate scatter query (#4607)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-08-26 11:03:30 +08:00
committed by GitHub
parent 3973d6b01f
commit da337a9635

View File

@@ -90,6 +90,12 @@ const FIELD_COLUMN_MATCHER: &str = "__field__";
/// Special modifier for cross schema query
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
/// Threshold for scatter scan mode
const MAX_SCATTER_POINTS: i64 = 400;
/// Interval 1 hour in millisecond
const INTERVAL_1H: i64 = 60 * 60 * 1000;
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
@@ -726,21 +732,10 @@ impl PromPlanner {
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let range_ms = self.ctx.range.unwrap_or_default();
let mut scan_filters = self.matchers_to_expr(label_matchers.clone())?;
scan_filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range_ms),
None,
),
)));
scan_filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
None,
),
)));
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
// make table scan with filter exprs
let table_ref = self.table_ref()?;
let mut table_scan = self
@@ -967,6 +962,54 @@ impl PromPlanner {
Ok(table_ref)
}
fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
let start = self.ctx.start;
let end = self.ctx.end;
let lookback_delta = self.ctx.lookback_delta;
let range = self.ctx.range.unwrap_or_default();
let interval = self.ctx.interval;
let time_index_expr = self.create_time_index_column_expr()?;
let num_points = (end - start) / interval;
// Scan a continuous time range
if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
let single_time_range = time_index_expr
.clone()
.gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
None,
)))
.and(
time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
None,
))),
);
return Ok(Some(single_time_range));
}
// Otherwise scan scatter ranges separately
let mut filters = Vec::with_capacity(num_points as usize);
for timestamp in (start..end).step_by(interval as usize) {
filters.push(
time_index_expr
.clone()
.gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
Some(timestamp - offset_duration - lookback_delta - range),
None,
)))
.and(time_index_expr.clone().lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(timestamp - offset_duration + lookback_delta),
None,
),
))),
)
}
Ok(filters.into_iter().reduce(DfExpr::or))
}
/// Create a table scan plan and a filter plan with given filter.
///
/// # Panic