feat: use DataFrame to replace SQL for Prometheus remote read (#1774)

* feat: debug QueryEngineState

* feat: impl read_table to create DataFrame for a table

* fix: clippy warnings

* feat: use DataFrame to handle prometheus remote read queries

* Update src/frontend/src/instance/prometheus.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: CR comments

---------

Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
dennis zhuang
2023-06-14 15:39:28 +08:00
committed by GitHub
parent fb35e09072
commit 09747ea206
12 changed files with 292 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -8534,6 +8534,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"datafusion",
"datatypes",
"derive_builder 0.12.0",
"digest",

View File

@@ -279,6 +279,13 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to read table: {table_name}, source: {source}"))]
ReadTable {
table_name: String,
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to execute logical plan, source: {}", source))]
ExecLogicalPlan {
#[snafu(backtrace)]
@@ -363,13 +370,22 @@ pub enum Error {
},
// TODO(ruihang): merge all query execution error kinds
#[snafu(display("failed to execute PromQL query {}, source: {}", query, source))]
#[snafu(display("Failed to execute PromQL query {}, source: {}", query, source))]
ExecutePromql {
query: String,
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display(
"Failed to create logical plan for prometheus query, source: {}",
source
))]
PrometheusRemoteQueryPlan {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to describe schema for given statement, source: {}", source))]
DescribeStatement {
#[snafu(backtrace)]
@@ -559,7 +575,8 @@ impl ErrorExt for Error {
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),
Error::RuntimeResource { source, .. } => source.status_code(),
Error::ExecutePromql { source, .. } => source.status_code(),
Error::PrometheusRemoteQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
@@ -621,6 +638,7 @@ impl ErrorExt for Error {
Error::ExecuteStatement { source, .. }
| Error::PlanStatement { source }
| Error::ParseQuery { source }
| Error::ReadTable { source, .. }
| Error::ExecLogicalPlan { source }
| Error::DescribeStatement { source } => source.status_code(),

View File

@@ -14,9 +14,8 @@
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::greptime_request::Request;
use api::v1::{query_request, QueryRequest};
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
@@ -25,11 +24,14 @@ use metrics::counter;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PrometheusRemoteQueryPlanSnafu, ReadTableSnafu, Result,
TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES;
@@ -75,6 +77,45 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
}
impl Instance {
/// Executes a single Prometheus remote read `query` against one table.
///
/// The table is resolved through the catalog manager, turned into a
/// DataFrame by the query engine, converted into a logical plan by
/// `prometheus::query_to_plan`, and finally executed by the query engine.
///
/// # Errors
///
/// Each step maps its failure to a dedicated error context: `CatalogSnafu`
/// (catalog lookup failed), `TableNotFoundSnafu` (lookup returned no table),
/// `ReadTableSnafu` (DataFrame creation failed),
/// `PrometheusRemoteQueryPlanSnafu` (plan building failed) and
/// `ExecLogicalPlanSnafu` (plan execution failed).
async fn handle_remote_query(
&self,
ctx: &QueryContextRef,
catalog_name: &str,
schema_name: &str,
table_name: &str,
query: &Query,
) -> Result<Output> {
// The first `?` propagates a catalog error; the second one turns an
// absent table into a `TableNotFound` error carrying the full table name.
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let dataframe = self
.query_engine
.read_table(table)
.with_context(|_| ReadTableSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
// The plan is built from the DataFrame and the query, not from SQL text.
let logical_plan =
prometheus::query_to_plan(dataframe, query).context(PrometheusRemoteQueryPlanSnafu)?;
logging::debug!(
"Prometheus remote read, table: {}, logical plan: {}",
table_name,
logical_plan.display_indent(),
);
self.query_engine
.execute(logical_plan, ctx.clone())
.await
.context(ExecLogicalPlanSnafu)
}
async fn handle_remote_queries(
&self,
ctx: QueryContextRef,
@@ -82,22 +123,19 @@ impl Instance {
) -> ServerResult<Vec<(String, Output)>> {
let mut results = Vec::with_capacity(queries.len());
for query in queries {
let (table_name, sql) = prometheus::query_to_sql(query)?;
logging::debug!(
"prometheus remote read, table: {}, sql: {}",
table_name,
sql
);
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
for query in queries {
let table_name = prometheus::table_name(query)?;
let query = Request::Query(QueryRequest {
query: Some(query_request::Query::Sql(sql.to_string())),
});
let output = self
.do_query(query, ctx.clone())
.handle_remote_query(&ctx, &catalog_name, &schema_name, &table_name, query)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
.with_context(|_| error::ExecuteQuerySnafu {
query: format!("{query:#?}"),
})?;
results.push((table_name, output));
}

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion::dataframe::DataFrame as DfDataFrame;
/// DataFrame represents a logical set of rows with the same named columns,
/// similar to a Pandas DataFrame or a Spark DataFrame.
///
/// It is an enum over the supported backends; the only variant at present
/// wraps a DataFusion `DataFrame`.
#[derive(Clone)]
pub enum DataFrame {
/// A DataFrame backed by DataFusion.
DataFusion(DfDataFrame),
}

View File

@@ -17,6 +17,7 @@
mod error;
mod planner;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
@@ -43,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
@@ -207,6 +209,10 @@ impl DatafusionQueryEngine {
#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
fn as_any(&self) -> &dyn Any {
self
}
fn planner(&self) -> Arc<dyn LogicalPlanner> {
Arc::new(DfLogicalPlanner::new(self.state.clone()))
}
@@ -249,6 +255,18 @@ impl QueryEngine for DatafusionQueryEngine {
fn register_function(&self, func: FunctionRef) {
self.state.register_udf(create_udf(func));
}
/// Creates a [`DataFrame`] for `table` by delegating to the engine state.
///
/// A failure from the underlying `read_table` call is first given a
/// `DatafusionSnafu` context, then boxed and reported through
/// `QueryExecutionSnafu`.
fn read_table(&self, table: TableRef) -> Result<DataFrame> {
Ok(DataFrame::DataFusion(
self.state
.read_table(table)
.context(error::DatafusionSnafu {
msg: "Fail to create dataframe for table",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
))
}
}
impl LogicalOptimizer for DatafusionQueryEngine {
@@ -374,6 +392,7 @@ impl QueryExecutor for DatafusionQueryEngine {
#[cfg(test)]
mod tests {
use std::borrow::Cow::Borrowed;
use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
@@ -381,12 +400,14 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util;
use datafusion::prelude::{col, lit};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{UInt64Vector, VectorRef};
use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
use session::context::QueryContext;
use table::table::numbers::NumbersTable;
use super::*;
use crate::parser::QueryLanguageParser;
use crate::query_engine::{QueryEngineFactory, QueryEngineRef};
@@ -470,6 +491,42 @@ mod tests {
}
}
/// Checks that `read_table` yields a usable DataFrame: selecting `number`
/// from the `numbers` table and filtering `number < 10` must return one
/// batch with one column holding the values 0 through 9.
#[tokio::test]
async fn test_read_table() {
let engine = create_test_engine().await;
// Downcast to the concrete engine so `find_table` can be called.
let engine = engine
.as_any()
.downcast_ref::<DatafusionQueryEngine>()
.unwrap();
let table = engine
.find_table(&ResolvedTableReference {
catalog: Borrowed("greptime"),
schema: Borrowed("public"),
table: Borrowed("numbers"),
})
.await
.unwrap();
let DataFrame::DataFusion(df) = engine.read_table(table).unwrap();
let df = df
.select_columns(&["number"])
.unwrap()
.filter(col("number").lt(lit(10)))
.unwrap();
let batches = df.collect().await.unwrap();
assert_eq!(1, batches.len());
let batch = &batches[0];
assert_eq!(1, batch.num_columns());
assert_eq!(batch.column(0).len(), 10);
assert_eq!(
Helper::try_into_vector(batch.column(0)).unwrap(),
Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
);
}
#[tokio::test]
async fn test_describe() {
let engine = create_test_engine().await;

View File

@@ -14,6 +14,7 @@
#![feature(let_chains)]
pub mod dataframe;
pub mod datafusion;
pub mod dist_plan;
pub mod error;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datatypes::schema::Schema;
@@ -46,4 +46,17 @@ impl LogicalPlan {
}
}
}
/// Return a `format`able structure that produces a single line
/// per node, with child nodes indented below their parent.
/// For example:
///
/// ```text
/// Projection: employee.id
///   Filter: employee.state Eq Utf8("CO")
///     CsvScan: employee projection=Some([0, 3])
/// ```
///
/// The output is produced by the wrapped DataFusion plan's own
/// `display_indent`; the returned value borrows from `self`.
pub fn display_indent(&self) -> impl Display + '_ {
let LogicalPlan::DfPlan(plan) = self;
plan.display_indent()
}
}

View File

@@ -16,6 +16,7 @@ mod context;
pub mod options;
mod state;
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
@@ -30,7 +31,9 @@ use datatypes::schema::Schema;
use partition::manager::PartitionRuleManager;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use table::TableRef;
use crate::dataframe::DataFrame;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::LogicalPlan;
@@ -47,6 +50,10 @@ pub trait SqlStatementExecutor: Send + Sync {
#[async_trait]
pub trait QueryEngine: Send + Sync {
/// Returns the query engine as Any
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
fn planner(&self) -> Arc<dyn LogicalPlanner>;
fn name(&self) -> &str;
@@ -60,6 +67,9 @@ pub trait QueryEngine: Send + Sync {
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);
fn register_function(&self, func: FunctionRef);
/// Create a DataFrame from a table.
fn read_table(&self, table: TableRef) -> Result<DataFrame>;
}
pub struct QueryEngineFactory {

View File

@@ -24,6 +24,7 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::SessionContext;
use common_query::prelude::ScalarUdf;
use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -38,6 +39,8 @@ use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManager;
use promql::extension_plan::PromExtensionPlanner;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::extension_serializer::ExtensionSerializer;
@@ -59,8 +62,9 @@ pub struct QueryEngineState {
impl fmt::Debug for QueryEngineState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// TODO(dennis) better debug info
write!(f, "QueryEngineState: <datafusion context>")
f.debug_struct("QueryEngineState")
.field("state", &self.df_context.state())
.finish()
}
}
@@ -188,6 +192,12 @@ impl QueryEngineState {
pub(crate) fn session_state(&self) -> SessionState {
self.df_context.state()
}
/// Create a DataFrame for a table.
///
/// The table is wrapped in a `DfTableProviderAdapter` and passed to the
/// session context's `read_table`. Both the returned `DataFrame` and the
/// error type are DataFusion's own, not this crate's wrappers.
pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
self.df_context
.read_table(Arc::new(DfTableProviderAdapter::new(table)))
}
}
struct DfQueryPlanner {

View File

@@ -32,6 +32,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }
derive_builder = "0.12"
digest = "0.10"

View File

@@ -279,6 +279,12 @@ pub enum Error {
source: tikv_jemalloc_ctl::Error,
location: Location,
},
#[snafu(display("DataFrame operation error, source: {source}, location: {location}"))]
DataFrame {
source: datafusion::error::DataFusionError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -317,6 +323,7 @@ impl ErrorExt for Error {
| InvalidPromRemoteRequest { .. }
| InvalidFlightTicket { .. }
| InvalidPrepareStatement { .. }
| DataFrame { .. }
| TimePrecision { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } => {

View File

@@ -24,8 +24,11 @@ use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
@@ -40,14 +43,11 @@ pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
/// Generate a sql from a remote request query
/// TODO(dennis): maybe use logical plan in future to prevent sql injection
pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
let start_timestamp_ms = q.start_timestamp_ms;
let end_timestamp_ms = q.end_timestamp_ms;
/// Get table name from remote query
pub fn table_name(q: &Query) -> Result<String> {
let label_matches = &q.matchers;
let table_name = label_matches
label_matches
.iter()
.find_map(|m| {
if m.name == METRIC_NAME_LABEL {
@@ -58,13 +58,22 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
})
.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?;
})
}
let mut conditions: Vec<String> = Vec::with_capacity(label_matches.len());
/// Create a DataFrame from a remote Query
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
let DataFrame::DataFusion(dataframe) = dataframe;
conditions.push(format!(
"{TIMESTAMP_COLUMN_NAME}>={start_timestamp_ms} AND {TIMESTAMP_COLUMN_NAME}<={end_timestamp_ms}",
));
let start_timestamp_ms = q.start_timestamp_ms;
let end_timestamp_ms = q.end_timestamp_ms;
let label_matches = &q.matchers;
let mut conditions = Vec::with_capacity(label_matches.len() + 1);
conditions.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit(start_timestamp_ms)));
conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit(end_timestamp_ms)));
for m in label_matches {
let name = &m.name;
@@ -81,28 +90,30 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
match m_type {
MatcherType::Eq => {
conditions.push(format!("{name}='{value}'"));
conditions.push(col(name).eq(lit(value)));
}
MatcherType::Neq => {
conditions.push(format!("{name}!='{value}'"));
conditions.push(col(name).not_eq(lit(value)));
}
// Case sensitive regexp match
MatcherType::Re => {
conditions.push(format!("{name}~'{value}'"));
conditions.push(regexp_match(vec![col(name), lit(value)]).is_not_null());
}
// Case sensitive regexp not match
MatcherType::Nre => {
conditions.push(format!("{name}!~'{value}'"));
conditions.push(regexp_match(vec![col(name), lit(value)]).is_null());
}
}
}
let conditions = conditions.join(" AND ");
// Safety: conditions MUST not be empty, reduce always return Some(expr).
let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
Ok((
table_name.to_string(),
format!("select * from {table_name} where {conditions} order by {TIMESTAMP_COLUMN_NAME}",),
))
let dataframe = dataframe
.filter(conditions)
.context(error::DataFrameSnafu)?;
Ok(LogicalPlan::DfPlan(dataframe.into_parts().1))
}
#[inline]
@@ -433,8 +444,11 @@ mod tests {
use std::sync::Arc;
use api::prometheus::remote::LabelMatcher;
use datafusion::prelude::SessionContext;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use table::table::adapter::DfTableProviderAdapter;
use table::test_util::MemTable;
use super::*;
@@ -443,14 +457,14 @@ mod tests {
const RE_TYPE: i32 = MatcherType::Re as i32;
#[test]
fn test_query_to_sql() {
fn test_table_name() {
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![],
..Default::default()
};
let err = query_to_sql(&q).unwrap_err();
let err = table_name(&q).unwrap_err();
assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
let q = Query {
@@ -463,9 +477,56 @@ mod tests {
}],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
assert_eq!("test", table_name(&q).unwrap());
}
#[test]
fn test_query_to_plan() {
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
}],
..Default::default()
};
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
ColumnSchema::new(
FIELD_COLUMN_NAME,
ConcreteDataType::float64_datatype(),
true,
),
ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
]));
let recordbatch = RecordBatch::new(
schema,
vec![
Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
Arc::new(StringVector::from(vec!["host1"])) as _,
Arc::new(StringVector::from(vec!["job"])) as _,
],
)
.unwrap();
let ctx = SessionContext::new();
let table = Arc::new(MemTable::new("test", recordbatch));
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
let dataframe = ctx.read_table(table_provider.clone()).unwrap();
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000)\n TableScan: ?table?", display_string);
let q = Query {
start_timestamp_ms: 1000,
@@ -489,9 +550,12 @@ mod tests {
],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
let dataframe = ctx.read_table(table_provider).unwrap();
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
}
#[test]