From 09747ea20674af43e3ed64895c22871835b76be7 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 14 Jun 2023 15:39:28 +0800 Subject: [PATCH] feat: use DataFrame to replace SQL for Prometheus remote read (#1774) * feat: debug QueryEngineState * feat: impl read_table to create DataFrame for a table * fix: clippy warnings * feat: use DataFrame to handle prometheus remote read queries * Update src/frontend/src/instance/prometheus.rs Co-authored-by: LFC * chore: CR comments --------- Co-authored-by: LFC --- Cargo.lock | 1 + src/frontend/src/error.rs | 22 ++++- src/frontend/src/instance/prometheus.rs | 68 ++++++++++--- src/query/src/dataframe.rs | 22 +++++ src/query/src/datafusion.rs | 59 +++++++++++- src/query/src/lib.rs | 1 + src/query/src/plan.rs | 15 ++- src/query/src/query_engine.rs | 10 ++ src/query/src/query_engine/state.rs | 14 ++- src/servers/Cargo.toml | 1 + src/servers/src/error.rs | 7 ++ src/servers/src/prometheus.rs | 122 ++++++++++++++++++------ 12 files changed, 292 insertions(+), 50 deletions(-) create mode 100644 src/query/src/dataframe.rs diff --git a/Cargo.lock b/Cargo.lock index 1d42daee07..93b9319b07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8534,6 +8534,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "datafusion", "datatypes", "derive_builder 0.12.0", "digest", diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index fbc545714a..7aba14e118 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -279,6 +279,13 @@ pub enum Error { source: query::error::Error, }, + #[snafu(display("Failed to read table: {table_name}, source: {source}"))] + ReadTable { + table_name: String, + #[snafu(backtrace)] + source: query::error::Error, + }, + #[snafu(display("Failed to execute logical plan, source: {}", source))] ExecLogicalPlan { #[snafu(backtrace)] @@ -363,13 +370,22 @@ pub enum Error { }, // TODO(ruihang): merge all query execution error kinds - #[snafu(display("failed to execute PromQL 
query {}, source: {}", query, source))] + #[snafu(display("Failed to execute PromQL query {}, source: {}", query, source))] ExecutePromql { query: String, #[snafu(backtrace)] source: servers::error::Error, }, + #[snafu(display( + "Failed to create logical plan for prometheus query, source: {}", + source + ))] + PrometheusRemoteQueryPlan { + #[snafu(backtrace)] + source: servers::error::Error, + }, + #[snafu(display("Failed to describe schema for given statement, source: {}", source))] DescribeStatement { #[snafu(backtrace)] @@ -559,7 +575,8 @@ impl ErrorExt for Error { Error::HandleHeartbeatResponse { source, .. } => source.status_code(), Error::RuntimeResource { source, .. } => source.status_code(), - Error::ExecutePromql { source, .. } => source.status_code(), + Error::PrometheusRemoteQueryPlan { source, .. } + | Error::ExecutePromql { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), @@ -621,6 +638,7 @@ impl ErrorExt for Error { Error::ExecuteStatement { source, .. } | Error::PlanStatement { source } | Error::ParseQuery { source } + | Error::ReadTable { source, .. 
} | Error::ExecLogicalPlan { source } | Error::DescribeStatement { source } => source.status_code(), diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index f06db0fcff..3d85757a04 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -14,9 +14,8 @@ use api::prometheus::remote::read_request::ResponseType; use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; -use api::v1::greptime_request::Request; -use api::v1::{query_request, QueryRequest}; use async_trait::async_trait; +use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; @@ -25,11 +24,14 @@ use metrics::counter; use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; -use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; +use crate::error::{ + CatalogSnafu, ExecLogicalPlanSnafu, PrometheusRemoteQueryPlanSnafu, ReadTableSnafu, Result, + TableNotFoundSnafu, +}; use crate::instance::Instance; use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES; @@ -75,6 +77,45 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult Result { + let table = self + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })?; + + let dataframe = self + .query_engine + .read_table(table) + .with_context(|_| ReadTableSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })?; + + let logical_plan = + prometheus::query_to_plan(dataframe, query).context(PrometheusRemoteQueryPlanSnafu)?; + + logging::debug!( + "Prometheus remote read, table: {}, logical plan: {}", + table_name, + logical_plan.display_indent(), + ); + + self.query_engine + .execute(logical_plan, ctx.clone()) + .await + .context(ExecLogicalPlanSnafu) + } + async fn handle_remote_queries( &self, ctx: QueryContextRef, @@ -82,22 +123,19 @@ impl Instance { ) -> ServerResult> { let mut results = Vec::with_capacity(queries.len()); - for query in queries { - let (table_name, sql) = prometheus::query_to_sql(query)?; - logging::debug!( - "prometheus remote read, table: {}, sql: {}", - table_name, - sql - ); + let catalog_name = ctx.current_catalog(); + let schema_name = ctx.current_schema(); + + for query in queries { + let table_name = prometheus::table_name(query)?; - let query = Request::Query(QueryRequest { - query: Some(query_request::Query::Sql(sql.to_string())), - }); let output = self - .do_query(query, ctx.clone()) + .handle_remote_query(&ctx, &catalog_name, &schema_name, &table_name, query) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; + .with_context(|_| error::ExecuteQuerySnafu { + query: format!("{query:#?}"), + })?; results.push((table_name, output)); } diff --git a/src/query/src/dataframe.rs b/src/query/src/dataframe.rs new file mode 100644 index 0000000000..0e87f3e54a --- /dev/null +++ b/src/query/src/dataframe.rs @@ -0,0 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datafusion::dataframe::DataFrame as DfDataFrame; + +/// DataFrame represents a logical set of rows with the same named columns. +/// Similar to a Pandas DataFrame or Spark DataFrame +#[derive(Clone)] +pub enum DataFrame { + DataFusion(DfDataFrame), +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 19e7cf915a..efc868d1cd 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -17,6 +17,7 @@ mod error; mod planner; +use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -43,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::requests::{DeleteRequest, InsertRequest}; use table::TableRef; +use crate::dataframe::DataFrame; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, @@ -207,6 +209,10 @@ impl DatafusionQueryEngine { #[async_trait] impl QueryEngine for DatafusionQueryEngine { + fn as_any(&self) -> &dyn Any { + self + } + fn planner(&self) -> Arc { Arc::new(DfLogicalPlanner::new(self.state.clone())) } @@ -249,6 +255,18 @@ impl QueryEngine for DatafusionQueryEngine { fn register_function(&self, func: FunctionRef) { self.state.register_udf(create_udf(func)); } + + fn read_table(&self, table: TableRef) -> Result { + Ok(DataFrame::DataFusion( + self.state + .read_table(table) + .context(error::DatafusionSnafu { + msg: "Fail to create dataframe for table", + }) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?, + )) + } } impl LogicalOptimizer for 
DatafusionQueryEngine { @@ -374,6 +392,7 @@ impl QueryExecutor for DatafusionQueryEngine { #[cfg(test)] mod tests { + use std::borrow::Cow::Borrowed; use std::sync::Arc; use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; @@ -381,12 +400,14 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util; + use datafusion::prelude::{col, lit}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; - use datatypes::vectors::{UInt64Vector, VectorRef}; + use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef}; use session::context::QueryContext; use table::table::numbers::NumbersTable; + use super::*; use crate::parser::QueryLanguageParser; use crate::query_engine::{QueryEngineFactory, QueryEngineRef}; @@ -470,6 +491,42 @@ mod tests { } } + #[tokio::test] + async fn test_read_table() { + let engine = create_test_engine().await; + + let engine = engine + .as_any() + .downcast_ref::() + .unwrap(); + let table = engine + .find_table(&ResolvedTableReference { + catalog: Borrowed("greptime"), + schema: Borrowed("public"), + table: Borrowed("numbers"), + }) + .await + .unwrap(); + + let DataFrame::DataFusion(df) = engine.read_table(table).unwrap(); + let df = df + .select_columns(&["number"]) + .unwrap() + .filter(col("number").lt(lit(10))) + .unwrap(); + let batches = df.collect().await.unwrap(); + assert_eq!(1, batches.len()); + let batch = &batches[0]; + + assert_eq!(1, batch.num_columns()); + assert_eq!(batch.column(0).len(), 10); + + assert_eq!( + Helper::try_into_vector(batch.column(0)).unwrap(), + Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef + ); + } + #[tokio::test] async fn test_describe() { let engine = create_test_engine().await; diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 8405c00fe6..4e18a86c00 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -14,6 +14,7 @@ 
#![feature(let_chains)] +pub mod dataframe; pub mod datafusion; pub mod dist_plan; pub mod error; diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index 0f406bea17..b24ddc4504 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use datafusion_expr::LogicalPlan as DfLogicalPlan; use datatypes::schema::Schema; @@ -46,4 +46,17 @@ impl LogicalPlan { } } } + + /// Return a `format`able structure that produces a single line + /// per node. For example: + /// + /// ```text + /// Projection: employee.id + /// Filter: employee.state Eq Utf8("CO") + /// CsvScan: employee projection=Some([0, 3]) + /// ``` + pub fn display_indent(&self) -> impl Display + '_ { + let LogicalPlan::DfPlan(plan) = self; + plan.display_indent() + } } diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 20e3cc9fe2..5880f405c6 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -16,6 +16,7 @@ mod context; pub mod options; mod state; +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; @@ -30,7 +31,9 @@ use datatypes::schema::Schema; use partition::manager::PartitionRuleManager; use session::context::QueryContextRef; use sql::statements::statement::Statement; +use table::TableRef; +use crate::dataframe::DataFrame; use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; use crate::plan::LogicalPlan; @@ -47,6 +50,10 @@ pub trait SqlStatementExecutor: Send + Sync { #[async_trait] pub trait QueryEngine: Send + Sync { + /// Returns the query engine as Any + /// so that it can be downcast to a specific implementation. 
+ fn as_any(&self) -> &dyn Any; + fn planner(&self) -> Arc; fn name(&self) -> &str; @@ -60,6 +67,9 @@ pub trait QueryEngine: Send + Sync { fn register_aggregate_function(&self, func: AggregateFunctionMetaRef); fn register_function(&self, func: FunctionRef); + + /// Create a DataFrame from a table. + fn read_table(&self, table: TableRef) -> Result; } pub struct QueryEngineFactory { diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index e69af3696f..a099df4240 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -24,6 +24,7 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::physical_plan::SessionContext; use common_query::prelude::ScalarUdf; use datafusion::catalog::catalog::MemoryCatalogList; +use datafusion::dataframe::DataFrame; use datafusion::error::Result as DfResult; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; @@ -38,6 +39,8 @@ use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; use partition::manager::PartitionRuleManager; use promql::extension_plan::PromExtensionPlanner; +use table::table::adapter::DfTableProviderAdapter; +use table::TableRef; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; use crate::extension_serializer::ExtensionSerializer; @@ -59,8 +62,9 @@ pub struct QueryEngineState { impl fmt::Debug for QueryEngineState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - // TODO(dennis) better debug info - write!(f, "QueryEngineState: ") + f.debug_struct("QueryEngineState") + .field("state", &self.df_context.state()) + .finish() } } @@ -188,6 +192,12 @@ impl QueryEngineState { pub(crate) fn session_state(&self) -> SessionState { self.df_context.state() } + + /// Create a DataFrame for a table + pub fn read_table(&self, table: TableRef) -> DfResult { + self.df_context + 
.read_table(Arc::new(DfTableProviderAdapter::new(table))) + } } struct DfQueryPlanner { diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 693453a023..9212a1cd1d 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -32,6 +32,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +datafusion.workspace = true datatypes = { path = "../datatypes" } derive_builder = "0.12" digest = "0.10" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 1981af3fe0..2f224da225 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -279,6 +279,12 @@ pub enum Error { source: tikv_jemalloc_ctl::Error, location: Location, }, + + #[snafu(display("DataFrame operation error, source: {source}, location: {location}"))] + DataFrame { + source: datafusion::error::DataFusionError, + location: Location, + }, } pub type Result = std::result::Result; @@ -317,6 +323,7 @@ impl ErrorExt for Error { | InvalidPromRemoteRequest { .. } | InvalidFlightTicket { .. } | InvalidPrepareStatement { .. } + | DataFrame { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. 
} => { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index d285b4075f..5ef3b0a9f8 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -24,8 +24,11 @@ use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; use common_grpc::writer::{LinesWriter, Precision}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; +use datafusion::prelude::{col, lit, regexp_match, Expr}; use datatypes::prelude::{ConcreteDataType, Value}; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; +use query::dataframe::DataFrame; +use query::plan::LogicalPlan; use snafu::{ensure, OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; @@ -40,14 +43,11 @@ pub struct Metrics { pub exposition: MetricsExposition, } -/// Generate a sql from a remote request query -/// TODO(dennis): maybe use logical plan in future to prevent sql injection -pub fn query_to_sql(q: &Query) -> Result<(String, String)> { - let start_timestamp_ms = q.start_timestamp_ms; - let end_timestamp_ms = q.end_timestamp_ms; - +/// Get table name from remote query +pub fn table_name(q: &Query) -> Result { let label_matches = &q.matchers; - let table_name = label_matches + + label_matches .iter() .find_map(|m| { if m.name == METRIC_NAME_LABEL { @@ -58,13 +58,22 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> { }) .context(error::InvalidPromRemoteRequestSnafu { msg: "missing '__name__' label in timeseries", - })?; + }) +} - let mut conditions: Vec = Vec::with_capacity(label_matches.len()); +/// Create a logical plan that filters the given DataFrame by a remote Query +pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { + let DataFrame::DataFusion(dataframe) = dataframe; - conditions.push(format!( - "{TIMESTAMP_COLUMN_NAME}>={start_timestamp_ms} AND {TIMESTAMP_COLUMN_NAME}<={end_timestamp_ms}", - )); + let start_timestamp_ms = q.start_timestamp_ms; + let end_timestamp_ms = q.end_timestamp_ms; + + let 
label_matches = &q.matchers; + + let mut conditions = Vec::with_capacity(label_matches.len() + 1); + + conditions.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit(start_timestamp_ms))); + conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit(end_timestamp_ms))); for m in label_matches { let name = &m.name; @@ -81,28 +90,30 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> { match m_type { MatcherType::Eq => { - conditions.push(format!("{name}='{value}'")); + conditions.push(col(name).eq(lit(value))); } MatcherType::Neq => { - conditions.push(format!("{name}!='{value}'")); + conditions.push(col(name).not_eq(lit(value))); } // Case sensitive regexp match MatcherType::Re => { - conditions.push(format!("{name}~'{value}'")); + conditions.push(regexp_match(vec![col(name), lit(value)]).is_not_null()); } // Case sensitive regexp not match MatcherType::Nre => { - conditions.push(format!("{name}!~'{value}'")); + conditions.push(regexp_match(vec![col(name), lit(value)]).is_null()); } } } - let conditions = conditions.join(" AND "); + // Safety: `conditions` is never empty (the two timestamp bounds are always pushed), so `reduce` always returns `Some(expr)`. 
+ let conditions = conditions.into_iter().reduce(Expr::and).unwrap(); - Ok(( - table_name.to_string(), - format!("select * from {table_name} where {conditions} order by {TIMESTAMP_COLUMN_NAME}",), - )) + let dataframe = dataframe + .filter(conditions) + .context(error::DataFrameSnafu)?; + + Ok(LogicalPlan::DfPlan(dataframe.into_parts().1)) } #[inline] @@ -433,8 +444,11 @@ mod tests { use std::sync::Arc; use api::prometheus::remote::LabelMatcher; + use datafusion::prelude::SessionContext; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; + use table::table::adapter::DfTableProviderAdapter; + use table::test_util::MemTable; use super::*; @@ -443,14 +457,14 @@ mod tests { const RE_TYPE: i32 = MatcherType::Re as i32; #[test] - fn test_query_to_sql() { + fn test_table_name() { let q = Query { start_timestamp_ms: 1000, end_timestamp_ms: 2000, matchers: vec![], ..Default::default() }; - let err = query_to_sql(&q).unwrap_err(); + let err = table_name(&q).unwrap_err(); assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. 
})); let q = Query { @@ -463,9 +477,56 @@ mod tests { }], ..Default::default() }; - let (table, sql) = query_to_sql(&q).unwrap(); - assert_eq!("test", table); - assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql); + assert_eq!("test", table_name(&q).unwrap()); + } + + #[test] + fn test_query_to_plan() { + let q = Query { + start_timestamp_ms: 1000, + end_timestamp_ms: 2000, + matchers: vec![LabelMatcher { + name: METRIC_NAME_LABEL.to_string(), + value: "test".to_string(), + r#type: EQ_TYPE, + }], + ..Default::default() + }; + + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new( + TIMESTAMP_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + ColumnSchema::new( + FIELD_COLUMN_NAME, + ConcreteDataType::float64_datatype(), + true, + ), + ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true), + ColumnSchema::new("job", ConcreteDataType::string_datatype(), true), + ])); + let recordbatch = RecordBatch::new( + schema, + vec![ + Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _, + Arc::new(Float64Vector::from_vec(vec![3.0])) as _, + Arc::new(StringVector::from(vec!["host1"])) as _, + Arc::new(StringVector::from(vec!["job"])) as _, + ], + ) + .unwrap(); + + let ctx = SessionContext::new(); + let table = Arc::new(MemTable::new("test", recordbatch)); + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + + let dataframe = ctx.read_table(table_provider.clone()).unwrap(); + let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap(); + let display_string = format!("{}", plan.display_indent()); + + assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000)\n TableScan: ?table?", display_string); let q = Query { start_timestamp_ms: 1000, @@ -489,9 +550,12 @@ mod tests { ], ..Default::default() }; - let (table, sql) = query_to_sql(&q).unwrap(); - 
assert_eq!("test", table); - assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql); + + let dataframe = ctx.read_table(table_provider).unwrap(); + let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap(); + let display_string = format!("{}", plan.display_indent()); + + assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string); } #[test]