feat: allow cross-schema query in promql (#3545)

* feat: add __schema__ tag for promql parser

* feat: disable matcher op other than equals

* test: add more test to ensure context getting reset

* test: add integration test

* test: refactor tests

* refactor: remove duplicated test code

* refactor: update according to review comments

* test: add sqlness test for cross schema scenario

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ning Sun
2024-03-29 00:41:01 -07:00
committed by GitHub
parent f49cd0ca18
commit 500f9f10fc
6 changed files with 368 additions and 101 deletions

View File

@@ -135,6 +135,13 @@ pub enum Error {
operator: String,
location: Location,
},
#[snafu(display("Matcher operator {matcher_op} is not supported for {matcher}"))]
UnsupportedMatcherOp {
matcher_op: String,
matcher: String,
location: Location,
},
}
impl ErrorExt for Error {
@@ -157,6 +164,7 @@ impl ErrorExt for Error {
| DataFusionPlanning { .. }
| MultiFieldsNotSupported { .. }
| UnexpectedPlanExpr { .. }
| UnsupportedMatcherOp { .. }
| IllegalRange { .. } => StatusCode::InvalidArguments,
UnknownTable { .. } | EmptyRange { .. } => StatusCode::Internal,

View File

@@ -49,8 +49,8 @@ use crate::error::{
ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu,
MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, Result,
TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu,
UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
ZeroRangeSelectorSnafu,
UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu,
UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
use crate::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
@@ -79,6 +79,9 @@ const DEFAULT_FIELD_COLUMN: &str = "value";
/// Special modifier to project field columns under multi-field mode
const FIELD_COLUMN_MATCHER: &str = "__field__";
/// Special modifier for cross schema query
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
@@ -93,6 +96,7 @@ struct PromPlannerContext {
field_columns: Vec<String>,
tag_columns: Vec<String>,
field_column_matcher: Option<Vec<Matcher>>,
schema_name: Option<String>,
/// The range in millisecond of range selector. None if there is no range selector.
range: Option<Millisecond>,
}
@@ -115,9 +119,16 @@ impl PromPlannerContext {
self.field_columns = vec![];
self.tag_columns = vec![];
self.field_column_matcher = None;
self.schema_name = None;
self.range = None;
}
/// Reset table name and schema to empty
fn reset_table_name_and_schema(&mut self) {
self.table_name = Some(String::new());
self.schema_name = None;
}
/// Check if `le` is present in tag columns
fn has_le_tag(&self) -> bool {
self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
@@ -205,7 +216,7 @@ impl PromPlanner {
(Some(lhs), Some(rhs)) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.table_name = Some(String::new());
self.ctx.reset_table_name_and_schema();
let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
let mut field_expr = field_expr_builder(lhs, rhs)?;
@@ -292,16 +303,16 @@ impl PromPlanner {
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone()).await?;
let left_field_columns = self.ctx.field_columns.clone();
let mut left_table_ref = OwnedTableReference::bare(
self.ctx.table_name.clone().unwrap_or_default(),
);
let mut left_table_ref = self
.table_ref()
.unwrap_or_else(|_| OwnedTableReference::bare(""));
let left_context = self.ctx.clone();
let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
let right_field_columns = self.ctx.field_columns.clone();
let mut right_table_ref = OwnedTableReference::bare(
self.ctx.table_name.clone().unwrap_or_default(),
);
let mut right_table_ref = self
.table_ref()
.unwrap_or_else(|_| OwnedTableReference::bare(""));
let right_context = self.ctx.clone();
// TODO(ruihang): avoid join if left and right are the same table
@@ -375,7 +386,7 @@ impl PromPlanner {
PromExpr::NumberLiteral(NumberLiteral { val }) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.table_name = Some(String::new());
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(*val);
LogicalPlan::Extension(Extension {
@@ -395,7 +406,7 @@ impl PromPlanner {
PromExpr::StringLiteral(StringLiteral { val }) => {
self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
self.ctx.table_name = Some(String::new());
self.ctx.reset_table_name_and_schema();
let literal_expr = df_prelude::lit(val.to_string());
LogicalPlan::Extension(Extension {
@@ -489,7 +500,7 @@ impl PromPlanner {
self.prom_expr_to_plan(prom_expr).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.table_name = Some(String::new());
self.ctx.reset_table_name_and_schema();
LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
@@ -595,6 +606,15 @@ impl PromPlanner {
.field_column_matcher
.get_or_insert_default()
.push(matcher.clone());
} else if matcher.name == SCHEMA_COLUMN_MATCHER {
ensure!(
matcher.op == MatchOp::Equal,
UnsupportedMatcherOpSnafu {
matcher: matcher.name.to_string(),
matcher_op: matcher.op.to_string(),
}
);
self.ctx.schema_name = Some(matcher.value.clone());
} else if matcher.name != METRIC_NAME {
let _ = matchers.insert(matcher.clone());
}
@@ -609,8 +629,6 @@ impl PromPlanner {
label_matchers: Matchers,
is_range_selector: bool,
) -> Result<LogicalPlan> {
let table_name = self.ctx.table_name.clone().unwrap();
// make filter exprs
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
@@ -633,8 +651,9 @@ impl PromPlanner {
)));
// make table scan with filter exprs
let table_ref = self.table_ref()?;
let mut table_scan = self
.create_table_scan_plan(&table_name, scan_filters.clone())
.create_table_scan_plan(table_ref.clone(), scan_filters.clone())
.await?;
// make a projection plan if there is any `__field__` matcher
@@ -730,7 +749,9 @@ impl PromPlanner {
self.ctx
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu { table: table_name })?,
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?,
is_range_selector,
divide_plan,
);
@@ -838,16 +859,32 @@ impl PromPlanner {
Ok(exprs)
}
fn table_ref(&self) -> Result<OwnedTableReference> {
let table_name = self
.ctx
.table_name
.clone()
.context(TableNameNotFoundSnafu)?;
// set schema name if `__schema__` is given
let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
TableReference::partial(schema_name, &table_name)
} else {
TableReference::bare(&table_name)
};
Ok(table_ref.to_owned_reference())
}
/// Create a table scan plan and a filter plan with given filter.
///
/// # Panic
/// If the filter is empty
async fn create_table_scan_plan(
&mut self,
table_name: &str,
table_ref: OwnedTableReference,
filter: Vec<DfExpr>,
) -> Result<LogicalPlan> {
let table_ref = OwnedTableReference::bare(table_name.to_string());
let provider = self
.table_provider
.resolve_table(table_ref.clone())
@@ -865,14 +902,10 @@ impl PromPlanner {
/// Setup [PromPlannerContext]'s state fields.
async fn setup_context(&mut self) -> Result<()> {
let table_name = self
.ctx
.table_name
.clone()
.context(TableNameNotFoundSnafu)?;
let table_ref = self.table_ref()?;
let table = self
.table_provider
.resolve_table(TableReference::bare(&table_name))
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?
.as_any()
@@ -888,7 +921,9 @@ impl PromPlanner {
let time_index = table
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu { table: table_name })?
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.name
.clone();
self.ctx.time_index_column = Some(time_index);
@@ -1224,7 +1259,7 @@ impl PromPlanner {
}
utils::conjunction(exprs).context(ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap(),
table: self.table_ref()?.to_quoted_string(),
})
}
@@ -1354,7 +1389,7 @@ impl PromPlanner {
// reuse `SPECIAL_TIME_FUNCTION` as name of time index column
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.table_name = Some(String::new());
self.ctx.reset_table_name_and_schema();
self.ctx.tag_columns = vec![];
self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
Ok(LogicalPlan::Extension(Extension {
@@ -2014,57 +2049,61 @@ mod test {
use super::*;
async fn build_test_table_provider(
table_name: String,
table_name_tuples: &[(String, String)],
num_tag: usize,
num_field: usize,
) -> DfTableSourceProvider {
let mut columns = vec![];
for i in 0..num_tag {
columns.push(ColumnSchema::new(
format!("tag_{i}"),
ConcreteDataType::string_datatype(),
false,
));
}
columns.push(
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
);
for i in 0..num_field {
columns.push(ColumnSchema::new(
format!("field_{i}"),
ConcreteDataType::float64_datatype(),
true,
));
}
let schema = Arc::new(Schema::new(columns));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices((0..num_tag).collect())
.value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name)
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.is_ok());
for (schema_name, table_name) in table_name_tuples {
let mut columns = vec![];
for i in 0..num_tag {
columns.push(ColumnSchema::new(
format!("tag_{i}"),
ConcreteDataType::string_datatype(),
false,
));
}
columns.push(
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
);
for i in 0..num_field {
columns.push(ColumnSchema::new(
format!("field_{i}"),
ConcreteDataType::float64_datatype(),
true,
));
}
let schema = Arc::new(Schema::new(columns));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices((0..num_tag).collect())
.value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(table_name.to_string())
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
assert!(catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: schema_name.to_string(),
table_name: table_name.to_string(),
table_id: 1024,
table,
})
.is_ok());
}
DfTableSourceProvider::new(catalog_list, false, QueryContext::arc().as_ref())
}
@@ -2096,7 +2135,12 @@ mod test {
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
1,
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
.await
.unwrap();
@@ -2301,7 +2345,12 @@ mod test {
};
// test group by
let table_provider = build_test_table_provider("some_metric".to_string(), 2, 2).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
2,
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
@@ -2326,7 +2375,12 @@ mod test {
labels: vec![String::from("tag_1")].into_iter().collect(),
}));
}
let table_provider = build_test_table_provider("some_metric".to_string(), 2, 2).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
2,
2,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
.await
.unwrap();
@@ -2446,7 +2500,12 @@ mod test {
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
1,
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
.await
.unwrap();
@@ -2485,7 +2544,18 @@ mod test {
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider("some_metric".to_string(), 1, 1).await;
let table_provider = build_test_table_provider(
&[
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
(
"greptime_private".to_string(),
"some_alt_metric".to_string(),
),
],
1,
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
.await
.unwrap();
@@ -2724,7 +2794,12 @@ mod test {
for case in cases {
let prom_expr = parser::parse(case.0).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3,
3,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
@@ -2743,9 +2818,66 @@ mod test {
for case in bad_cases {
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider("some_metric".to_string(), 3, 3).await;
let table_provider = build_test_table_provider(
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3,
3,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone()).await;
assert!(plan.is_err(), "case: {:?}", case);
}
}
#[tokio::test]
async fn custom_schema() {
let query = "some_alt_metric{__schema__=\"greptime_private\"}";
let expected = String::from(
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: greptime_private.some_alt_metric.tag_0 DESC NULLS LAST, greptime_private.some_alt_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn only_equals_is_supported_for_special_matcher() {
let queries = &[
"some_alt_metric{__schema__!=\"greptime_private\"}",
"some_alt_metric{__schema__=~\"lalala\"}",
];
for query in queries {
let prom_expr = parser::parse(query).unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let table_provider = build_test_table_provider(
&[
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
(
"greptime_private".to_string(),
"some_alt_metric".to_string(),
),
],
1,
1,
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt).await;
assert!(plan.is_err(), "query: {:?}", query);
}
}
}

View File

@@ -110,7 +110,7 @@ impl QueryContext {
.build()
}
pub fn with_db_name(db_name: Option<&String>) -> QueryContextRef {
pub fn with_db_name(db_name: Option<&str>) -> QueryContextRef {
let (catalog, schema) = db_name
.map(|db| {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use common_query::Output;
use frontend::instance::Instance;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use rstest::rstest;
@@ -27,6 +28,35 @@ use super::test_util::{
};
use crate::tests::test_util::MockInstance;
#[allow(clippy::too_many_arguments)]
async fn promql_query(
ins: Arc<Instance>,
promql: &str,
query_ctx: Arc<QueryContext>,
start: SystemTime,
end: SystemTime,
interval: Duration,
lookback: Duration,
) -> operator::error::Result<Output> {
let query = PromQuery {
query: promql.to_string(),
..PromQuery::default()
};
let QueryStatement::Promql(mut eval_stmt) =
QueryLanguageParser::parse_promql(&query, &query_ctx).unwrap()
else {
unreachable!()
};
eval_stmt.start = start;
eval_stmt.end = end;
eval_stmt.interval = interval;
eval_stmt.lookback_delta = lookback;
ins.statement_executor()
.execute_stmt(QueryStatement::Promql(eval_stmt), query_ctx)
.await
}
#[allow(clippy::too_many_arguments)]
async fn create_insert_query_assert(
instance: Arc<Instance>,
@@ -54,25 +84,17 @@ async fn create_insert_query_assert(
let _ = v.unwrap();
});
let query = PromQuery {
query: promql.to_string(),
..PromQuery::default()
};
let QueryStatement::Promql(mut eval_stmt) =
QueryLanguageParser::parse_promql(&query, &QueryContext::arc()).unwrap()
else {
unreachable!()
};
eval_stmt.start = start;
eval_stmt.end = end;
eval_stmt.interval = interval;
eval_stmt.lookback_delta = lookback;
let query_output = instance
.statement_executor()
.execute_stmt(QueryStatement::Promql(eval_stmt), QueryContext::arc())
.await
.unwrap();
let query_output = promql_query(
instance,
promql,
QueryContext::arc(),
start,
end,
interval,
lookback,
)
.await
.unwrap();
check_unordered_output_stream(query_output, expected).await;
}
@@ -524,3 +546,85 @@ async fn binary_op_plain_columns(instance: Arc<dyn MockInstance>) {
)
.await;
}
#[apply(both_instances_cases)]
async fn cross_schema_query(instance: Arc<dyn MockInstance>) {
let ins = instance.frontend();
ins.do_query(
AGGREGATORS_CREATE_TABLE,
QueryContext::with_db_name(Some("greptime_private")),
)
.await
.into_iter()
.for_each(|v| {
let _ = v.unwrap();
});
ins.do_query(
AGGREGATORS_INSERT_DATA,
QueryContext::with_db_name(Some("greptime_private")),
)
.await
.into_iter()
.for_each(|v| {
let _ = v.unwrap();
});
let start = UNIX_EPOCH;
let end = unix_epoch_plus_100s();
let interval = Duration::from_secs(60);
let lookback_delta = Duration::from_secs(0);
let query_output = promql_query(
ins.clone(),
r#"http_requests{__schema__="greptime_private"}"#,
QueryContext::arc(),
start,
end,
interval,
lookback_delta,
)
.await
.unwrap();
let expected = r#"+------------+----------+------------+-------+---------------------+
| job | instance | group | value | ts |
+------------+----------+------------+-------+---------------------+
| api-server | 0 | production | 100.0 | 1970-01-01T00:00:00 |
| api-server | 0 | canary | 300.0 | 1970-01-01T00:00:00 |
| api-server | 1 | production | 200.0 | 1970-01-01T00:00:00 |
| api-server | 1 | canary | 400.0 | 1970-01-01T00:00:00 |
| app-server | 0 | canary | 700.0 | 1970-01-01T00:00:00 |
| app-server | 0 | production | 500.0 | 1970-01-01T00:00:00 |
| app-server | 1 | canary | 800.0 | 1970-01-01T00:00:00 |
| app-server | 1 | production | 600.0 | 1970-01-01T00:00:00 |
+------------+----------+------------+-------+---------------------+"#;
check_unordered_output_stream(query_output, expected).await;
let query_output = promql_query(
ins.clone(),
r#"http_requests"#,
QueryContext::arc(),
start,
end,
interval,
lookback_delta,
)
.await;
assert!(query_output.is_err());
let query_output = promql_query(
ins.clone(),
r#"http_requests"#,
QueryContext::with_db_name(Some("greptime_private")),
start,
end,
interval,
lookback_delta,
)
.await
.unwrap();
check_unordered_output_stream(query_output, expected).await;
}

View File

@@ -32,6 +32,23 @@ TQL EVAL (0, 10, '5s') {__name__="test"};
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+---------------------+---+
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__schema__="public"};
+-----+---------------------+---+
| i | j | k |
+-----+---------------------+---+
| 1.0 | 1970-01-01T00:00:05 | b |
| 1.0 | 1970-01-01T00:00:10 | b |
| 2.0 | 1970-01-01T00:00:05 | a |
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+---------------------+---+
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__schema__="greptime_private"};
Error: 4001(TableNotFound), Table not found: greptime.greptime_private.test
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') {__name__="test", __field__="i"};

View File

@@ -10,6 +10,12 @@ TQL EVAL (0, 10, '5s') test;
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') {__name__="test"};
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__schema__="public"};
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') test{__schema__="greptime_private"};
-- SQLNESS SORT_RESULT 2 1
TQL EVAL (0, 10, '5s') {__name__="test", __field__="i"};