diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 2bd8c21e75..e607717842 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -25,6 +25,7 @@ pub(crate) mod write_cache; use std::mem; use std::sync::Arc; +use bytes::Bytes; use datatypes::value::Value; use datatypes::vectors::VectorRef; use moka::notification::RemovalCause; @@ -393,20 +394,59 @@ impl SstMetaKey { } } +/// Path to column pages in the SST file. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ColumnPagePath { + /// Region id of the SST file to cache. + region_id: RegionId, + /// Id of the SST file to cache. + file_id: FileId, + /// Index of the row group. + row_group_idx: usize, + /// Index of the column in the row group. + column_idx: usize, +} + /// Cache key for pages of a SST row group. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PageKey { - /// Region id of the SST file to cache. - pub region_id: RegionId, - /// Id of the SST file to cache. - pub file_id: FileId, - /// Index of the row group. - pub row_group_idx: usize, - /// Index of the column in the row group. - pub column_idx: usize, +pub enum PageKey { + /// Cache key for a compressed page in a row group. + Compressed(ColumnPagePath), + /// Cache key for all uncompressed pages in a row group. + Uncompressed(ColumnPagePath), } impl PageKey { + /// Creates a key for a compressed page. + pub fn new_compressed( + region_id: RegionId, + file_id: FileId, + row_group_idx: usize, + column_idx: usize, + ) -> PageKey { + PageKey::Compressed(ColumnPagePath { + region_id, + file_id, + row_group_idx, + column_idx, + }) + } + + /// Creates a key for all uncompressed pages in a row group. + pub fn new_uncompressed( + region_id: RegionId, + file_id: FileId, + row_group_idx: usize, + column_idx: usize, + ) -> PageKey { + PageKey::Uncompressed(ColumnPagePath { + region_id, + file_id, + row_group_idx, + column_idx, + }) + } + /// Returns memory used by the key (estimated). fn estimated_size(&self) -> usize { mem::size_of::() @@ -414,21 +454,41 @@ impl PageKey { } /// Cached row group pages for a column. +// We don't use enum here to make it easier to mock and use the struct. +#[derive(Default)] pub struct PageValue { + /// Compressed page of the column in the row group. + pub compressed: Bytes, /// All pages of the column in the row group. - pub pages: Vec, + pub row_group: Vec, } impl PageValue { - /// Creates a new page value. - pub fn new(pages: Vec) -> PageValue { - PageValue { pages } + /// Creates a new value from a compressed page. + pub fn new_compressed(bytes: Bytes) -> PageValue { + PageValue { + compressed: bytes, + row_group: vec![], + } + } + + /// Creates a new value from all pages in a row group. + pub fn new_row_group(pages: Vec) -> PageValue { + PageValue { + compressed: Bytes::new(), + row_group: pages, + } } /// Returns memory used by the value (estimated). fn estimated_size(&self) -> usize { - // We only consider heap size of all pages. - self.pages.iter().map(|page| page.buffer().len()).sum() + mem::size_of::() + + self.compressed.len() + + self + .row_group + .iter() + .map(|page| page.buffer().len()) + .sum::() } } @@ -507,13 +567,8 @@ mod tests { .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value) .is_none()); - let key = PageKey { - region_id, - file_id, - row_group_idx: 0, - column_idx: 0, - }; - let pages = Arc::new(PageValue::new(Vec::new())); + let key = PageKey::new_uncompressed(region_id, file_id, 0, 0); + let pages = Arc::new(PageValue::default()); cache.put_pages(key.clone(), pages); assert!(cache.get_pages(&key).is_none()); @@ -562,14 +617,9 @@ mod tests { let cache = CacheManager::builder().page_cache_size(1000).build(); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); - let key = PageKey { - region_id, - file_id, - row_group_idx: 0, - column_idx: 0, - }; + let key = PageKey::new_compressed(region_id, file_id, 0, 0); assert!(cache.get_pages(&key).is_none()); - let pages = Arc::new(PageValue::new(Vec::new())); + let pages = Arc::new(PageValue::default()); cache.put_pages(key.clone(), pages); assert!(cache.get_pages(&key).is_some()); } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 355c0fba47..c50bbbdc78 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -133,6 +133,7 @@ lazy_static! { vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); + pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]); /// Counter of rows read from different source. pub static ref READ_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap(); diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index b827388080..8060c53405 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -89,6 +89,9 @@ impl Drop for MergeReader { READ_STAGE_ELAPSED .with_label_values(&["merge"]) .observe(self.metrics.scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["merge_fetch"]) + .observe(self.metrics.fetch_cost.as_secs_f64()); } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 716d372945..40f18c393b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -901,10 +901,21 @@ impl ScanPartList { }) } + /// Returns the number of files. + pub(crate) fn num_files(&self) -> usize { + self.0.as_ref().map_or(0, |parts| { + parts.iter().map(|part| part.file_ranges.len()).sum() + }) + } + /// Returns the number of file ranges. pub(crate) fn num_file_ranges(&self) -> usize { self.0.as_ref().map_or(0, |parts| { - parts.iter().map(|part| part.file_ranges.len()).sum() + parts + .iter() + .flat_map(|part| part.file_ranges.iter()) + .map(|ranges| ranges.len()) + .sum() }) } } @@ -947,9 +958,10 @@ impl StreamContext { Ok(inner) => match t { DisplayFormatType::Default => write!( f, - "partition_count={} ({} memtable ranges, {} file ranges)", + "partition_count={} ({} memtable ranges, {} file {} ranges)", inner.0.len(), inner.0.num_mem_ranges(), + inner.0.num_files(), inner.0.num_file_ranges() )?, DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index ca232df834..296d55250b 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -166,8 +166,8 @@ impl SeqScan { reader_metrics.merge_from(reader.metrics()); } debug!( - "Seq scan region {}, file {}, {} ranges finished, metrics: {:?}", - region_id, file_id, range_num, reader_metrics + "Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}", + region_id, file_id, range_num, reader_metrics, compaction ); // Reports metrics. reader_metrics.observe_rows(read_type); @@ -238,11 +238,12 @@ impl SeqScan { let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await; let build_reader_cost = build_start.elapsed(); metrics.build_reader_cost += build_reader_cost; - common_telemetry::debug!( - "Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}", + debug!( + "Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}", stream_ctx.input.mapper.metadata().region_id, range_id, - build_reader_cost + build_reader_cost, + compaction, ); maybe_reader @@ -354,11 +355,12 @@ impl SeqScan { metrics.observe_metrics_on_finish(); debug!( - "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}", + "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}", stream_ctx.input.mapper.metadata().region_id, partition, metrics, first_poll, + compaction, ); } }; @@ -450,13 +452,14 @@ impl SeqScan { metrics.total_cost = stream_ctx.query_start.elapsed(); metrics.observe_metrics_on_finish(); - common_telemetry::debug!( - "Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}", + debug!( + "Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}", stream_ctx.input.mapper.metadata().region_id, partition, id, metrics, first_poll, + compaction, ); } }; diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 5dfcc519d6..67e87197a6 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -152,7 +152,6 @@ impl RegionScanner for UnorderedScan { let parallelism = self.properties.num_partitions(); let stream = try_stream! { let first_poll = stream_ctx.query_start.elapsed(); - let part = { let mut parts = stream_ctx.parts.lock().await; maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism) @@ -180,6 +179,7 @@ impl RegionScanner for UnorderedScan { .map_err(BoxedError::new) .context(ExternalSnafu)?; metrics.build_reader_cost = build_reader_start.elapsed(); + let query_start = stream_ctx.query_start; let cache = stream_ctx.input.cache_manager.as_deref(); // Scans memtables first. @@ -217,8 +217,8 @@ impl RegionScanner for UnorderedScan { metrics.total_cost = query_start.elapsed(); metrics.observe_metrics_on_finish(); debug!( - "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}", - partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, + "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}", + partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(), ); }; let stream = Box::pin(RecordBatchStreamWrapper::new( @@ -343,14 +343,14 @@ impl UnorderedDistributor { let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1); let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1); - common_telemetry::debug!( - "Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}", - parallelism, - self.mem_ranges.len(), - self.file_ranges.len(), - mems_per_part, - ranges_per_part - ); + debug!( + "Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}", + parallelism, + self.mem_ranges.len(), + self.file_ranges.len(), + mems_per_part, + ranges_per_part + ); let mut scan_parts = self .mem_ranges .chunks(mems_per_part) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 34819c0c71..cce9965539 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -216,22 +216,16 @@ mod tests { .await; } + // Doesn't have compressed page cached. + let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0); + assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + // Cache 4 row groups. for i in 0..4 { - let page_key = PageKey { - region_id: metadata.region_id, - file_id: handle.file_id(), - row_group_idx: i, - column_idx: 0, - }; + let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0); assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some()); } - let page_key = PageKey { - region_id: metadata.region_id, - file_id: handle.file_id(), - row_group_idx: 5, - column_idx: 0, - }; + let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0); assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); } diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs index 1416da448b..b2f69e2dd7 100644 --- a/src/mito2/src/sst/parquet/page_reader.rs +++ b/src/mito2/src/sst/parquet/page_reader.rs @@ -19,14 +19,14 @@ use std::collections::VecDeque; use parquet::column::page::{Page, PageMetadata, PageReader}; use parquet::errors::Result; -/// A reader that reads from cached pages. -pub(crate) struct CachedPageReader { +/// A reader that reads all pages from a cache. +pub(crate) struct RowGroupCachedReader { /// Cached pages. pages: VecDeque, } -impl CachedPageReader { - /// Returns a new reader from existing pages. +impl RowGroupCachedReader { + /// Returns a new reader from pages of a column in a row group. pub(crate) fn new(pages: &[Page]) -> Self { Self { pages: pages.iter().cloned().collect(), @@ -34,7 +34,7 @@ impl CachedPageReader { } } -impl PageReader for CachedPageReader { +impl PageReader for RowGroupCachedReader { fn get_next_page(&mut self) -> Result> { Ok(self.pages.pop_front()) } @@ -55,9 +55,8 @@ impl PageReader for CachedPageReader { } } -impl Iterator for CachedPageReader { +impl Iterator for RowGroupCachedReader { type Item = Result; - fn next(&mut self) -> Option { self.get_next_page().transpose() } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 68a91e55fe..991d56943d 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -23,33 +23,37 @@ use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; use parquet::arrow::ProjectionMask; use parquet::column::page::{PageIterator, PageReader}; use parquet::errors::{ParquetError, Result}; -use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData}; +use parquet::file::properties::DEFAULT_PAGE_SIZE; use parquet::file::reader::{ChunkReader, Length}; use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; use store_api::storage::RegionId; +use tokio::task::yield_now; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::{CacheManagerRef, PageKey, PageValue}; -use crate::metrics::READ_STAGE_ELAPSED; +use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::sst::file::FileId; use crate::sst::parquet::helper::fetch_byte_ranges; -use crate::sst::parquet::page_reader::CachedPageReader; +use crate::sst::parquet::page_reader::RowGroupCachedReader; /// An in-memory collection of column chunks pub struct InMemoryRowGroup<'a> { metadata: &'a RowGroupMetaData, page_locations: Option<&'a [Vec]>, + /// Compressed page of each column. column_chunks: Vec>>, row_count: usize, region_id: RegionId, file_id: FileId, row_group_idx: usize, cache_manager: Option, - /// Cached pages for each column. + /// Row group level cached pages for each column. /// - /// `column_cached_pages.len()` equals to `column_chunks.len()`. - column_cached_pages: Vec>>, + /// These pages are uncompressed pages of a row group. + /// `column_uncompressed_pages.len()` equals to `column_chunks.len()`. + column_uncompressed_pages: Vec>>, file_path: &'a str, /// Object store. object_store: ObjectStore, @@ -86,7 +90,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id, row_group_idx, cache_manager, - column_cached_pages: vec![None; metadata.columns().len()], + column_uncompressed_pages: vec![None; metadata.columns().len()], file_path, object_store, } @@ -161,16 +165,20 @@ impl<'a> InMemoryRowGroup<'a> { // Now we only use cache in dense chunk data. self.fetch_pages_from_cache(projection); + // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache` + // is a synchronous, CPU-bound operation. + yield_now().await; + let fetch_ranges = self .column_chunks .iter() - .zip(&self.column_cached_pages) + .zip(&self.column_uncompressed_pages) .enumerate() - // Don't need to fetch column data if we already cache the column's pages. - .filter(|&(idx, (chunk, cached_pages))| { - chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none() + .filter(|&(idx, (chunk, uncompressed_pages))| { + // Don't need to fetch column data if we already cache the column's pages. + chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none() }) - .map(|(idx, (_chunk, _cached_pages))| { + .map(|(idx, (_chunk, _pages))| { let column = self.metadata.column(idx); let (start, length) = column.byte_range(); start..(start + length) @@ -184,22 +192,41 @@ impl<'a> InMemoryRowGroup<'a> { let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter(); - for (idx, (chunk, cached_pages)) in self + for (idx, (chunk, row_group_pages)) in self .column_chunks .iter_mut() - .zip(&self.column_cached_pages) + .zip(&self.column_uncompressed_pages) .enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() { + if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() { continue; } - if let Some(data) = chunk_data.next() { - *chunk = Some(Arc::new(ColumnChunkData::Dense { - offset: self.metadata.column(idx).byte_range().0 as usize, - data, - })); + // Get the fetched page. + let Some(data) = chunk_data.next() else { + continue; + }; + + let column = self.metadata.column(idx); + if let Some(cache) = &self.cache_manager { + if !cache_uncompressed_pages(column) { + // For columns that have multiple uncompressed pages, we only cache the compressed page + // to save memory. + let page_key = PageKey::new_compressed( + self.region_id, + self.file_id, + self.row_group_idx, + idx, + ); + cache + .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); + } } + + *chunk = Some(Arc::new(ColumnChunkData::Dense { + offset: column.byte_range().0 as usize, + data, + })); } } @@ -207,20 +234,42 @@ impl<'a> InMemoryRowGroup<'a> { } /// Fetches pages for columns if cache is enabled. + /// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column. fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) { + let _timer = READ_STAGE_FETCH_PAGES.start_timer(); self.column_chunks - .iter() + .iter_mut() .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .for_each(|(idx, _chunk)| { - if let Some(cache) = &self.cache_manager { - let page_key = PageKey { - region_id: self.region_id, - file_id: self.file_id, - row_group_idx: self.row_group_idx, - column_idx: idx, - }; - self.column_cached_pages[idx] = cache.get_pages(&page_key); + .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx)) + .for_each(|(idx, chunk)| { + let Some(cache) = &self.cache_manager else { + return; + }; + let column = self.metadata.column(idx); + if cache_uncompressed_pages(column) { + // Fetches uncompressed pages for the row group. + let page_key = PageKey::new_uncompressed( + self.region_id, + self.file_id, + self.row_group_idx, + idx, + ); + self.column_uncompressed_pages[idx] = cache.get_pages(&page_key); + } else { + // Fetches the compressed page from the cache. + let page_key = PageKey::new_compressed( + self.region_id, + self.file_id, + self.row_group_idx, + idx, + ); + + *chunk = cache.get_pages(&page_key).map(|page_value| { + Arc::new(ColumnChunkData::Dense { + offset: column.byte_range().0 as usize, + data: page_value.compressed.clone(), + }) + }); } }); } @@ -259,12 +308,12 @@ impl<'a> InMemoryRowGroup<'a> { /// Creates a page reader to read column at `i`. fn column_page_reader(&self, i: usize) -> Result> { - if let Some(cached_pages) = &self.column_cached_pages[i] { - // Already in cache. - return Ok(Box::new(CachedPageReader::new(&cached_pages.pages))); + if let Some(cached_pages) = &self.column_uncompressed_pages[i] { + debug_assert!(!cached_pages.row_group.is_empty()); + // Hits the row group level page cache. + return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group))); } - // Cache miss. let page_reader = match &self.column_chunks[i] { None => { return Err(ParquetError::General(format!( @@ -283,25 +332,34 @@ impl<'a> InMemoryRowGroup<'a> { }; let Some(cache) = &self.cache_manager else { - // Cache is disabled. return Ok(Box::new(page_reader)); }; - // We collect all pages and put them into the cache. - let pages = page_reader.collect::>>()?; - let page_value = Arc::new(PageValue::new(pages)); - let page_key = PageKey { - region_id: self.region_id, - file_id: self.file_id, - row_group_idx: self.row_group_idx, - column_idx: i, - }; - cache.put_pages(page_key, page_value.clone()); + let column = self.metadata.column(i); + if cache_uncompressed_pages(column) { + // This column use row group level page cache. + // We collect all pages and put them into the cache. + let pages = page_reader.collect::>>()?; + let page_value = Arc::new(PageValue::new_row_group(pages)); + let page_key = + PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i); + cache.put_pages(page_key, page_value.clone()); - Ok(Box::new(CachedPageReader::new(&page_value.pages))) + return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group))); + } + + // This column don't cache uncompressed pages. + Ok(Box::new(page_reader)) } } +/// Returns whether we cache uncompressed pages for the column. +fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool { + // If the row group only has a data page, cache the whole row group as + // it might be faster than caching a compressed page. + column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE +} + impl<'a> RowGroups for InMemoryRowGroup<'a> { fn num_rows(&self) -> usize { self.row_count @@ -318,7 +376,7 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> { /// An in-memory column chunk #[derive(Clone)] -enum ColumnChunkData { +pub(crate) enum ColumnChunkData { /// Column chunk data representing only a subset of data pages Sparse { /// Length of the full column chunk diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result index 2f3955c163..76aac4fa13 100644 --- a/tests/cases/distributed/explain/analyze.result +++ b/tests/cases/distributed/explain/analyze.result @@ -36,7 +36,7 @@ explain analyze SELECT count(*) FROM system_metrics; |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 1_| +-+-+-+ diff --git a/tests/cases/standalone/common/aggregate/multi_regions.result b/tests/cases/standalone/common/aggregate/multi_regions.result index 66dcf01f40..a7c6907eaf 100644 --- a/tests/cases/standalone/common/aggregate/multi_regions.result +++ b/tests/cases/standalone/common/aggregate/multi_regions.result @@ -34,7 +34,7 @@ select sum(val) from t group by host; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| | 1_| 1_|_ProjectionExec: expr=[SUM(t.val)@1 as SUM(t.val)] REDACTED |_|_|_AggregateExec: mode=FinalPartitioned, gby=[host@0 as host], aggr=[SUM(t.val)] REDACTED @@ -43,7 +43,7 @@ select sum(val) from t group by host; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 0_| +-+-+-+ @@ -66,9 +66,9 @@ select sum(val) from t; |_|_|_ProjectionExec: expr=[val@1 as val] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| -| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 1_| +-+-+-+ @@ -95,9 +95,9 @@ select sum(val) from t group by idc; |_|_|_ProjectionExec: expr=[val@1 as val, idc@3 as idc] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| -| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED +| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 0_| +-+-+-+ diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 952b9fd8b4..236c0f297a 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -77,7 +77,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; |_|_|_CoalescePartitionsExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 10_| +-+-+-+ diff --git a/tests/cases/standalone/common/select/prune.result b/tests/cases/standalone/common/select/prune.result index f2718926c9..13ddee5510 100644 --- a/tests/cases/standalone/common/select/prune.result +++ b/tests/cases/standalone/common/select/prune.result @@ -89,7 +89,7 @@ explain analyze select * from demo where idc='idc1'; +-+-+-+ | 0_| 0_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 2_| +-+-+-+ diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index fe8af2e753..5a08da5517 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -32,7 +32,7 @@ TQL ANALYZE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -63,7 +63,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -93,7 +93,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -125,7 +125,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ diff --git a/tests/cases/standalone/optimizer/last_value.result b/tests/cases/standalone/optimizer/last_value.result index ab3f12bce1..790a6a4748 100644 --- a/tests/cases/standalone/optimizer/last_value.result +++ b/tests/cases/standalone/optimizer/last_value.result @@ -48,7 +48,7 @@ explain analyze |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[last_value(t.host) ORDER BY [t.ts ASC NULLS LAST], last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST], last_value(t.val) ORDER BY [t.ts ASC NULLS LAST]] REDACTED |_|_|_RepartitionExec: REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges), selector=LastRow REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), selector=LastRow REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+