diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs index 7e5568bff4..cbe4c92550 100644 --- a/src/common/function/src/scalars/math/rate.rs +++ b/src/common/function/src/scalars/math/rate.rs @@ -82,7 +82,7 @@ mod tests { #[test] fn test_rate_function() { let rate = RateFunction; - assert_eq!("prom_rate", rate.name()); + assert_eq!("rate", rate.name()); assert_eq!( ConcreteDataType::float64_datatype(), rate.return_type(&[]).unwrap() diff --git a/src/datatypes/src/vectors/dictionary.rs b/src/datatypes/src/vectors/dictionary.rs index 60b06949ac..07994d13bd 100644 --- a/src/datatypes/src/vectors/dictionary.rs +++ b/src/datatypes/src/vectors/dictionary.rs @@ -404,7 +404,7 @@ mod tests { assert_eq!( casted.data_type(), ConcreteDataType::Dictionary(DictionaryType::new( - ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), ConcreteDataType::string_datatype(), )) ); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index ee56fd9d40..76c30a45ff 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -206,6 +206,14 @@ impl SeqScan { .build(), )); } + if self.properties.partitions[partition].is_empty() { + return Ok(Box::pin(RecordBatchStreamWrapper::new( + self.stream_ctx.input.mapper.output_schema(), + common_recordbatch::EmptyRecordBatchStream::new( + self.stream_ctx.input.mapper.output_schema(), + ), + ))); + } if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) { return self.scan_partition_by_series(metrics_set, partition); diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index a70bafc168..a3339a4e95 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -354,6 +354,9 @@ impl Stream for InstantManipulateStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let poll = match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { + if batch.num_rows() == 0 { + return Poll::Pending; + } let timer = std::time::Instant::now(); self.num_series.add(1); let result = Ok(batch).and_then(|batch| self.manipulate(batch)); diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index de0ebf1e66..36e1c10f42 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -319,7 +319,9 @@ impl Stream for SeriesDivideStream { let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?; let timer = std::time::Instant::now(); if let Some(next_batch) = next_batch { - self.buffer.push(next_batch); + if next_batch.num_rows() != 0 { + self.buffer.push(next_batch); + } continue; } else { // input stream is ended diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index fbcaa45fde..84b744596e 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -246,7 +246,7 @@ mod test { ]; assert_eq!(result, expected); - // assign 4 ranges to 5 partitions. Only 4 partitions are returned. + // assign 4 ranges to 5 partitions. let expected_partition_num = 5; let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); let expected = vec![ @@ -256,32 +256,31 @@ mod test { num_rows: 250, identifier: 4, }], + vec![PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }], vec![PartitionRange { start: Timestamp::new(10, TimeUnit::Second), end: Timestamp::new(20, TimeUnit::Second), num_rows: 200, identifier: 2, }], - vec![ - PartitionRange { - start: Timestamp::new(20, TimeUnit::Second), - end: Timestamp::new(30, TimeUnit::Second), - num_rows: 150, - identifier: 3, - }, - PartitionRange { - start: Timestamp::new(0, TimeUnit::Second), - end: Timestamp::new(10, TimeUnit::Second), - num_rows: 100, - identifier: 1, - }, - ], + vec![], + vec![PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }], ]; assert_eq!(result, expected); - // assign 0 ranges to 5 partitions. Only 1 partition is returned. + // assign 0 ranges to 5 partitions. Should return 5 empty ranges. let result = ParallelizeScan::assign_partition_range(vec![], 5); - assert_eq!(result.len(), 1); + assert_eq!(result.len(), 5); } #[test]