isolate rule into a dedicated mod

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-03 11:17:53 +08:00
parent 0e7ccfadf7
commit 36a5023f67
8 changed files with 1534 additions and 217 deletions

View File

@@ -43,6 +43,7 @@ use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
@@ -442,6 +443,22 @@ impl MergeScanExec {
}
}
// Metric-engine scans can satisfy any hash distribution that includes `__tsid`.
// Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay
// co-located across MergeScan partitions.
if self
.arrow_schema
.column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& hash_exprs.iter().any(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
})
{
overlaps = hash_exprs.clone();
}
if overlaps.is_empty() {
return None;
}

View File

@@ -15,6 +15,7 @@
pub mod constant_term;
pub mod count_nest_aggr;
pub mod count_wildcard;
pub mod join_reduce;
pub mod parallelize_scan;
pub mod pass_distribution;
pub mod remove_duplicate;

File diff suppressed because it is too large Load Diff

View File

@@ -17,8 +17,10 @@ use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::utils::map_columns_before_projection;
use crate::dist_plan::MergeScanExec;
@@ -83,7 +85,9 @@ impl PassDistribution {
let mut new_children = Vec::with_capacity(children.len());
for (idx, child) in children.into_iter().enumerate() {
let child_req = match required.get(idx) {
Some(Distribution::UnspecifiedDistribution) => None,
Some(Distribution::UnspecifiedDistribution) => {
Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, &current_req)
}
None => current_req.clone(),
Some(req) => Some(req.clone()),
};
@@ -103,4 +107,199 @@ impl PassDistribution {
plan.with_new_children(new_children)
}
}
fn propagate_unspecified_child_requirement(
plan: &dyn ExecutionPlan,
idx: usize,
current_req: &Option<Distribution>,
) -> Option<Distribution> {
if idx != 0 {
return None;
}
let Some(Distribution::HashPartitioned(required_exprs)) = current_req else {
return None;
};
let projection = plan.as_any().downcast_ref::<ProjectionExec>()?;
let proj_exprs = projection
.expr()
.iter()
.map(|expr| (Arc::clone(&expr.expr), expr.alias.clone()))
.collect::<Vec<_>>();
let mapped = map_columns_before_projection(required_exprs, &proj_exprs);
(mapped.len() == required_exprs.len()).then_some(Distribution::HashPartitioned(mapped))
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::NullEquality;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion::physical_plan::{ExecutionPlanProperties, Partitioning};
use datafusion_expr::{JoinType, LogicalPlanBuilder};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use session::ReadPreference;
use session::context::QueryContext;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use super::*;
use crate::error::Result as QueryResult;
use crate::region_query::RegionQueryHandler;
struct NoopRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: QueryRequest,
) -> QueryResult<SendableRecordBatchStream> {
unreachable!("pass distribution tests should not execute remote queries")
}
}
#[test]
fn passes_hash_requirement_through_projection_to_merge_scan() {
let schema = test_schema();
let left_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));
let right_merge_scan = Arc::new(test_merge_scan_exec(schema.clone()));
let left_projection = Arc::new(
ProjectionExec::try_new(
vec![
ProjectionExpr::new(partition_column("greptime_value", 3), "greptime_value"),
ProjectionExpr::new(
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
DATA_SCHEMA_TSID_COLUMN_NAME,
),
ProjectionExpr::new(
partition_column("greptime_timestamp", 2),
"greptime_timestamp",
),
],
left_merge_scan,
)
.unwrap(),
) as Arc<dyn datafusion::physical_plan::ExecutionPlan>;
let join = Arc::new(
HashJoinExec::try_new(
left_projection,
right_merge_scan,
vec![
(
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
partition_column(DATA_SCHEMA_TSID_COLUMN_NAME, 1),
),
(
partition_column("greptime_timestamp", 2),
partition_column("greptime_timestamp", 2),
),
],
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNull,
false,
)
.unwrap(),
) as Arc<dyn datafusion::physical_plan::ExecutionPlan>;
let optimized = PassDistribution
.optimize(join, &ConfigOptions::default())
.unwrap();
let hash_join = optimized.as_any().downcast_ref::<HashJoinExec>().unwrap();
let left_projection = hash_join
.left()
.as_any()
.downcast_ref::<ProjectionExec>()
.unwrap();
let left_partitioning = left_projection.input().output_partitioning();
let right_partitioning = hash_join.right().output_partitioning();
let Partitioning::Hash(left_exprs, left_count) = left_partitioning else {
panic!("expected left merge scan hash partitioning");
};
let Partitioning::Hash(right_exprs, right_count) = right_partitioning else {
panic!("expected right merge scan hash partitioning");
};
assert_eq!(*left_count, 32);
assert_eq!(*right_count, 32);
assert_eq!(
column_names(left_exprs),
vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
);
assert_eq!(
column_names(right_exprs),
vec![DATA_SCHEMA_TSID_COLUMN_NAME, "greptime_timestamp"]
);
}
fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec {
let session_state = SessionStateBuilder::new().with_default_features().build();
let partition_cols = BTreeMap::from([(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([datafusion_common::Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)]),
)]);
let plan = LogicalPlanBuilder::empty(false).build().unwrap();
MergeScanExec::new(
&session_state,
TableName::new("greptime", "public", "test"),
vec![RegionId::new(1, 0), RegionId::new(1, 1)],
plan,
schema.as_ref(),
Arc::new(NoopRegionQueryHandler),
QueryContext::arc(),
32,
partition_cols,
)
.unwrap()
}
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new(DATA_SCHEMA_TSID_COLUMN_NAME, DataType::UInt64, false),
Field::new(
"greptime_timestamp",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("greptime_value", DataType::Float64, true),
]))
}
fn partition_column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(PhysicalColumn::new(name, index))
}
fn column_names(exprs: &[Arc<dyn PhysicalExpr>]) -> Vec<&str> {
exprs
.iter()
.map(|expr| {
expr.as_any()
.downcast_ref::<PhysicalColumn>()
.unwrap()
.name()
})
.collect()
}
}

View File

@@ -203,14 +203,6 @@ pub struct PromPlanner {
ctx: PromPlannerContext,
}
struct RepeatedBinaryPattern<'a> {
repeated: &'a PromExpr,
other: &'a PromExpr,
inner: &'a PromBinaryExpr,
inner_repeated_on_left: bool,
outer_inner_on_left: bool,
}
impl PromPlanner {
pub async fn stmt_to_plan(
table_provider: DfTableSourceProvider,
@@ -670,15 +662,6 @@ impl PromPlanner {
}
// both are columns. join them on time index
(None, None) => {
if !is_comparison_op
&& !Self::is_token_a_set_op(*op)
&& let Some(plan) = self
.try_plan_collapsed_repeated_binary_expr(query_engine_state, binary_expr)
.await?
{
return Ok(plan);
}
let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
let left_field_columns = self.ctx.field_columns.clone();
let left_time_index_column = self.ctx.time_index_column.clone();
@@ -799,123 +782,6 @@ impl PromPlanner {
}
}
async fn try_plan_collapsed_repeated_binary_expr(
&mut self,
query_engine_state: &QueryEngineState,
binary_expr: &PromBinaryExpr,
) -> Result<Option<LogicalPlan>> {
let PromBinaryExpr {
lhs,
rhs,
op,
modifier,
} = binary_expr;
let Some(pattern) = Self::find_repeated_binary_pattern(binary_expr) else {
return Ok(None);
};
if modifier.is_some()
|| pattern.inner.modifier.is_some()
|| Self::is_token_a_set_op(*op)
|| Self::is_token_a_set_op(pattern.inner.op)
|| Self::is_token_a_comparison_op(*op)
|| Self::is_token_a_comparison_op(pattern.inner.op)
|| lhs.value_type() != ValueType::Vector
|| rhs.value_type() != ValueType::Vector
|| pattern.repeated.value_type() != ValueType::Vector
|| pattern.other.value_type() != ValueType::Vector
{
return Ok(None);
}
let repeated_input = self
.prom_expr_to_plan(pattern.repeated, query_engine_state)
.await?;
let repeated_field_columns = self.ctx.field_columns.clone();
let repeated_time_index_column = self.ctx.time_index_column.clone();
let mut repeated_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let repeated_context = self.ctx.clone();
let other_input = self
.prom_expr_to_plan(pattern.other, query_engine_state)
.await?;
let other_field_columns = self.ctx.field_columns.clone();
let other_time_index_column = self.ctx.time_index_column.clone();
let mut other_table_ref = self
.table_ref()
.unwrap_or_else(|_| TableReference::bare(""));
let other_context = self.ctx.clone();
if repeated_table_ref == other_table_ref {
repeated_table_ref = TableReference::bare("lhs");
other_table_ref = TableReference::bare("rhs");
if self.ctx.tag_columns.is_empty() {
self.ctx = repeated_context.clone();
self.ctx.table_name = Some("lhs".to_string());
} else {
self.ctx.table_name = Some("rhs".to_string());
}
}
let field_columns = repeated_field_columns
.iter()
.zip(other_field_columns.iter())
.collect::<Vec<_>>();
// The collapsed fast path must preserve the same zipped-field semantics as the
// original two-step plan: only the shared prefix of value columns participates.
self.ctx.field_columns = field_columns
.iter()
.map(|(repeated_col_name, _)| (*repeated_col_name).clone())
.collect();
let mut field_columns = field_columns.into_iter();
let join_plan = self.join_on_non_field_columns(
repeated_input,
other_input,
repeated_table_ref.clone(),
other_table_ref.clone(),
repeated_time_index_column,
other_time_index_column,
repeated_context.tag_columns.is_empty() || other_context.tag_columns.is_empty(),
true,
&None,
)?;
let join_plan_schema = join_plan.schema().clone();
let field_expr_builder = |_: &String| {
let (repeated_col_name, other_col_name) = field_columns.next().unwrap();
let repeated_col = join_plan_schema
.qualified_field_with_name(Some(&repeated_table_ref), repeated_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let other_col = join_plan_schema
.qualified_field_with_name(Some(&other_table_ref), other_col_name)
.context(DataFusionPlanningSnafu)?
.into();
let repeated_expr = DfExpr::Column(repeated_col);
let other_expr = DfExpr::Column(other_col);
let inner_expr_builder = Self::prom_token_to_binary_expr_builder(pattern.inner.op)?;
let inner_expr = if pattern.inner_repeated_on_left {
inner_expr_builder(repeated_expr.clone(), other_expr)?
} else {
inner_expr_builder(other_expr, repeated_expr.clone())?
};
let outer_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
if pattern.outer_inner_on_left {
outer_expr_builder(inner_expr, repeated_expr)
} else {
outer_expr_builder(repeated_expr, inner_expr)
}
};
self.projection_for_each_field_column(join_plan, field_expr_builder)
.map(Some)
}
fn project_binary_join_side(
&mut self,
input: LogicalPlan,
@@ -3567,69 +3433,6 @@ impl PromPlanner {
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
}
fn prom_expr_eq(left: &PromExpr, right: &PromExpr) -> bool {
Self::strip_paren_expr(left).to_string() == Self::strip_paren_expr(right).to_string()
}
fn strip_paren_expr(mut expr: &PromExpr) -> &PromExpr {
while let PromExpr::Paren(paren) = expr {
expr = paren.expr.as_ref();
}
expr
}
fn find_repeated_binary_pattern<'a>(
binary_expr: &'a PromBinaryExpr,
) -> Option<RepeatedBinaryPattern<'a>> {
let PromBinaryExpr { lhs, rhs, .. } = binary_expr;
let lhs = Self::strip_paren_expr(lhs.as_ref());
let rhs = Self::strip_paren_expr(rhs.as_ref());
if let PromExpr::Binary(inner) = lhs {
if Self::prom_expr_eq(inner.lhs.as_ref(), rhs) {
return Some(RepeatedBinaryPattern {
repeated: rhs,
other: inner.rhs.as_ref(),
inner,
inner_repeated_on_left: true,
outer_inner_on_left: true,
});
}
if Self::prom_expr_eq(inner.rhs.as_ref(), rhs) {
return Some(RepeatedBinaryPattern {
repeated: rhs,
other: inner.lhs.as_ref(),
inner,
inner_repeated_on_left: false,
outer_inner_on_left: true,
});
}
}
if let PromExpr::Binary(inner) = rhs {
if Self::prom_expr_eq(inner.lhs.as_ref(), lhs) {
return Some(RepeatedBinaryPattern {
repeated: lhs,
other: inner.rhs.as_ref(),
inner,
inner_repeated_on_left: true,
outer_inner_on_left: false,
});
}
if Self::prom_expr_eq(inner.rhs.as_ref(), lhs) {
return Some(RepeatedBinaryPattern {
repeated: lhs,
other: inner.lhs.as_ref(),
inner,
inner_repeated_on_left: false,
outer_inner_on_left: false,
});
}
}
None
}
/// Build a inner join on time index column and tag columns to concat two logical plans.
/// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
#[allow(clippy::too_many_arguments)]
@@ -5102,7 +4905,7 @@ mod test {
}
#[tokio::test]
async fn repeated_tsid_binary_operand_reuses_single_join() {
async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
let prom_expr =
parser::parse("((some_metric - some_alt_metric) / some_metric) * 100").unwrap();
let eval_stmt = EvalStmt {
@@ -5133,12 +4936,12 @@ mod test {
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}");
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
}
#[tokio::test]
async fn repeated_tsid_binary_operand_uses_shorter_field_side() {
async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
let prom_expr =
parser::parse("((two_field_metric - one_field_metric) / one_field_metric) * 100")
.unwrap();
@@ -5190,6 +4993,9 @@ mod test {
})
.count();
assert_eq!(value_columns, 1, "{field_names:?}");
let plan_str = plan.display_indent_schema().to_string();
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
}
#[tokio::test]

View File

@@ -62,6 +62,7 @@ use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_nest_aggr::CountNestAggrRule;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::join_reduce::JoinReduceRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
@@ -173,6 +174,10 @@ impl QueryEngineState {
analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
let mut optimizer = Optimizer::new();
let join_reduce_insert_at = optimizer.rules.len().saturating_sub(1);
optimizer
.rules
.insert(join_reduce_insert_at, Arc::new(JoinReduceRule));
optimizer.rules.push(Arc::new(ScanHintRule));
// add physical optimizer

View File

@@ -616,14 +616,14 @@ Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
+-------+---------------------+---------------------------------------------------------------------------------+
| job | ts | cache_hit.greptime_value / cache_miss.greptime_value + cache_hit.greptime_value |
+-------+---------------------+---------------------------------------------------------------------------------+
| read | 1970-01-01T00:00:03 | 0.5 |
| read | 1970-01-01T00:00:04 | 0.75 |
| write | 1970-01-01T00:00:03 | 0.5 |
| write | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+---------------------+---------------------------------------------------------------------------------+
+-------+---------------------+-------------------------------------------------------------------------------+
| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value |
+-------+---------------------+-------------------------------------------------------------------------------+
| read | 1970-01-01T00:00:03 | 0.5 |
| read | 1970-01-01T00:00:04 | 0.75 |
| write | 1970-01-01T00:00:03 | 0.5 |
| write | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+---------------------+-------------------------------------------------------------------------------+
drop table cache_hit;
@@ -672,14 +672,14 @@ Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | cache_hit_with_null_label.greptime_value / cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
| read | | 1970-01-01T00:00:03 | 0.5 |
| read | | 1970-01-01T00:00:04 | 0.75 |
| write | | 1970-01-01T00:00:03 | 0.5 |
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);

View File

@@ -123,6 +123,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| MergeScan [is_placeholder=false, remote_input=[_|
|_| PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
|_|_PromSeriesDivide: tags=["k"]_|
@@ -154,6 +155,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
@@ -267,6 +269,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| MergeScan [is_placeholder=false, remote_input=[_|
|_| Projection: test.i AS series, test.k, test.j_|
|_|_PromInstantManipulate: range=[0..10000], lookback=[300000], interval=[5000], time index=[j]_|
@@ -299,6 +302,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test AS series;
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after extract_leaf_expressions_| SAME TEXT AS ABOVE_|
| logical_plan after push_down_leaf_projections_| SAME TEXT AS ABOVE_|
| logical_plan after JoinReduceRule_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|