feat: rewrite the dist analyzer (#2238)

* it works!

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add documents

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unstable timestamp from sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename rewriter struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-23 22:29:08 -05:00
committed by GitHub
parent 8b1f4eb958
commit e5ba3d1708
6 changed files with 438 additions and 234 deletions

View File

@@ -16,7 +16,6 @@ mod analyzer;
mod commutativity;
mod merge_scan;
mod planner;
mod utils;
pub use analyzer::DistPlannerAnalyzer;
pub use merge_scan::MergeScanLogicalPlan;

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Mutex};
use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result as DfResult;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_expr::{Extension, LogicalPlan};
use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter};
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::analyzer::AnalyzerRule;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
@@ -27,7 +26,6 @@ use crate::dist_plan::commutativity::{
partial_commutative_transformer, Categorizer, Commutativity,
};
use crate::dist_plan::merge_scan::MergeScanLogicalPlan;
use crate::dist_plan::utils;
pub struct DistPlannerAnalyzer;
@@ -41,233 +39,196 @@ impl AnalyzerRule for DistPlannerAnalyzer {
plan: LogicalPlan,
_config: &ConfigOptions,
) -> datafusion_common::Result<LogicalPlan> {
// (1) transform up merge scan
let mut visitor = CommutativeVisitor::new();
let _ = plan.visit(&mut visitor)?;
let state = ExpandState::new();
let plan = plan.transform_down(&|plan| Self::expand(plan, &visitor, &state))?;
// (2) remove placeholder merge scan
let plan = plan.transform(&Self::remove_placeholder_merge_scan)?;
Ok(plan)
let mut rewriter = PlanRewriter::default();
plan.rewrite(&mut rewriter)
}
}
impl DistPlannerAnalyzer {
/// Add [MergeScanLogicalPlan] before the table scan
#[allow(dead_code)]
fn add_merge_scan(plan: LogicalPlan) -> datafusion_common::Result<Transformed<LogicalPlan>> {
Ok(match plan {
LogicalPlan::TableScan(table_scan) => {
let ext_plan = LogicalPlan::Extension(Extension {
node: Arc::new(MergeScanLogicalPlan::new(
LogicalPlan::TableScan(table_scan),
true,
)),
});
Transformed::Yes(ext_plan)
}
_ => Transformed::No(plan),
})
}
/// Remove placeholder [MergeScanLogicalPlan]
fn remove_placeholder_merge_scan(
plan: LogicalPlan,
) -> datafusion_common::Result<Transformed<LogicalPlan>> {
Ok(match &plan {
LogicalPlan::Extension(extension)
if extension.node.name() == MergeScanLogicalPlan::name() =>
{
let merge_scan = extension
.node
.as_any()
.downcast_ref::<MergeScanLogicalPlan>()
.unwrap();
if merge_scan.is_placeholder() {
Transformed::Yes(merge_scan.input().clone())
} else {
Transformed::No(plan)
}
}
_ => Transformed::No(plan),
})
}
/// Expand stages on the stop node
fn expand(
mut plan: LogicalPlan,
visitor: &CommutativeVisitor,
state: &ExpandState,
) -> datafusion_common::Result<Transformed<LogicalPlan>> {
if state.is_transformed() {
// only transform once
return Ok(Transformed::No(plan));
}
if let Some(stop_node) = visitor.stop_node && utils::hash_plan(&plan) != stop_node {
// only act with the stop node or the root (the first node seen by this closure) if no stop node
return Ok(Transformed::No(plan));
}
if visitor.stop_node.is_some() {
// insert merge scan between the stop node and its child
let children = plan.inputs();
let mut new_children = Vec::with_capacity(children.len());
for child in children {
let mut new_child =
MergeScanLogicalPlan::new(child.clone(), false).into_logical_plan();
// expand stages
for new_stage in &visitor.next_stage {
new_child = new_stage.with_new_inputs(&[new_child])?
}
new_children.push(new_child);
}
plan = plan.with_new_inputs(&new_children)?;
} else {
// otherwise add merge scan as the new root
plan = MergeScanLogicalPlan::new(plan, false).into_logical_plan();
// expand stages
for new_stage in &visitor.next_stage {
plan = new_stage.with_new_inputs(&[plan])?
}
}
state.set_transformed();
Ok(Transformed::Yes(plan))
}
/// Status of the rewriter to mark if the current pass is expanded
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
enum RewriterStatus {
#[default]
Unexpanded,
Expanded,
}
struct ExpandState {
transformed: Mutex<bool>,
#[derive(Debug, Default)]
struct PlanRewriter {
/// Current level in the tree
level: usize,
/// Simulated stack for the `rewrite` recursion
stack: Vec<(LogicalPlan, usize)>,
/// Stages to be expanded
stage: Vec<LogicalPlan>,
status: RewriterStatus,
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
}
impl ExpandState {
pub fn new() -> Self {
Self {
transformed: Mutex::new(false),
}
impl PlanRewriter {
fn get_parent(&self) -> Option<&LogicalPlan> {
// level starts from 1, it's safe to minus by 1
self.stack
.iter()
.rev()
.find(|(_, level)| *level == self.level - 1)
.map(|(node, _)| node)
}
pub fn is_transformed(&self) -> bool {
*self.transformed.lock().unwrap()
}
/// Set the state to transformed
pub fn set_transformed(&self) {
*self.transformed.lock().unwrap() = true;
}
}
#[derive(Debug)]
struct CommutativeVisitor {
next_stage: Vec<LogicalPlan>,
// hash of the stop node
stop_node: Option<u64>,
/// Partition columns of current visiting table
current_partition_cols: Option<Vec<String>>,
}
impl TreeNodeVisitor for CommutativeVisitor {
type N = LogicalPlan;
fn pre_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result<VisitRecursion> {
// find the first merge scan and stop traversing down
// todo: check if it works for join
Ok(match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO(ruihang): spawn a sub visitor to retrieve partition columns
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
if let Some(provider) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
let partition_key_indices = info.meta.partition_key_indices.clone();
let schema = info.meta.schema.clone();
let partition_cols = partition_key_indices
.into_iter()
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
self.current_partition_cols = Some(partition_cols);
}
}
}
VisitRecursion::Continue
}
_ => VisitRecursion::Continue,
})
}
fn post_visit(&mut self, plan: &LogicalPlan) -> datafusion_common::Result<VisitRecursion> {
/// Return true if should stop and expand. The input plan is the parent node of current node
fn should_expand(&mut self, plan: &LogicalPlan) -> bool {
if DFLogicalSubstraitConvertor.encode(plan).is_err() {
common_telemetry::info!(
"substrait error: {:?}",
DFLogicalSubstraitConvertor.encode(plan)
);
self.stop_node = Some(utils::hash_plan(plan));
return Ok(VisitRecursion::Stop);
return true;
}
match Categorizer::check_plan(plan) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.next_stage.push(plan)
self.stage.push(plan)
}
}
Commutativity::ConditionalCommutative(transformer) => {
if let Some(transformer) = transformer
&& let Some(plan) = transformer(plan) {
self.next_stage.push(plan)
self.stage.push(plan)
}
},
Commutativity::TransformedCommutative(transformer) => {
if let Some(transformer) = transformer
&& let Some(plan) = transformer(plan) {
self.next_stage.push(plan)
self.stage.push(plan)
}
},
Commutativity::CheckPartition => {
if let Some(partition_cols) = &self.current_partition_cols
&& partition_cols.is_empty() {
// no partition columns, and can be encoded skip
return Ok(VisitRecursion::Continue);
} else {
self.stop_node = Some(utils::hash_plan(plan));
return Ok(VisitRecursion::Stop);
}
},
Commutativity::NonCommutative
Commutativity::CheckPartition
| Commutativity::NonCommutative
| Commutativity::Unimplemented
| Commutativity::Unsupported => {
self.stop_node = Some(utils::hash_plan(plan));
return Ok(VisitRecursion::Stop);
return true;
}
}
Ok(VisitRecursion::Continue)
false
}
fn is_expanded(&self) -> bool {
self.status == RewriterStatus::Expanded
}
fn set_expanded(&mut self) {
self.status = RewriterStatus::Expanded;
}
fn set_unexpanded(&mut self) {
self.status = RewriterStatus::Unexpanded;
}
fn maybe_set_partitions(&mut self, plan: &LogicalPlan) {
if self.partition_cols.is_some() {
// only need to set once
return;
}
if let LogicalPlan::TableScan(table_scan) = plan {
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
if let Some(provider) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
let partition_key_indices = info.meta.partition_key_indices.clone();
let schema = info.meta.schema.clone();
let partition_cols = partition_key_indices
.into_iter()
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
self.partition_cols = Some(partition_cols);
}
}
}
}
}
/// pop one stack item and reduce the level by 1
fn pop_stack(&mut self) {
self.level -= 1;
self.stack.pop();
}
}
impl CommutativeVisitor {
pub fn new() -> Self {
Self {
next_stage: vec![],
stop_node: None,
current_partition_cols: None,
impl TreeNodeRewriter for PlanRewriter {
type N = LogicalPlan;
/// descend
fn pre_visit<'a>(&'a mut self, node: &'a Self::N) -> DfResult<RewriteRecursion> {
self.level += 1;
self.stack.push((node.clone(), self.level));
// decendening will clear the stage
self.stage.clear();
self.set_unexpanded();
self.partition_cols = None;
Ok(RewriteRecursion::Continue)
}
/// ascend
///
/// Besure to call `pop_stack` before returning
fn mutate(&mut self, node: Self::N) -> DfResult<Self::N> {
// only expand once on each ascending
if self.is_expanded() {
self.pop_stack();
return Ok(node);
}
self.maybe_set_partitions(&node);
let Some(parent) = self.get_parent() else {
// add merge scan as the new root
let mut node = MergeScanLogicalPlan::new(node, false).into_logical_plan();
// expand stages
for new_stage in self.stage.drain(..) {
node = new_stage.with_new_inputs(&[node])?
}
self.set_expanded();
self.pop_stack();
return Ok(node);
};
// TODO(ruihang): avoid this clone
if self.should_expand(&parent.clone()) {
// TODO(ruihang): does this work for nodes with multiple children?;
// replace the current node with expanded one
let mut node = MergeScanLogicalPlan::new(node, false).into_logical_plan();
// expand stages
for new_stage in self.stage.drain(..) {
node = new_stage.with_new_inputs(&[node])?
}
self.set_expanded();
self.pop_stack();
return Ok(node);
}
self.pop_stack();
Ok(node)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion::datasource::DefaultTableSource;
use datafusion_expr::{avg, col, lit, Expr, LogicalPlanBuilder};
use table::table::adapter::DfTableProviderAdapter;

View File

@@ -1,45 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::{Hash, Hasher};
use ahash::AHasher;
use datafusion_expr::LogicalPlan;
/// Calculate u64 hash for a [LogicalPlan].
pub fn hash_plan(plan: &LogicalPlan) -> u64 {
let mut hasher = AHasher::default();
plan.hash(&mut hasher);
hasher.finish()
}
#[cfg(test)]
mod test {
use datafusion_expr::LogicalPlanBuilder;
use super::*;
#[test]
fn hash_two_plan() {
let plan1 = LogicalPlanBuilder::empty(false).build().unwrap();
let plan2 = LogicalPlanBuilder::empty(false)
.explain(false, false)
.unwrap()
.build()
.unwrap();
assert_eq!(hash_plan(&plan1), hash_plan(&plan1));
assert_ne!(hash_plan(&plan1), hash_plan(&plan2));
}
}

View File

@@ -0,0 +1,206 @@
create table t_1 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_2 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_3 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_4 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_5 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_6 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_7 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_8 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_9 (ts timestamp time index, vin string, val int);
Affected Rows: 0
create table t_10 (ts timestamp time index, vin string, val int);
Affected Rows: 0
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
explain
select *
from
t_1
INNER JOIN t_2 ON t_2.ts = t_1.ts
AND t_2.vin = t_1.vin
INNER JOIN t_3 ON t_3.ts = t_2.ts
AND t_3.vin = t_2.vin
INNER JOIN t_4 ON t_4.ts = t_3.ts
AND t_4.vin = t_3.vin
INNER JOIN t_5 ON t_5.ts = t_4.ts
AND t_5.vin = t_4.vin
INNER JOIN t_6 ON t_6.ts = t_5.ts
AND t_6.vin = t_5.vin
INNER JOIN t_7 ON t_7.ts = t_6.ts
AND t_7.vin = t_6.vin
INNER JOIN t_8 ON t_8.ts = t_7.ts
AND t_8.vin = t_7.vin
INNER JOIN t_9 ON t_9.ts = t_8.ts
AND t_9.vin = t_8.vin
INNER JOIN t_10 ON t_10.ts = t_9.ts
AND t_10.vin = t_9.vin
where
t_1.vin is not null
order by t_1.ts desc
limit 1;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Limit: skip=0, fetch=1_|
|_|_Sort: t_1.ts DESC NULLS FIRST, fetch=1_|
|_|_Inner Join: t_9.ts = t_10.ts, t_9.vin = t_10.vin_|
|_|_Inner Join: t_8.ts = t_9.ts, t_8.vin = t_9.vin_|
|_|_Inner Join: t_7.ts = t_8.ts, t_7.vin = t_8.vin_|
|_|_Inner Join: t_6.ts = t_7.ts, t_6.vin = t_7.vin_|
|_|_Inner Join: t_5.ts = t_6.ts, t_5.vin = t_6.vin_|
|_|_Inner Join: t_4.ts = t_5.ts, t_4.vin = t_5.vin_|
|_|_Inner Join: t_3.ts = t_4.ts, t_3.vin = t_4.vin_|
|_|_Inner Join: t_2.ts = t_3.ts, t_2.vin = t_3.vin_|
|_|_Inner Join: t_1.ts = t_2.ts, t_1.vin = t_2.vin_|
|_|_Filter: t_1.vin IS NOT NULL_|
|_|_MergeScan [is_placeholder=false]_|
|_|_Filter: t_2.vin IS NOT NULL_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | GlobalLimitExec: skip=0, fetch=1_|
|_|_SortPreservingMergeExec: [ts@0 DESC], fetch=1_|
|_|_SortExec: fetch=1, expr=[ts@0 DESC]_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
drop table t_1;
Affected Rows: 1
drop table t_2;
Affected Rows: 1
drop table t_3;
Affected Rows: 1
drop table t_4;
Affected Rows: 1
drop table t_5;
Affected Rows: 1
drop table t_6;
Affected Rows: 1
drop table t_7;
Affected Rows: 1
drop table t_8;
Affected Rows: 1
drop table t_9;
Affected Rows: 1
drop table t_10;
Affected Rows: 1

View File

@@ -0,0 +1,53 @@
create table t_1 (ts timestamp time index, vin string, val int);
create table t_2 (ts timestamp time index, vin string, val int);
create table t_3 (ts timestamp time index, vin string, val int);
create table t_4 (ts timestamp time index, vin string, val int);
create table t_5 (ts timestamp time index, vin string, val int);
create table t_6 (ts timestamp time index, vin string, val int);
create table t_7 (ts timestamp time index, vin string, val int);
create table t_8 (ts timestamp time index, vin string, val int);
create table t_9 (ts timestamp time index, vin string, val int);
create table t_10 (ts timestamp time index, vin string, val int);
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (peer-.*) REDACTED
explain
select *
from
t_1
INNER JOIN t_2 ON t_2.ts = t_1.ts
AND t_2.vin = t_1.vin
INNER JOIN t_3 ON t_3.ts = t_2.ts
AND t_3.vin = t_2.vin
INNER JOIN t_4 ON t_4.ts = t_3.ts
AND t_4.vin = t_3.vin
INNER JOIN t_5 ON t_5.ts = t_4.ts
AND t_5.vin = t_4.vin
INNER JOIN t_6 ON t_6.ts = t_5.ts
AND t_6.vin = t_5.vin
INNER JOIN t_7 ON t_7.ts = t_6.ts
AND t_7.vin = t_6.vin
INNER JOIN t_8 ON t_8.ts = t_7.ts
AND t_8.vin = t_7.vin
INNER JOIN t_9 ON t_9.ts = t_8.ts
AND t_9.vin = t_8.vin
INNER JOIN t_10 ON t_10.ts = t_9.ts
AND t_10.vin = t_9.vin
where
t_1.vin is not null
order by t_1.ts desc
limit 1;
drop table t_1;
drop table t_2;
drop table t_3;
drop table t_4;
drop table t_5;
drop table t_6;
drop table t_7;
drop table t_8;
drop table t_9;
drop table t_10;

View File

@@ -56,7 +56,22 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i>
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NOT NULL ORDER BY 2;
Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" }))
+---+---+
| i | i |
+---+---+
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| | 1 |
| 1 | 2 |
| 2 | 2 |
| 3 | 2 |
| | 2 |
| 1 | 3 |
| 2 | 3 |
| 3 | 3 |
| | 3 |
+---+---+
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>1 ORDER BY 2;
@@ -75,7 +90,22 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE CASE WHEN i2.i IS NULL THEN False ELSE True END ORDER BY 2;
Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" }))
+---+---+
| i | i |
+---+---+
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| | 1 |
| 1 | 2 |
| 2 | 2 |
| 3 | 2 |
| | 2 |
| 1 | 3 |
| 2 | 3 |
| 3 | 3 |
| | 3 |
+---+---+
SELECT DISTINCT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NULL ORDER BY 1;