fix: check for table scan before expanding (#2491)

* fix: check for table scan before expanding

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change assert_ok to unwrap

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy warning

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't skip dml

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* uncomment ignored tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-26 20:12:08 +08:00
committed by GitHub
parent a6116bb866
commit e352fb4495
2 changed files with 17 additions and 19 deletions

View File

@@ -14,7 +14,6 @@
use std::sync::Arc;
use common_telemetry::info;
use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result as DfResult;
use datafusion_common::config::ConfigOptions;
@@ -46,7 +45,9 @@ impl AnalyzerRule for DistPlannerAnalyzer {
) -> datafusion_common::Result<LogicalPlan> {
let plan = plan.transform(&Self::inspect_plan_with_subquery)?;
let mut rewriter = PlanRewriter::default();
plan.rewrite(&mut rewriter)
let result = plan.rewrite(&mut rewriter)?;
Ok(result)
}
}
@@ -138,10 +139,6 @@ impl PlanRewriter {
/// Return true if should stop and expand. The input plan is the parent node of current node
fn should_expand(&mut self, plan: &LogicalPlan) -> bool {
if DFLogicalSubstraitConvertor.encode(plan).is_err() {
info!(
"substrait error: {:?}",
DFLogicalSubstraitConvertor.encode(plan)
);
return true;
}
@@ -251,6 +248,13 @@ impl TreeNodeRewriter for PlanRewriter {
return Ok(node);
}
// only expand when the leaf is table scan
if node.inputs().is_empty() && !matches!(node, LogicalPlan::TableScan(_)) {
self.set_expanded();
self.pop_stack();
return Ok(node);
}
self.maybe_set_partitions(&node);
let Some(parent) = self.get_parent() else {

View File

@@ -54,13 +54,10 @@ macro_rules! sql_tests {
$service,
test_mysql_auth,
// ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
// test_mysql_crud,
test_mysql_crud,
test_postgres_auth,
// ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
// test_postgres_crud,
// ignore: https://github.com/GreptimeTeam/greptimedb/issues/2445
// test_postgres_parameter_inference,
test_postgres_crud,
test_postgres_parameter_inference,
);
)*
};
@@ -123,7 +120,6 @@ pub async fn test_mysql_auth(store_type: StorageType) {
guard.remove_all().await;
}
#[allow(dead_code)]
pub async fn test_mysql_crud(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
@@ -135,12 +131,12 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.await
.unwrap();
assert!(sqlx::query(
sqlx::query(
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
)
.execute(&pool)
.await
.is_ok());
.unwrap();
for i in 0..10 {
let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_opt(60, i).unwrap(),
@@ -149,7 +145,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
assert!(sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
.bind(i)
.bind(i)
.bind(d)
@@ -157,7 +153,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.bind(bytes)
.execute(&pool)
.await
.is_ok());
.unwrap();
}
let rows = sqlx::query("select i, d, dt, b from demo")
@@ -270,7 +266,6 @@ pub async fn test_postgres_auth(store_type: StorageType) {
guard.remove_all().await;
}
#[allow(dead_code)]
pub async fn test_postgres_crud(store_type: StorageType) {
let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_crud").await;
@@ -347,7 +342,6 @@ pub async fn test_postgres_crud(store_type: StorageType) {
guard.remove_all().await;
}
#[allow(dead_code)]
pub async fn test_postgres_parameter_inference(store_type: StorageType) {
let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_inference").await;