fix tsid reuse: restrict the constant-column reuse fast path to the `__tsid` column only, so additional field and tag columns are materialized per-row again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-14 16:13:28 +08:00
parent fe2a9f91d4
commit 97bd151e54

View File

@@ -241,8 +241,7 @@ impl InstantManipulate {
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let reuse_all_non_sample_columns =
matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
Arc::new(InstantManipulateExec {
start: self.start,
@@ -251,7 +250,7 @@ impl InstantManipulate {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_all_non_sample_columns,
reuse_tsid_column,
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
@@ -313,7 +312,7 @@ pub struct InstantManipulateExec {
interval: Millisecond,
time_index_column: String,
field_column: Option<String>,
reuse_all_non_sample_columns: bool,
reuse_tsid_column: bool,
input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
@@ -357,7 +356,7 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index_column: self.time_index_column.clone(),
field_column: self.field_column.clone(),
reuse_all_non_sample_columns: self.reuse_all_non_sample_columns,
reuse_tsid_column: self.reuse_tsid_column,
input: children[0].clone(),
metric: self.metric.clone(),
}))
@@ -388,10 +387,10 @@ impl ExecutionPlan for InstantManipulateExec {
.as_ref()
.and_then(|name| schema.column_with_name(name))
.map(|x| x.0);
let reuse_all_non_sample_columns = self.reuse_all_non_sample_columns
&& schema
.column_with_name("__tsid")
.is_some_and(|(_, field)| field.data_type() == &DataType::UInt64);
let tsid_index = schema
.column_with_name("__tsid")
.filter(|(_, field)| field.data_type() == &DataType::UInt64)
.map(|(index, _)| index);
Ok(Box::pin(InstantManipulateStream {
start: self.start,
end: self.end,
@@ -399,7 +398,8 @@ impl ExecutionPlan for InstantManipulateExec {
interval: self.interval,
time_index,
field_index,
reuse_all_non_sample_columns,
tsid_index,
reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(),
schema,
input,
metric: baseline_metric,
@@ -465,7 +465,8 @@ pub struct InstantManipulateStream {
// Column index of TIME INDEX column's position in schema
time_index: usize,
field_index: Option<usize>,
reuse_all_non_sample_columns: bool,
tsid_index: Option<usize>,
reuse_tsid_column: bool,
schema: SchemaRef,
input: SendableRecordBatchStream,
@@ -650,7 +651,7 @@ impl InstantManipulateStream {
continue;
}
if self.reuse_all_non_sample_columns && Some(index) != self.field_index {
if self.reuse_tsid_column && self.tsid_index == Some(index) {
arrays.push(reuse_constant_column(array, output_len)?);
continue;
}
@@ -712,7 +713,7 @@ mod test {
interval,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_all_non_sample_columns: false,
reuse_tsid_column: false,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -860,7 +861,7 @@ mod test {
}
#[tokio::test]
async fn tsid_fast_path_reuses_non_sample_columns_when_output_grows() {
async fn tsid_fast_path_reuses_tsid_column_when_output_grows() {
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
@@ -893,7 +894,7 @@ mod test {
interval: 500,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_all_non_sample_columns: true,
reuse_tsid_column: true,
input,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -918,6 +919,67 @@ mod test {
);
}
#[tokio::test]
// Regression test for the narrowed fast path: with `reuse_tsid_column: true`,
// only the `__tsid` column may be filled via the constant-reuse shortcut.
// Extra field columns (`value_2`) and tag columns (`host`) must still be
// materialized row-by-row through the normal take path, so their values track
// the sample each output timestamp resolves to.
async fn tsid_fast_path_still_takes_additional_field_columns() {
// Schema: time index + primary value + an additional field column + a string
// tag + the non-null UInt64 `__tsid` column required by the fast path.
let schema = Arc::new(Schema::new(vec![
Field::new(
TIME_INDEX_COLUMN,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new("value", DataType::Float64, true),
Field::new("value_2", DataType::Float64, true),
Field::new("host", DataType::Utf8, true),
Field::new("__tsid", DataType::UInt64, false),
]));
// Two input samples at t=0ms and t=1000ms, sharing one tsid (42) and one host,
// but with distinct `value`/`value_2` so per-row take vs. constant reuse is
// distinguishable in the output.
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
Arc::new(Float64Array::from(vec![10.0, 20.0])),
Arc::new(datafusion::arrow::array::StringArray::from(vec![
"foo", "foo",
])),
Arc::new(UInt64Array::from(vec![42, 42])),
],
)
.unwrap();
let input = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
)));
// Evaluate instants every 500ms over [0, 1500] with a 1s lookback, and enable
// the tsid fast path under test.
let normalize_exec = Arc::new(InstantManipulateExec {
start: 0,
end: 1_500,
lookback_delta: 1_000,
interval: 500,
time_index_column: TIME_INDEX_COLUMN.to_string(),
field_column: Some("value".to_string()),
reuse_tsid_column: true,
input,
metric: ExecutionPlanMetricsSet::new(),
});
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
// Four output instants (0, 500, 1000, 1500ms). `value_2` must follow the
// resolved sample (10.0 for rows matched to t=0, 20.0 for t=1000) — it would
// incorrectly stay constant if it were routed through the reuse shortcut.
assert_eq!(
result_literal,
"+-------------------------+-------+---------+------+--------+\
\n| timestamp | value | value_2 | host | __tsid |\
\n+-------------------------+-------+---------+------+--------+\
\n| 1970-01-01T00:00:00 | 1.0 | 10.0 | foo | 42 |\
\n| 1970-01-01T00:00:00.500 | 1.0 | 10.0 | foo | 42 |\
\n| 1970-01-01T00:00:01 | 2.0 | 20.0 | foo | 42 |\
\n| 1970-01-01T00:00:01.500 | 2.0 | 20.0 | foo | 42 |\
\n+-------------------------+-------+---------+------+--------+"
);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(