feat: output multiple partition in MergeScanExec (#4227)

* feat: output multiple partition in MergeScanExec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix range manipulate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-06-28 21:45:22 +08:00
committed by GitHub
parent ef935a1de6
commit a7aa556763
14 changed files with 77 additions and 37 deletions

View File

@@ -301,6 +301,12 @@ impl ExecutionPlan for RangeManipulateExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
let exec_input = children[0].clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(self.output_schema.clone()),
exec_input.properties().partitioning.clone(),
exec_input.properties().execution_mode,
);
Ok(Arc::new(Self {
start: self.start,
end: self.end,
@@ -312,7 +318,7 @@ impl ExecutionPlan for RangeManipulateExec {
output_schema: self.output_schema.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
properties: self.properties.clone(),
properties,
}))
}

View File

@@ -134,6 +134,7 @@ pub struct MergeScanExec {
/// Metrics from sub stages
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
}
impl std::fmt::Debug for MergeScanExec {
@@ -154,11 +155,12 @@ impl MergeScanExec {
arrow_schema: &ArrowSchema,
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
target_partition: usize,
) -> Result<Self> {
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema_without_metadata.clone()),
Partitioning::UnknownPartitioning(1),
Partitioning::UnknownPartitioning(target_partition),
ExecutionMode::Bounded,
);
let schema_without_metadata =
@@ -174,10 +176,15 @@ impl MergeScanExec {
sub_stage_metrics: Arc::default(),
properties,
query_ctx,
target_partition,
})
}
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
pub fn to_stream(
&self,
context: Arc<TaskContext>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
@@ -189,6 +196,7 @@ impl MergeScanExec {
let current_schema = self.query_ctx.current_schema().to_string();
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;
let sub_sgate_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
@@ -198,7 +206,12 @@ impl MergeScanExec {
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
for region_id in regions {
for region_id in regions
.iter()
.skip(partition)
.step_by(target_partition)
.copied()
{
let request = QueryRequest {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
@@ -325,11 +338,11 @@ impl ExecutionPlan for MergeScanExec {
fn execute(
&self,
_partition: usize,
partition: usize,
context: Arc<TaskContext>,
) -> Result<DfSendableRecordBatchStream> {
Ok(Box::pin(DfRecordBatchStreamAdapter::new(
self.to_stream(context)?,
self.to_stream(context, partition)?,
)))
}

View File

@@ -107,6 +107,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
&schema,
self.region_query_handler.clone(),
query_ctx,
session_state.config().target_partitions(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}

View File

@@ -121,13 +121,11 @@ limit 1;
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED

View File

@@ -27,7 +27,8 @@ explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY hos
+-+-+
| logical_plan_| Sort: demo.host ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[false]_|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -21,7 +21,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
@@ -68,7 +67,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -94,10 +94,8 @@ order by t.i desc;
|_|_CoalescePartitionsExec_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: i@0 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_|

View File

@@ -259,9 +259,9 @@ EXPLAIN SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHER
| physical_plan | CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[false as cond]_|
|_|_CrossJoinExec_|
|_|_CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_|

View File

@@ -81,11 +81,9 @@ EXPLAIN SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY
| | AggregateExec: mode=Partial, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | UnionExec |
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+

View File

@@ -46,15 +46,16 @@ INSERT INTO my_table VALUES
Affected Rows: 8
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
@@ -65,14 +66,15 @@ DELETE FROM my_table WHERE a < 150;
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
@@ -83,14 +85,15 @@ DELETE FROM my_table WHERE a < 2200 AND a > 1500;
Affected Rows: 2
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
@@ -148,15 +151,16 @@ INSERT INTO my_table VALUES
Affected Rows: 8
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |

View File

@@ -26,14 +26,17 @@ INSERT INTO my_table VALUES
(2200, 'g', 7),
(2400, 'h', 8);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 150;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 2200 AND a > 1500;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 2500;
@@ -66,6 +69,7 @@ INSERT INTO my_table VALUES
(2200, 'g', 7),
(2400, 'h', 8);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;
DROP TABLE my_table;

View File

@@ -58,6 +58,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| logical_plan_| RangeSelect: range_exprs=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts |
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts |
|_|_CoalescePartitionsExec_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
@@ -72,6 +73,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED

View File

@@ -21,7 +21,8 @@ TQL ANALYZE (0, 10, '5s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
@@ -50,7 +51,8 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
@@ -78,7 +80,8 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
@@ -108,7 +111,8 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED

View File

@@ -22,8 +22,9 @@ TQL EXPLAIN (0, 10, '5s') test;
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] |
| | MergeScanExec: REDACTED
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -43,8 +44,9 @@ TQL EXPLAIN (0, 10, '1s', '2s') test;
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] |
| | MergeScanExec: REDACTED
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------+
@@ -63,8 +65,9 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] |
| | MergeScanExec: REDACTED
| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] |
| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -151,13 +154,20 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_|
| physical_plan after join_selection_| SAME TEXT AS ABOVE_|
| physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceDistribution_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_CoalescePartitionsExec_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_|
| physical_plan after EnforceSorting_| OutputRequirementExec_|
|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_|
@@ -166,7 +176,8 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_|
@@ -176,13 +187,15 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivideExec: tags=["k"]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
| physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] |
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+