From ddcee052b29ed470b8fbfdea6967df7308abe687 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 12 Jun 2023 11:45:43 +0800 Subject: [PATCH] fix: order by optimization (#1748) * add some debug log * fix: use lazy parquet reader in MitoTable::scan_to_stream to avoid IO in plan stage * fix: unit tests * fix: order-by optimization * add some tests * fix: move metric names to metrics.rs * fix: some cr comments --- src/common/time/src/range.rs | 34 +++++++++++ src/common/time/src/timestamp.rs | 9 +++ .../benches/memtable/util/bench_context.rs | 2 +- src/storage/src/chunk.rs | 4 +- src/storage/src/compaction/writer.rs | 2 +- src/storage/src/file_purger.rs | 2 +- src/storage/src/flush.rs | 2 +- src/storage/src/memtable.rs | 2 +- src/storage/src/memtable/btree.rs | 4 +- src/storage/src/memtable/inserter.rs | 2 +- src/storage/src/memtable/tests.rs | 22 +++---- src/storage/src/metrics.rs | 4 ++ src/storage/src/read/windowed.rs | 13 +++- src/storage/src/region/tests.rs | 18 ++++++ src/storage/src/region/tests/flush.rs | 59 ++++++++++++++++++- src/storage/src/sst.rs | 35 +++++++++-- src/storage/src/sst/parquet.rs | 10 ++-- src/storage/src/window_infer.rs | 30 +++++----- tests-integration/src/tests/instance_test.rs | 24 +++----- 19 files changed, 217 insertions(+), 61 deletions(-) diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index 8e007896b0..ce61fea10e 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Debug, Display, Formatter}; + use crate::timestamp::TimeUnit; use crate::timestamp_millis::TimestampMillis; use crate::Timestamp; @@ -193,6 +195,38 @@ impl GenericRange { pub type TimestampRange = GenericRange; +impl Display for TimestampRange { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let s = match (&self.start, &self.end) { + (Some(start), Some(end)) => { + format!( + "TimestampRange{{[{}{},{}{})}}", + start.value(), + start.unit().short_name(), + end.value(), + end.unit().short_name() + ) + } + (Some(start), None) => { + format!( + "TimestampRange{{[{}{},#)}}", + start.value(), + start.unit().short_name() + ) + } + (None, Some(end)) => { + format!( + "TimestampRange{{[#,{}{})}}", + end.value(), + end.unit().short_name() + ) + } + (None, None) => "TimestampRange{{[#,#)}}".to_string(), + }; + f.write_str(&s) + } +} + impl TimestampRange { /// Create a TimestampRange with optional inclusive end timestamp. 
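// Standalone illustration (not part of the patch) of the string shape the
// `Display for TimestampRange` impl added above produces; the values and the
// `main` wrapper are made up for the example, only the format strings mirror
// the impl.
fn main() {
    let (start, end, unit) = (1000_i64, 2000_i64, "ms");

    // Mirrors the (Some(start), Some(end)) arm: `{{` / `}}` escape to literal braces.
    let bounded = format!("TimestampRange{{[{}{},{}{})}}", start, unit, end, unit);
    assert_eq!(bounded, "TimestampRange{[1000ms,2000ms)}");

    // Mirrors the (Some(start), None) arm: an unbounded end is printed as `#`.
    let half_open = format!("TimestampRange{{[{}{},#)}}", start, unit);
    assert_eq!(half_open, "TimestampRange{[1000ms,#)}");

    // Note: the (None, None) arm uses a plain string literal, so its doubled braces
    // are emitted verbatim rather than being collapsed by `format!`.
}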
/// If end timestamp is present and is less than start timestamp, this method will return diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 0678fa5a35..0d4b546b75 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -336,6 +336,15 @@ impl TimeUnit { TimeUnit::Nanosecond => 1, } } + + pub(crate) fn short_name(&self) -> &'static str { + match self { + TimeUnit::Second => "s", + TimeUnit::Millisecond => "ms", + TimeUnit::Microsecond => "us", + TimeUnit::Nanosecond => "ns", + } + } } impl PartialOrd for Timestamp { diff --git a/src/storage/benches/memtable/util/bench_context.rs b/src/storage/benches/memtable/util/bench_context.rs index 9ecf1dd871..866269aa92 100644 --- a/src/storage/benches/memtable/util/bench_context.rs +++ b/src/storage/benches/memtable/util/bench_context.rs @@ -41,7 +41,7 @@ impl BenchContext { batch_size, ..Default::default() }; - let iter = self.memtable.iter(&iter_ctx).unwrap(); + let iter = self.memtable.iter(iter_ctx).unwrap(); for batch in iter { batch.unwrap(); read_count += batch_size; diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index b98a5e264f..a4370efdf2 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -220,7 +220,9 @@ impl ChunkReaderBuilder { .batch_size(self.iter_ctx.batch_size); for mem in &self.memtables { - let iter = mem.iter(&self.iter_ctx)?; + let mut iter_ctx = self.iter_ctx.clone(); + iter_ctx.time_range = Some(*time_range); + let iter = mem.iter(iter_ctx)?; reader_builder = reader_builder.push_batch_iter(iter); } diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 650ab28ccb..9bd6f87a34 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -217,7 +217,7 @@ mod tests { seq.fetch_add(1, Ordering::Relaxed); } - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let file_path = sst_file_id.as_parquet(); let writer = ParquetWriter::new(&file_path, Source::Iter(iter), object_store.clone()); diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs index b7bc1e04bc..09373ac69e 100644 --- a/src/storage/src/file_purger.rs +++ b/src/storage/src/file_purger.rs @@ -143,7 +143,7 @@ mod tests { &[(Some(1), Some(1)), (Some(2), Some(2))], ); - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let sst_path = "table1"; let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone())); let sst_info = layer diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 2b91774c79..9dd9361914 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -267,7 +267,7 @@ impl FlushJob { let file_id = FileId::random(); // TODO(hl): Check if random file name already exists in meta. - let iter = m.iter(&iter_ctx)?; + let iter = m.iter(iter_ctx.clone())?; let sst_layer = self.sst_layer.clone(); let write_options = WriteOptions { sst_write_buffer_size: self.engine_config.sst_write_buffer_size, diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 4bfe2ba814..9a4aea5dd9 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -73,7 +73,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { fn write(&self, kvs: &KeyValues) -> Result<()>; /// Iterates the memtable. 
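// The `Memtable::iter` signature below now takes `IterContext` by value, so call
// sites that scan one inferred time window at a time (as ChunkReaderBuilder does
// above) set `time_range` on a per-window context, and sites that reuse a context
// clone it, as the flush path above does. A minimal sketch of that calling
// pattern, assuming the paths `crate::memtable` and `common_time::range`;
// `scan_windows` itself is illustrative, not a function in the patch.
use common_time::range::TimestampRange;

use crate::error::Result;
use crate::memtable::{IterContext, Memtable};

fn scan_windows(
    memtable: &dyn Memtable,
    windows: &[TimestampRange],
    batch_size: usize,
) -> Result<()> {
    let base_ctx = IterContext {
        batch_size,
        ..Default::default()
    };
    for window in windows {
        // One context per inferred window, mirroring ChunkReaderBuilder.
        let mut iter_ctx = base_ctx.clone();
        iter_ctx.time_range = Some(*window);
        // `iter` now consumes the context, hence the clone above.
        let iter = memtable.iter(iter_ctx)?;
        for batch in iter {
            let _batch = batch?;
        }
    }
    Ok(())
}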
- fn iter(&self, ctx: &IterContext) -> Result; + fn iter(&self, ctx: IterContext) -> Result; /// Returns the estimated bytes allocated by this memtable from heap. Result /// of this method may be larger than the estimated based on [`num_rows`] because diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index e6f1ac8fb5..0400a2e3a1 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -145,10 +145,10 @@ impl Memtable for BTreeMemtable { Ok(()) } - fn iter(&self, ctx: &IterContext) -> Result { + fn iter(&self, ctx: IterContext) -> Result { assert!(ctx.batch_size > 0); - let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone())?; + let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone())?; Ok(Box::new(iter)) } diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 87cd702754..5fa263e1b5 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -179,7 +179,7 @@ mod tests { max_ts: i64, min_ts: i64, ) { - let iter = mem.iter(&IterContext::default()).unwrap(); + let iter = mem.iter(IterContext::default()).unwrap(); assert_eq!(min_ts, mem.stats().min_timestamp.value()); assert_eq!(max_ts, mem.stats().max_timestamp.value()); diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 557a700ff5..6c1b5628d5 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -196,7 +196,7 @@ struct TestContext { fn write_iter_memtable_case(ctx: &TestContext) { // Test iterating an empty memtable. - let mut iter = ctx.memtable.iter(&IterContext::default()).unwrap(); + let mut iter = ctx.memtable.iter(IterContext::default()).unwrap(); assert!(iter.next().is_none()); // Poll the empty iterator again. 
assert!(iter.next().is_none()); @@ -234,7 +234,7 @@ fn write_iter_memtable_case(ctx: &TestContext) { batch_size, ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap(); assert_eq!( ctx.schema.user_schema(), iter.schema().projected_user_schema() @@ -322,7 +322,7 @@ fn test_iter_batch_size() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap(); check_iter_batch_size(&mut *iter, total, batch_size); } }); @@ -355,7 +355,7 @@ fn test_duplicate_key_across_batch() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap(); check_iter_content( &mut *iter, &[1000, 1001, 2000, 2001], // keys @@ -391,7 +391,7 @@ fn test_duplicate_key_in_batch() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx.clone()).unwrap(); check_iter_content( &mut *iter, &[1000, 1001, 2001], // keys @@ -440,7 +440,7 @@ fn test_sequence_visibility() { time_range: None, }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); check_iter_content( &mut *iter, &[], // keys @@ -459,7 +459,7 @@ fn test_sequence_visibility() { time_range: None, }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); check_iter_content( &mut *iter, &[1000], // keys @@ -478,7 +478,7 @@ fn test_sequence_visibility() { time_range: None, }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); check_iter_content( &mut *iter, &[1000], // keys @@ -507,7 +507,7 @@ fn test_iter_after_none() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); assert!(iter.next().is_some()); assert!(iter.next().is_none()); assert!(iter.next().is_none()); @@ -538,7 +538,7 @@ fn test_filter_memtable() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(5, batch.columns.len()); assert_eq!( @@ -574,7 +574,7 @@ fn test_memtable_projection() { ..Default::default() }; - let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); let batch = iter.next().unwrap().unwrap(); assert!(iter.next().is_none()); diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index ce70a14b9a..9cc4b37a04 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -38,3 +38,7 @@ pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes"; pub const MEMTABLE_WRITE_ELAPSED: &str = "storage.memtable.write.elapsed"; /// Elapsed time of preprocessing write batch. 
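// The metric constants just below and the read/windowed.rs hunk that follows add
// two window-scan measurements (scan latency via `timer!`, rows per window via
// `histogram!`) and fix the empty-window case: instead of `Batch::new(vec![])`,
// the reader now returns one zero-length vector per column so downstream
// projection and merging still see a batch with the expected schema. A sketch of
// that construction, assuming `StoreSchemaRef` is the type `schema_to_read()`
// returns; `empty_batch` itself is illustrative, not a helper added by the patch.
use crate::read::Batch;
use crate::schema::StoreSchemaRef;

fn empty_batch(store_schema: &StoreSchemaRef) -> Batch {
    let columns = store_schema
        .columns()
        .iter()
        // A zero-length vector of the column's own type keeps the column count
        // and data types consistent with non-empty batches.
        .map(|c| c.desc.data_type.create_mutable_vector(0).to_vector())
        .collect();
    Batch::new(columns)
}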
pub const PREPROCESS_ELAPSED: &str = "storage.write.preprocess.elapsed"; +/// Elapsed time for windowed scan +pub const WINDOW_SCAN_ELAPSED: &str = "query.scan.window_scan.elapsed"; +/// Rows per window during window scan +pub const WINDOW_SCAN_ROWS_PER_WINDOW: &str = "query.scan.window_scan.window_row_size"; diff --git a/src/storage/src/read/windowed.rs b/src/storage/src/read/windowed.rs index 44290105bf..83e8716e7b 100644 --- a/src/storage/src/read/windowed.rs +++ b/src/storage/src/read/windowed.rs @@ -16,8 +16,10 @@ use arrow::compute::SortOptions; use arrow::row::{RowConverter, SortField}; use arrow_array::{Array, ArrayRef}; use common_recordbatch::OrderOption; +use common_telemetry::timer; use datatypes::data_type::DataType; use datatypes::vectors::Helper; +use metrics::histogram; use snafu::ResultExt; use crate::error::{self, Result}; @@ -60,6 +62,7 @@ where R: BatchReader, { async fn next_batch(&mut self) -> Result> { + let _window_scan_elapsed = timer!(crate::metrics::WINDOW_SCAN_ELAPSED); let Some(mut reader) = self.readers.pop() else { return Ok(None); }; let store_schema = self.schema.schema_to_read(); @@ -75,7 +78,12 @@ where } let Some(num_columns) = batches.get(0).map(|b| b.len()) else { - return Ok(Some(Batch::new(vec![]))); + // the reader does not yield data, a batch of empty vectors must be returned instead of + // an empty batch without any column. + let empty_columns = store_schema.columns().iter().map(|s| { + s.desc.data_type.create_mutable_vector(0).to_vector() + }).collect(); + return Ok(Some(Batch::new(empty_columns))); }; let mut vectors_in_batch = Vec::with_capacity(num_columns); @@ -85,6 +93,9 @@ where vectors_in_batch .push(arrow::compute::concat(&columns).context(error::ConvertColumnsToRowsSnafu)?); } + if let Some(v) = vectors_in_batch.get(0) { + histogram!(crate::metrics::WINDOW_SCAN_ROWS_PER_WINDOW, v.len() as f64); + } let sorted = sort_by_rows(&self.schema, vectors_in_batch, &self.order_options)?; let vectors = sorted .iter() diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 3e97efe766..0ab111b60b 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -160,6 +160,24 @@ impl TesterBase { dst } + pub async fn scan(&self, req: ScanRequest) -> Vec<(i64, Option)> { + logging::info!("Full scan with ctx {:?}", self.read_ctx); + let snapshot = self.region.snapshot(&self.read_ctx).unwrap(); + + let resp = snapshot.scan(&self.read_ctx, req).await.unwrap(); + let mut reader = resp.reader; + + let metadata = self.region.in_memory_metadata(); + assert_eq!(metadata.schema(), reader.user_schema()); + + let mut dst = Vec::new(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + let chunk = reader.project_chunk(chunk); + append_chunk_to(&chunk, &mut dst); + } + dst + } + pub fn committed_sequence(&self) -> SequenceNumber { self.region.committed_sequence() } diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 88f7c94baf..b0901e7f50 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -17,9 +17,15 @@ use std::sync::Arc; use std::time::Duration; +use arrow::compute::SortOptions; +use common_query::prelude::Expr; +use common_recordbatch::OrderOption; use common_test_util::temp_dir::create_temp_dir; +use datafusion_common::Column; use log_store::raft_engine::log_store::RaftEngineLogStore; -use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse}; +use store_api::storage::{ + 
FlushContext, FlushReason, OpenOptions, Region, ScanRequest, WriteResponse, +}; use crate::engine::{self, RegionMap}; use crate::flush::{FlushStrategyRef, FlushType}; @@ -105,6 +111,10 @@ impl FlushTester { self.base().full_scan().await } + async fn scan(&self, req: ScanRequest) -> Vec<(i64, Option)> { + self.base().scan(req).await + } + async fn flush(&self, wait: Option) { let ctx = wait .map(|wait| FlushContext { @@ -347,3 +357,50 @@ async fn test_schedule_engine_flush() { let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); assert!(has_parquet_file(&sst_dir)); } + +#[tokio::test] +async fn test_flush_and_query_empty() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("flush_and_query_empty_range"); + let store_dir = dir.path().to_str().unwrap(); + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = FlushTester::new(store_dir, flush_switch.clone()).await; + + tester + .put( + &(20000..30000) + .map(|v| (v as i64, Some(v as i64))) + .collect::>(), + ) + .await; + tester.flush(Some(true)).await; + + tester + .put( + &(20100..20200) + .map(|v| (v as i64, Some(v as i64))) + .collect::>(), + ) + .await; + tester.flush(Some(true)).await; + + use datafusion_expr::Expr as DfExpr; + let req = ScanRequest { + sequence: None, + projection: None, + filters: vec![Expr::from(datafusion_expr::binary_expr( + DfExpr::Column(Column::from("timestamp")), + datafusion_expr::Operator::GtEq, + datafusion_expr::lit(20000), + ))], + output_ordering: Some(vec![OrderOption { + name: "timestamp".to_string(), + options: SortOptions { + descending: true, + nulls_first: true, + }, + }]), + limit: Some(1), + }; + let _ = tester.scan(req).await; +} diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index e3737b81ed..25f738678c 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -41,10 +41,10 @@ use crate::error; use crate::error::{DeleteSstSnafu, Result}; use crate::file_purger::{FilePurgeRequest, FilePurgerRef}; use crate::memtable::BoxedBatchIterator; -use crate::read::{Batch, BoxedBatchReader}; +use crate::read::{Batch, BatchReader, BoxedBatchReader}; use crate::scheduler::Scheduler; use crate::schema::ProjectedSchemaRef; -use crate::sst::parquet::{ParquetReader, ParquetWriter}; +use crate::sst::parquet::{ChunkStream, ParquetReader, ParquetWriter}; /// Maximum level of SSTs. pub const MAX_LEVEL: u8 = 2; @@ -574,8 +574,7 @@ impl AccessLayer for FsAccessLayer { opts.time_range, ); - let stream = reader.chunk_stream().await?; - Ok(Box::new(stream)) + Ok(Box::new(LazyParquetBatchReader::new(reader))) } /// Deletes a SST file with given file id. 
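// The hunk below adds `LazyParquetBatchReader`, which defers
// `ParquetReader::chunk_stream()` (the call that actually opens the SST and reads
// the parquet metadata) until the first `next_batch()`. That keeps
// `FsAccessLayer::read` free of object-store IO while the query plan is built,
// per the "lazy parquet reader" item in the commit message. The same
// lazy-initialization shape, reduced to a synchronous standalone sketch with
// stand-in types; nothing in this sketch comes from the patch.
struct LazySource<S: Iterator> {
    // Deferred constructor; in the patch this role is played by `chunk_stream()`.
    build: Option<Box<dyn FnOnce() -> S>>,
    stream: Option<S>,
}

impl<S: Iterator> LazySource<S> {
    // e.g. `LazySource::new(|| 0..3)` builds nothing until `next_item` is called.
    fn new(build: impl FnOnce() -> S + 'static) -> Self {
        Self {
            build: Some(Box::new(build)),
            stream: None,
        }
    }

    fn next_item(&mut self) -> Option<S::Item> {
        if self.stream.is_none() {
            // First poll: run the expensive construction exactly once.
            let build = self.build.take()?;
            self.stream = Some(build());
        }
        self.stream.as_mut()?.next()
    }
}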
@@ -588,6 +587,34 @@ impl AccessLayer for FsAccessLayer { } } +struct LazyParquetBatchReader { + inner: ParquetReader, + stream: Option, +} + +impl LazyParquetBatchReader { + fn new(inner: ParquetReader) -> Self { + Self { + inner, + stream: None, + } + } +} + +#[async_trait] +impl BatchReader for LazyParquetBatchReader { + async fn next_batch(&mut self) -> Result> { + if let Some(s) = &mut self.stream { + s.next_batch().await + } else { + let mut stream = self.inner.chunk_stream().await?; + let res = stream.next_batch().await; + self.stream = Some(stream); + res + } + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 7f999d0931..47943a9d30 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -579,7 +579,7 @@ mod tests { let object_store = create_object_store(path); let sst_file_name = "test-flush.parquet"; - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); writer @@ -663,7 +663,7 @@ mod tests { let object_store = create_object_store(path); let sst_file_handle = new_file_handle(FileId::random()); let sst_file_name = sst_file_handle.file_name(); - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { @@ -749,7 +749,7 @@ mod tests { let object_store = create_object_store(path); let file_handle = new_file_handle(FileId::random()); let sst_file_name = file_handle.file_name(); - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { @@ -855,7 +855,7 @@ mod tests { let object_store = create_object_store(path); let sst_file_handle = new_file_handle(FileId::random()); let sst_file_name = sst_file_handle.file_name(); - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { @@ -950,7 +950,7 @@ mod tests { builder.root(path); let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_name = "test-read.parquet"; - let iter = memtable.iter(&IterContext::default()).unwrap(); + let iter = memtable.iter(IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); let sst_info_opt = writer diff --git a/src/storage/src/window_infer.rs b/src/storage/src/window_infer.rs index e3393af287..efc256176b 100644 --- a/src/storage/src/window_infer.rs +++ b/src/storage/src/window_infer.rs @@ -23,10 +23,11 @@ use crate::memtable::MemtableStats; use crate::sst::FileMeta; /// A set of predefined time windows. 
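// The hunk below adds a 1-second bucket to the predefined window sizes, so very
// short time spans map to one-second windows instead of being rounded up to a
// minute (see the updated `test_get_time_window_size` expectations). A standalone
// sketch of that mapping, inferred from the test: pick the smallest predefined
// size that is not smaller than the span. The table here lists only the sizes
// visible in this hunk (the real array has ten entries), and the clamp for spans
// larger than the table is an assumption, not taken from the patch.
const WINDOWS: [i64; 7] = [1, 60, 60 * 10, 60 * 30, 60 * 60, 2 * 60 * 60, 6 * 60 * 60];

fn window_size_for(span_sec: i64) -> i64 {
    WINDOWS
        .iter()
        .copied()
        .find(|w| *w >= span_sec)
        // Assumed fallback: clamp very large spans to the largest listed size.
        .unwrap_or(WINDOWS[WINDOWS.len() - 1])
}

fn main() {
    // Matches the expectations in the updated unit test.
    assert_eq!(1, window_size_for(0));
    assert_eq!(1, window_size_for(1));
    assert_eq!(60, window_size_for(60));
    assert_eq!(60 * 10, window_size_for(100));
    assert_eq!(60 * 30, window_size_for(1800));
    assert_eq!(60 * 60, window_size_for(3000));
}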
-const TIME_WINDOW_SIZE: [i64; 9] = [ +const TIME_WINDOW_SIZE: [i64; 10] = [ + 1, // 1 second 60, // 1 minute - 60 * 10, // 10 minute - 60 * 30, // 30 minute + 60 * 10, // 10 minutes + 60 * 30, // 30 minutes 60 * 60, // 1 hour 2 * 60 * 60, // 2 hours 6 * 60 * 60, // 6 hours @@ -160,11 +161,12 @@ mod tests { #[test] fn test_get_time_window_size() { - assert_eq!(60, min_duration_to_window_size(0)); + assert_eq!(1, min_duration_to_window_size(0)); for window in TIME_WINDOW_SIZE { assert_eq!(window, min_duration_to_window_size(window)); } - assert_eq!(60, min_duration_to_window_size(1)); + assert_eq!(1, min_duration_to_window_size(1)); + assert_eq!(60, min_duration_to_window_size(60)); assert_eq!(60 * 10, min_duration_to_window_size(100)); assert_eq!(60 * 30, min_duration_to_window_size(1800)); assert_eq!(60 * 60, min_duration_to_window_size(3000)); @@ -244,7 +246,7 @@ mod tests { true, ); assert_eq!( - vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap()], + vec![TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(),], res ); @@ -295,14 +297,14 @@ mod tests { }], true, ); - assert_eq!( - vec![ - TimestampRange::with_unit(0, 60, TimeUnit::Second).unwrap(), - TimestampRange::with_unit(60, 120, TimeUnit::Second).unwrap(), - TimestampRange::with_unit(60 * 60, 61 * 60, TimeUnit::Second).unwrap(), - ], - res - ); + + let mut expect = (0..=61) + .map(|s| TimestampRange::with_unit(s, s + 1, TimeUnit::Second).unwrap()) + .collect::>(); + expect.push(TimestampRange::with_unit(60 * 60, 60 * 60 + 1, TimeUnit::Second).unwrap()); + expect.push(TimestampRange::with_unit(60 * 60 + 1, 60 * 60 + 2, TimeUnit::Second).unwrap()); + + assert_eq!(expect, res); let res = window_inference.infer_window( &[ diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 67c0db3eea..b890c3c013 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -63,16 +63,13 @@ async fn test_create_database_and_insert_query(instance: Arc) .await; assert!(matches!(output, Output::AffectedRows(2))); - let query_output = execute_sql(&instance, "select ts from test.demo order by ts").await; + let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await; match query_output { Output::Stream(s) => { let batches = util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( - Arc::new(Int64Vector::from_vec(vec![ - 1655276557000_i64, - 1655276558000_i64 - ])) as VectorRef, + Arc::new(Int64Vector::from_vec(vec![1655276557000_i64])) as VectorRef, *batches[0].column(0) ); } @@ -193,7 +190,7 @@ async fn test_issue477_same_table_name_in_different_databases(instance: Arc { let batches = util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( - Arc::new(Int64Vector::from_vec(vec![ - 1655276557000_i64, - 1655276558000_i64 - ])) as VectorRef, + Arc::new(Int64Vector::from_vec(vec![1655276557000_i64,])) as VectorRef, *batches[0].column(0) ); } _ => unreachable!(), } - let query_output = execute_sql(&instance, "select ts as time from demo order by ts").await; + let query_output = + execute_sql(&instance, "select ts as time from demo order by ts limit 1").await; match query_output { Output::Stream(s) => { let batches = util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( - Arc::new(Int64Vector::from_vec(vec![ - 1655276557000_i64, - 1655276558000_i64 - ])) as VectorRef, + 
Arc::new(Int64Vector::from_vec(vec![1655276557000_i64,])) as VectorRef, *batches[0].column(0) ); }