mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
cap output batch size
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -406,6 +406,8 @@ impl ExecutionPlan for AbsentExec {
|
||||
metric: baseline_metric,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec::new(),
|
||||
input_timestamps: Vec::new(),
|
||||
input_timestamp_offset: 0,
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: self.start,
|
||||
input_finished: false,
|
||||
@@ -448,6 +450,9 @@ pub struct AbsentStream {
|
||||
metric: BaselineMetrics,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec<Millisecond>,
|
||||
// Current input timestamps being processed incrementally.
|
||||
input_timestamps: Vec<Millisecond>,
|
||||
input_timestamp_offset: usize,
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: Millisecond,
|
||||
input_finished: bool,
|
||||
@@ -464,52 +469,53 @@ impl Stream for AbsentStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if !self.input_finished {
|
||||
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(Ok(batch)) => {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_input_batch(&batch) {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
// If we have enough data for a batch, output it
|
||||
if self.output_timestamps.len() >= self.batch_size {
|
||||
let timer = std::time::Instant::now();
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match result {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
|
||||
None => {
|
||||
self.input_finished = true;
|
||||
|
||||
let timer = std::time::Instant::now();
|
||||
// Process any remaining absent timestamps
|
||||
if let Err(e) = self.process_remaining_absent_timestamps() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
return Poll::Ready(result.transpose());
|
||||
}
|
||||
if self.has_pending_input_timestamps() {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_input_batch() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match self.flush_output_batch() {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
|
||||
if self.input_finished {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_remaining_absent_timestamps() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match self.flush_output_batch() {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => return Poll::Ready(None),
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
|
||||
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(Ok(batch)) => {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.buffer_input_timestamps(&batch) {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
}
|
||||
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
|
||||
None => {
|
||||
self.input_finished = true;
|
||||
}
|
||||
} else {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AbsentStream {
|
||||
fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
|
||||
// Extract timestamps from this batch
|
||||
fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
|
||||
let timestamp_array = batch.column(self.time_index_column_index);
|
||||
let milli_ts_array = arrow::compute::cast(
|
||||
timestamp_array,
|
||||
@@ -519,29 +525,52 @@ impl AbsentStream {
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
self.input_timestamps.clear();
|
||||
self.input_timestamps
|
||||
.extend_from_slice(timestamp_array.values());
|
||||
self.input_timestamp_offset = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_pending_input_timestamps(&self) -> bool {
|
||||
self.input_timestamp_offset < self.input_timestamps.len()
|
||||
}
|
||||
|
||||
fn process_input_batch(&mut self) -> DataFusionResult<()> {
|
||||
while self.input_timestamp_offset < self.input_timestamps.len() {
|
||||
let input_ts = self.input_timestamps[self.input_timestamp_offset];
|
||||
|
||||
// Process against current output cursor position
|
||||
for &input_ts in timestamp_array.values() {
|
||||
// Generate absent timestamps up to this input timestamp
|
||||
while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
|
||||
self.output_timestamps.push(self.output_ts_cursor);
|
||||
self.output_ts_cursor += self.step;
|
||||
|
||||
if self.output_timestamps.len() >= self.batch_size {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Skip the input timestamp if it matches our cursor
|
||||
if self.output_ts_cursor == input_ts {
|
||||
self.output_ts_cursor += self.step;
|
||||
}
|
||||
|
||||
self.input_timestamp_offset += 1;
|
||||
}
|
||||
|
||||
self.input_timestamps.clear();
|
||||
self.input_timestamp_offset = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends absent timestamps to the output buffer, walking the output cursor
/// forward by `step` until it passes `end`.
///
/// The walk pauses as soon as the output buffer holds at least `batch_size`
/// entries, so a single call never grows the buffer past one batch worth of
/// newly generated rows; calling it again resumes from the saved cursor.
/// Always returns `Ok(())`.
fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
    loop {
        let ts = self.output_ts_cursor;
        if ts > self.end {
            // Cursor has left the output range: nothing more to generate.
            break;
        }

        self.output_timestamps.push(ts);
        self.output_ts_cursor = ts + self.step;

        // Stop early once a full batch is buffered; the cursor keeps our place.
        if self.output_timestamps.len() >= self.batch_size {
            break;
        }
    }
    Ok(())
}
|
||||
@@ -551,11 +580,16 @@ impl AbsentStream {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let timestamps = if self.output_timestamps.len() <= self.batch_size {
|
||||
std::mem::take(&mut self.output_timestamps)
|
||||
} else {
|
||||
let remaining = self.output_timestamps.split_off(self.batch_size);
|
||||
std::mem::replace(&mut self.output_timestamps, remaining)
|
||||
};
|
||||
|
||||
let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
|
||||
let num_rows = self.output_timestamps.len();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(
|
||||
self.output_timestamps.clone(),
|
||||
)) as _);
|
||||
let num_rows = timestamps.len();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _);
|
||||
columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
|
||||
|
||||
for (_, value) in self.fake_labels.iter() {
|
||||
@@ -567,7 +601,6 @@ impl AbsentStream {
|
||||
|
||||
let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
|
||||
|
||||
self.output_timestamps.clear();
|
||||
Ok(Some(batch))
|
||||
}
|
||||
}
|
||||
@@ -580,7 +613,7 @@ mod tests {
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::catalog::memory::DataSourceExec;
|
||||
use datafusion::datasource::memory::MemorySourceConfig;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion::prelude::{SessionConfig, SessionContext};
|
||||
use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
|
||||
use super::*;
|
||||
@@ -725,4 +758,77 @@ mod tests {
|
||||
// Should output all timestamps in range: 0, 1000, 2000
|
||||
assert_eq!(output_timestamps, vec![0, 1000, 2000]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_absent_respects_session_batch_size_for_large_gap() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
|
||||
let value_array = Arc::new(Float64Array::from(vec![1.0]));
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
|
||||
|
||||
let memory_exec = DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
|
||||
));
|
||||
|
||||
let output_schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let absent_exec = AbsentExec {
|
||||
start: 0,
|
||||
end: 10,
|
||||
step: 1,
|
||||
time_index_column: "timestamp".to_string(),
|
||||
value_column: "value".to_string(),
|
||||
fake_labels: vec![],
|
||||
output_schema: output_schema.clone(),
|
||||
input: Arc::new(memory_exec),
|
||||
properties: Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
)),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
};
|
||||
|
||||
let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
|
||||
let task_ctx = session_ctx.task_ctx();
|
||||
let mut stream = absent_exec.execute(0, task_ctx).unwrap();
|
||||
|
||||
let mut batch_sizes = Vec::new();
|
||||
let mut output_timestamps = Vec::new();
|
||||
while let Some(batch_result) = stream.next().await {
|
||||
let batch = batch_result.unwrap();
|
||||
batch_sizes.push(batch.num_rows());
|
||||
|
||||
let ts_array = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
for i in 0..ts_array.len() {
|
||||
if !ts_array.is_null(i) {
|
||||
output_timestamps.push(ts_array.value(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(batch_sizes, vec![3, 3, 3, 1]);
|
||||
assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user