update tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-27 17:35:50 +08:00
parent aee72ab363
commit 0f521956bf
6 changed files with 32 additions and 20 deletions

View File

@@ -82,7 +82,7 @@ mod tests {
#[test]
fn test_rate_function() {
let rate = RateFunction;
assert_eq!("prom_rate", rate.name());
assert_eq!("rate", rate.name());
assert_eq!(
ConcreteDataType::float64_datatype(),
rate.return_type(&[]).unwrap()

View File

@@ -404,7 +404,7 @@ mod tests {
assert_eq!(
casted.data_type(),
ConcreteDataType::Dictionary(DictionaryType::new(
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::string_datatype(),
))
);

View File

@@ -206,6 +206,14 @@ impl SeqScan {
.build(),
));
}
if self.properties.partitions[partition].is_empty() {
return Ok(Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
common_recordbatch::EmptyRecordBatchStream::new(
self.stream_ctx.input.mapper.output_schema(),
),
)));
}
if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
return self.scan_partition_by_series(metrics_set, partition);

View File

@@ -354,6 +354,9 @@ impl Stream for InstantManipulateStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
if batch.num_rows() == 0 {
return Poll::Pending;
}
let timer = std::time::Instant::now();
self.num_series.add(1);
let result = Ok(batch).and_then(|batch| self.manipulate(batch));

View File

@@ -319,7 +319,9 @@ impl Stream for SeriesDivideStream {
let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
let timer = std::time::Instant::now();
if let Some(next_batch) = next_batch {
self.buffer.push(next_batch);
if next_batch.num_rows() != 0 {
self.buffer.push(next_batch);
}
continue;
} else {
// input stream is ended

View File

@@ -246,7 +246,7 @@ mod test {
];
assert_eq!(result, expected);
// assign 4 ranges to 5 partitions. Only 4 partitions are returned.
// assign 4 ranges to 5 partitions.
let expected_partition_num = 5;
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
@@ -256,32 +256,31 @@ mod test {
num_rows: 250,
identifier: 4,
}],
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
}],
vec![
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
vec![],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
}],
];
assert_eq!(result, expected);
// assign 0 ranges to 5 partitions. Only 1 partition is returned.
// assign 0 ranges to 5 partitions. Should return 5 empty ranges.
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
assert_eq!(result.len(), 5);
}
#[test]