mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
refactor: avoid empty display in errors (#5858)
* refactor: avoid empty display in errors
* fix: resolve PR comments
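The commit title targets wrapper error variants whose Display output is empty: when such an error is composed with its root cause, the rendered message degenerates into strings like ": <root cause>". A minimal illustrative sketch of that failure mode (the WrapperError type below is hypothetical, not a GreptimeDB type):

use std::fmt;

// Hypothetical wrapper whose Display is empty, like `#[snafu(display(""))]`.
#[derive(Debug)]
struct WrapperError {
    root: String,
}

impl fmt::Display for WrapperError {
    fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
        Ok(()) // prints nothing
    }
}

fn main() {
    let err = WrapperError { root: "<root cause>".to_string() };
    // The usual "error: root cause" composition loses its left-hand side.
    assert_eq!(format!("{err}: {}", err.root), ": <root cause>");
}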
Cargo.lock (generated): 1 line changed
@@ -1988,6 +1988,7 @@ dependencies = [
name = "common-error"
version = "0.14.0"
dependencies = [
 "common-macro",
 "http 1.1.0",
 "snafu 0.8.5",
 "strum 0.27.1",
@@ -12,3 +12,6 @@ http.workspace = true
snafu.workspace = true
strum.workspace = true
tonic.workspace = true

[dev-dependencies]
common-macro.workspace = true
@@ -42,7 +42,7 @@ pub trait ErrorExt: StackError {
        if let Some(external_error) = error.source() {
            let external_root = external_error.sources().last().unwrap();

            if error.to_string().is_empty() {
            if error.transparent() {
                format!("{external_root}")
            } else {
                format!("{error}: {external_root}")

@@ -86,6 +86,14 @@ pub trait StackError: std::error::Error {
        }
        result
    }

    /// Indicates whether this error is "transparent", that it delegates its "display" and "source"
    /// to the underlying error. Could be useful when you are just wrapping some external error,
    /// **AND** can not or would not provide meaningful contextual info. For example, the
    /// `DataFusionError`.
    fn transparent(&self) -> bool {
        false
    }
}

impl<T: ?Sized + StackError> StackError for Arc<T> {
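The two hunks above change how the formatted message is composed: instead of checking whether the error's own Display output is empty, the code now asks the error whether it is transparent. A simplified sketch of that decision, reduced to one level of nesting; SimpleError and its fields are illustrative stand-ins, not the actual ErrorExt/StackError machinery:

use std::error::Error;
use std::fmt;

// Illustrative error type; `msg`, `transparent` and `source` stand in for what
// the real trait methods provide.
#[derive(Debug)]
struct SimpleError {
    msg: String,
    transparent: bool,
    source: Option<Box<dyn Error + 'static>>,
}

impl fmt::Display for SimpleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.msg)
    }
}

impl Error for SimpleError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.source.as_deref()
    }
}

// Mirrors the branch in the hunk: a transparent error delegates entirely to its
// root cause; otherwise the root cause is prefixed with this error's message.
fn output_msg(error: &SimpleError) -> String {
    match error.source() {
        Some(root) if error.transparent => format!("{root}"),
        Some(root) => format!("{error}: {root}"),
        None => error.to_string(),
    }
}

fn main() {
    let root = Box::new(SimpleError {
        msg: "<root cause>".to_string(),
        transparent: false,
        source: None,
    });
    let wrapper = SimpleError {
        msg: String::new(),
        transparent: true,
        source: Some(root),
    };
    // A transparent wrapper no longer renders as ": <root cause>".
    assert_eq!(output_msg(&wrapper), "<root cause>");
}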
src/common/error/tests/ext.rs (new file): 111 lines
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_error::ext::{ErrorExt, PlainError, StackError};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, ResultExt, Snafu};

#[derive(Snafu)]
#[stack_trace_debug]
enum MyError {
    #[snafu(display(r#"A normal error with "display" attribute, message "{}""#, message))]
    Normal {
        message: String,
        #[snafu(source)]
        error: PlainError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(transparent)]
    Transparent {
        #[snafu(source)]
        error: PlainError,
        #[snafu(implicit)]
        location: Location,
    },
}

impl ErrorExt for MyError {
    fn status_code(&self) -> StatusCode {
        StatusCode::Unexpected
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn normal_error() -> Result<(), MyError> {
    let plain_error = PlainError::new("<root cause>".to_string(), StatusCode::Unexpected);
    Err(plain_error).context(NormalSnafu { message: "blabla" })
}

fn transparent_error() -> Result<(), MyError> {
    let plain_error = PlainError::new("<root cause>".to_string(), StatusCode::Unexpected);
    Err(plain_error)?
}

#[test]
fn test_output_msg() {
    let result = normal_error();
    assert_eq!(
        result.unwrap_err().output_msg(),
        r#"A normal error with "display" attribute, message "blabla": <root cause>"#
    );

    let result = transparent_error();
    assert_eq!(result.unwrap_err().output_msg(), "<root cause>");
}

#[test]
fn test_to_string() {
    let result = normal_error();
    assert_eq!(
        result.unwrap_err().to_string(),
        r#"A normal error with "display" attribute, message "blabla""#
    );

    let result = transparent_error();
    assert_eq!(result.unwrap_err().to_string(), "<root cause>");
}

#[test]
fn test_debug_format() {
    let result = normal_error();
    assert_eq!(
        format!("{:?}", result.unwrap_err()),
        r#"0: A normal error with "display" attribute, message "blabla", at src/common/error/tests/ext.rs:55:22
1: PlainError { msg: "<root cause>", status_code: Unexpected }"#
    );

    let result = transparent_error();
    assert_eq!(
        format!("{:?}", result.unwrap_err()),
        r#"0: <transparent>, at src/common/error/tests/ext.rs:60:5
1: PlainError { msg: "<root cause>", status_code: Unexpected }"#
    );
}

#[test]
fn test_transparent_flag() {
    let result = normal_error();
    assert!(!result.unwrap_err().transparent());

    let result = transparent_error();
    assert!(result.unwrap_err().transparent());
}
@@ -14,7 +14,7 @@

//! implement `::common_error::ext::StackError`

use proc_macro2::{Span, TokenStream as TokenStream2};
use proc_macro2::{Literal, Span, TokenStream as TokenStream2, TokenTree};
use quote::{quote, quote_spanned};
use syn::spanned::Spanned;
use syn::{parenthesized, Attribute, Ident, ItemEnum, Variant};

@@ -32,6 +32,7 @@ pub fn stack_trace_style_impl(args: TokenStream2, input: TokenStream2) -> TokenS
        variants.push(variant);
    }

    let transparent_fn = build_transparent_fn(enum_name.clone(), &variants);
    let debug_fmt_fn = build_debug_fmt_impl(enum_name.clone(), variants.clone());
    let next_fn = build_next_impl(enum_name.clone(), variants);
    let debug_impl = build_debug_impl(enum_name.clone());

@@ -43,6 +44,7 @@ pub fn stack_trace_style_impl(args: TokenStream2, input: TokenStream2) -> TokenS
        impl ::common_error::ext::StackError for #enum_name {
            #debug_fmt_fn
            #next_fn
            #transparent_fn
        }

        #debug_impl

@@ -115,6 +117,7 @@ struct ErrorVariant {
    has_source: bool,
    has_external_cause: bool,
    display: TokenStream2,
    transparent: bool,
    span: Span,
    cfg_attr: Option<Attribute>,
}

@@ -140,6 +143,7 @@ impl ErrorVariant {
        }

        let mut display = None;
        let mut transparent = false;
        let mut cfg_attr = None;
        for attr in variant.attrs {
            if attr.path().is_ident("snafu") {

@@ -150,17 +154,29 @@ impl ErrorVariant {
                    let display_ts: TokenStream2 = content.parse()?;
                    display = Some(display_ts);
                    Ok(())
                } else if meta.path.is_ident("transparent") {
                    display = Some(TokenStream2::from(TokenTree::Literal(Literal::string(
                        "<transparent>",
                    ))));
                    transparent = true;
                    Ok(())
                } else {
                    Err(meta.error("unrecognized repr"))
                }
            })
            .expect("Each error should contains a display attribute");
            .unwrap_or_else(|e| panic!("{e}"));
        }

        if attr.path().is_ident("cfg") {
            cfg_attr = Some(attr);
        }
    }
    let display = display.unwrap_or_else(|| {
        panic!(
            r#"Error "{}" must be annotated with attribute "display" or "transparent"."#,
            variant.ident,
        )
    });

    let field_ident = variant
        .fields

@@ -174,7 +190,8 @@ impl ErrorVariant {
        has_location,
        has_source,
        has_external_cause,
        display: display.unwrap(),
        display,
        transparent,
        span,
        cfg_attr,
    }

@@ -275,4 +292,44 @@ impl ErrorVariant {
            }
        }
    }

    fn build_transparent_match_arm(&self) -> TokenStream2 {
        let cfg = if let Some(cfg) = &self.cfg_attr {
            quote_spanned!(cfg.span() => #cfg)
        } else {
            quote! {}
        };
        let name = &self.name;
        let fields = &self.fields;

        if self.transparent {
            quote_spanned! {
                self.span => #cfg #[allow(unused_variables)] #name { #(#fields),* } => {
                    true
                },
            }
        } else {
            quote_spanned! {
                self.span => #cfg #[allow(unused_variables)] #name { #(#fields),* } =>{
                    false
                }
            }
        }
    }
}

fn build_transparent_fn(enum_name: Ident, variants: &[ErrorVariant]) -> TokenStream2 {
    let match_arms = variants
        .iter()
        .map(|v| v.build_transparent_match_arm())
        .collect::<Vec<_>>();

    quote! {
        fn transparent(&self) -> bool {
            use #enum_name::*;
            match self {
                #(#match_arms)*
            }
        }
    }
}
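The new build_transparent_fn/build_transparent_match_arm pair emits a transparent() implementation with one match arm per variant. Hand-written, the generated code is roughly equivalent to the sketch below; DemoError is an illustrative enum, not the literal macro expansion:

#[allow(dead_code)]
enum DemoError {
    Normal { message: String },
    Transparent { message: String },
}

impl DemoError {
    // One arm per variant; only variants marked #[snafu(transparent)] return true.
    fn transparent(&self) -> bool {
        match self {
            DemoError::Normal { .. } => false,
            DemoError::Transparent { .. } => true,
        }
    }
}

fn main() {
    assert!(!DemoError::Normal { message: "ctx".to_string() }.transparent());
    assert!(DemoError::Transparent { message: "ctx".to_string() }.transparent());
}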
@@ -298,7 +298,7 @@ impl Stream for RecordBatchStreamAdapter {
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(df_record_batch)) => {
                let df_record_batch = df_record_batch.context(error::PollStreamSnafu)?;
                let df_record_batch = df_record_batch?;
                Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
                    self.schema(),
                    df_record_batch,
@@ -65,7 +65,7 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display(""))]
    #[snafu(transparent)]
    PollStream {
        #[snafu(source)]
        error: datafusion::error::DataFusionError,
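This is the pattern applied throughout the rest of the diff: #[snafu(display(""))] on pure wrapper variants is replaced with #[snafu(transparent)], which forwards Display and source() to the wrapped error and lets call sites use a plain ? instead of .context(...), as the new test file above also exercises. A self-contained sketch with illustrative types; InnerError/OuterError are not the real error enums:

use snafu::Snafu;

#[derive(Debug, Snafu)]
#[snafu(display("{message}"))]
struct InnerError {
    message: String,
}

#[derive(Debug, Snafu)]
enum OuterError {
    // Previously: #[snafu(display(""))], an empty Display for the wrapper.
    // Now the variant is transparent: Display/source come from `error`, and
    // snafu generates a `From<InnerError>` impl so `?` works without context().
    #[snafu(transparent)]
    Wrapped {
        #[snafu(source)]
        error: InnerError,
    },
}

fn fallible() -> Result<(), InnerError> {
    Err(InnerError {
        message: "<root cause>".to_string(),
    })
}

fn call_site() -> Result<(), OuterError> {
    // No `.context(...)` needed for a transparent variant.
    fallible()?;
    Ok(())
}

fn main() {
    let err = call_site().unwrap_err();
    assert_eq!(err.to_string(), "<root cause>");
}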
@@ -50,9 +50,9 @@ use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
    MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
    TableMutationSnafu, TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};

@@ -308,8 +308,7 @@ impl DatafusionQueryEngine {
        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await
            .context(DataFusionSnafu)?;
            .await?;

        Ok(physical_plan)
    }
@@ -43,7 +43,7 @@ use datafusion_sql::parser::Statement as DfStatement;
use session::context::QueryContextRef;
use snafu::{Location, ResultExt};

use crate::error::{CatalogSnafu, DataFusionSnafu, Result};
use crate::error::{CatalogSnafu, Result};
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};

pub struct DfContextProviderAdapter {

@@ -70,9 +70,7 @@ impl DfContextProviderAdapter {
        query_ctx: QueryContextRef,
    ) -> Result<Self> {
        let table_names = if let Some(df_stmt) = df_stmt {
            session_state
                .resolve_table_references(df_stmt)
                .context(DataFusionSnafu)?
            session_state.resolve_table_references(df_stmt)?
        } else {
            vec![]
        };
@@ -126,7 +126,7 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display(""))]
    #[snafu(transparent)]
    DataFusion {
        #[snafu(source)]
        error: DataFusionError,
@@ -19,12 +19,11 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::TableReference;
use datafusion_expr::{BinaryExpr, Expr, Join, LogicalPlan, Operator};
use session::context::QueryContextRef;
use snafu::ResultExt;
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;

use crate::error::{DataFusionSnafu, Result};
use crate::error::Result;

struct TableNamesExtractAndRewriter {
    pub(crate) table_names: HashSet<TableName>,

@@ -119,7 +118,7 @@ pub fn extract_and_rewrite_full_table_names(
    query_ctx: QueryContextRef,
) -> Result<(HashSet<TableName>, LogicalPlan)> {
    let mut extractor = TableNamesExtractAndRewriter::new(query_ctx);
    let plan = plan.rewrite(&mut extractor).context(DataFusionSnafu)?;
    let plan = plan.rewrite(&mut extractor)?;
    Ok((extractor.table_names, plan.data))
}
@@ -31,7 +31,7 @@ use snafu::ResultExt;
use sql::ast::Expr as SqlExpr;
use sql::statements::statement::Statement;

use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
use crate::log_query::planner::LogQueryPlanner;
use crate::parser::QueryStatement;
use crate::promql::planner::PromPlanner;

@@ -118,8 +118,7 @@ impl DfLogicalPlanner {
        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
        let plan = self
            .engine_state
            .optimize_by_extension_rules(plan, &context)
            .context(DataFusionSnafu)?;
            .optimize_by_extension_rules(plan, &context)?;
        common_telemetry::debug!("Logical planner, optimize result: {plan}");

        Ok(plan)

@@ -154,9 +153,7 @@ impl DfLogicalPlanner {

        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);

        sql_to_rel
            .sql_to_expr(sql.into(), schema, &mut PlannerContext::new())
            .context(DataFusionSnafu)
        Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
    }

    #[tracing::instrument(skip_all)]

@@ -183,10 +180,7 @@ impl DfLogicalPlanner {

    #[tracing::instrument(skip_all)]
    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.engine_state
            .optimize_logical_plan(plan)
            .context(DataFusionSnafu)
            .map(Into::into)
        Ok(self.engine_state.optimize_logical_plan(plan)?)
    }
}
@@ -55,9 +55,9 @@ use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
use futures::{ready, Stream};
use futures_util::StreamExt;
use snafu::{ensure, ResultExt};
use snafu::ensure;

use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result};
use crate::error::{RangeQuerySnafu, Result};

type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

@@ -373,25 +373,22 @@ impl RangeSelect {
                Ok((None, Arc::new(field)))
            },
        )
        .collect::<DfResult<Vec<_>>>()
        .context(DataFusionSnafu)?;
        .collect::<DfResult<Vec<_>>>()?;
        // add align_ts
        let ts_field = time_index
            .to_field(input.schema().as_ref())
            .context(DataFusionSnafu)?;
        let ts_field = time_index.to_field(input.schema().as_ref())?;
        let time_index_name = ts_field.1.name().clone();
        fields.push(ts_field);
        // add by
        let by_fields = exprlist_to_fields(&by, &input).context(DataFusionSnafu)?;
        let by_fields = exprlist_to_fields(&by, &input)?;
        fields.extend(by_fields.clone());
        let schema_before_project = Arc::new(
            DFSchema::new_with_metadata(fields, input.schema().metadata().clone())
                .context(DataFusionSnafu)?,
        );
        let by_schema = Arc::new(
            DFSchema::new_with_metadata(by_fields, input.schema().metadata().clone())
                .context(DataFusionSnafu)?,
        );
        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
            fields,
            input.schema().metadata().clone(),
        )?);
        let by_schema = Arc::new(DFSchema::new_with_metadata(
            by_fields,
            input.schema().metadata().clone(),
        )?);
        // If the results of project plan can be obtained directly from range plan without any additional
        // calculations, no project plan is required. We can simply project the final output of the range
        // plan to produce the final result.

@@ -421,10 +418,10 @@ impl RangeSelect {
                (f.0.cloned(), Arc::new(f.1.clone()))
            })
            .collect();
            Arc::new(
                DFSchema::new_with_metadata(project_field, input.schema().metadata().clone())
                    .context(DataFusionSnafu)?,
            )
            Arc::new(DFSchema::new_with_metadata(
                project_field,
                input.schema().metadata().clone(),
            )?)
        } else {
            schema_before_project.clone()
        };
@@ -43,8 +43,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu,
    UnknownTableSnafu,
    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
};
use crate::plan::ExtractExpr;
use crate::range_select::plan::{Fill, RangeFn, RangeSelect};

@@ -385,8 +384,7 @@ impl RangePlanRewriter {
        let new_expr = expr
            .iter()
            .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
            .collect::<DFResult<Vec<_>>>()
            .context(DataFusionSnafu)?;
            .collect::<DFResult<Vec<_>>>()?;
        if range_rewriter.by.is_empty() {
            range_rewriter.by = default_by;
        }

@@ -408,9 +406,7 @@ impl RangePlanRewriter {
        } else {
            let project_plan = LogicalPlanBuilder::from(range_plan)
                .project(new_expr)
                .context(DataFusionSnafu)?
                .build()
                .context(DataFusionSnafu)?;
                .and_then(|x| x.build())?;
            Ok(Some(project_plan))
        }
    }

@@ -436,8 +432,7 @@ impl RangePlanRewriter {
                    }
                );
                LogicalPlanBuilder::from(inputs[0].clone())
                    .explain(*verbose, true)
                    .context(DataFusionSnafu)?
                    .explain(*verbose, true)?
                    .build()
            }
            LogicalPlan::Explain(Explain { verbose, .. }) => {

@@ -448,8 +443,7 @@ impl RangePlanRewriter {
                    }
                );
                LogicalPlanBuilder::from(inputs[0].clone())
                    .explain(*verbose, false)
                    .context(DataFusionSnafu)?
                    .explain(*verbose, false)?
                    .build()
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn {

@@ -470,13 +464,11 @@ impl RangePlanRewriter {
                    on_expr.clone(),
                    select_expr.clone(),
                    sort_expr.clone(),
                )
                .context(DataFusionSnafu)?
                )?
                .build()
            }
            _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
        }
        .context(DataFusionSnafu)?;
        }?;
        Ok(Some(plan))
    } else {
        Ok(None)
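One more call-site pattern from this hunk: rather than attaching .context(DataFusionSnafu) to every builder arm, the rewriter now lets each match arm yield its Result and applies a single ? to the whole match expression. A standalone sketch of that shape; BuilderError and the branch functions are hypothetical, not the real plan builders:

struct BuilderError;

fn build_explain() -> Result<String, BuilderError> {
    Ok("Explain".to_string())
}

fn build_projection() -> Result<String, BuilderError> {
    Ok("Projection".to_string())
}

fn rewrite(explain: bool) -> Result<String, BuilderError> {
    // Every arm evaluates to a Result; the trailing `?` unwraps whichever arm ran.
    let plan = match explain {
        true => build_explain(),
        false => build_projection(),
    }?;
    Ok(plan)
}

fn main() {
    assert!(rewrite(true).is_ok());
}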
@@ -606,8 +598,6 @@ fn interval_only_in_expr(expr: &Expr) -> bool {
#[cfg(test)]
mod test {

    use std::error::Error;

    use arrow::datatypes::IntervalUnit;
    use catalog::memory::MemoryCatalogManager;
    use catalog::RegisterTableRequest;

@@ -825,12 +815,7 @@ mod test {
    /// the right argument is `range_fn(avg(field_0), '5m', 'NULL', '0', '1h')`
    async fn range_argument_err_1() {
        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query)
            .await
            .unwrap_err()
            .source()
            .unwrap()
            .to_string();
        let error = do_query(query).await.unwrap_err().to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"

@@ -840,12 +825,7 @@ mod test {
    #[tokio::test]
    async fn range_argument_err_2() {
        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query)
            .await
            .unwrap_err()
            .source()
            .unwrap()
            .to_string();
        let error = do_query(query).await.unwrap_err().to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Int64(5)` in range select query"
@@ -301,8 +301,7 @@ async fn query_from_information_schema_table(
            .state()
            .clone(),
    )
    .read_table(view)
    .context(error::DataFusionSnafu)?;
    .read_table(view)?;

    let planner = query_engine.planner();
    let planner = planner

@@ -319,10 +318,7 @@ async fn query_from_information_schema_table(
        }
    };

    let stream = dataframe
        .execute_stream()
        .await
        .context(error::DataFusionSnafu)?;
    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,