mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
feat: New scanner SeriesScan to scan by series for querying metrics (#5968)
* chore: basic methods for SeriesScan
* chore: add to scanner enum
* feat: implement scan logic of each partition
* feat: use series scan when distribution is PerSeries
* refactor: remove per series scan from SeqScan
* fix: use series scan in PerSeries distribution
* feat: keep parallelize_scan unchanged
* fix: address compiler errors
* fix: include build merge reader cost to scan cost
* feat: use smallvec
* chore: update comment
* Revert "feat: keep parallelize_scan unchanged"
This reverts commit 96ba00d175.
* assign partition_ranges
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* feat: try send before send
reduce the send timeout to 10ms
* chore: add comments
* fix: add metrics to partition metrics list
* fix: correct scan cost metrics
* chore: reset instant
* fix: scanner metrics init
* chore: display more info in explain
* feat: metrics for send series timeout
* style: fix clippy
* refactor: use ChainedRecordBatchStream to simplify codes
* chore: fix typos
* feat: separate distributor metrics
* feat: remove parallelize hack
* chore: fix warning
* test: add test for series scan
* test: update sqlness test
---------
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
39
Cargo.lock
generated
39
Cargo.lock
generated
@@ -173,9 +173,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.98"
|
||||
version = "1.0.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
|
||||
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
||||
|
||||
[[package]]
|
||||
name = "anymap2"
|
||||
@@ -1571,7 +1571,7 @@ dependencies = [
|
||||
"partition",
|
||||
"paste",
|
||||
"prometheus",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
@@ -1593,9 +1593,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.20"
|
||||
version = "1.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04da6a0d40b948dfc4fa8f5bbf402b0fc1a64a28dbf7d12ffd683550f2c1b63a"
|
||||
checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
"libc",
|
||||
@@ -2884,9 +2884,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.15"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
|
||||
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
@@ -2943,9 +2943,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.3.1"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
|
||||
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
|
||||
dependencies = [
|
||||
"csv-core",
|
||||
"itoa",
|
||||
@@ -6416,9 +6416,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.27"
|
||||
version = "0.4.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
|
||||
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "log-query"
|
||||
@@ -9357,7 +9357,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"rustls",
|
||||
"socket2",
|
||||
"thiserror 1.0.64",
|
||||
@@ -9374,7 +9374,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"rand 0.8.5",
|
||||
"ring",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash 2.0.0",
|
||||
"rustls",
|
||||
"slab",
|
||||
"thiserror 1.0.64",
|
||||
@@ -9624,9 +9624,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.11.1"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
|
||||
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -9808,14 +9808,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.14"
|
||||
version = "0.17.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
|
||||
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"getrandom 0.2.15",
|
||||
"libc",
|
||||
"spin",
|
||||
"untrusted",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
@@ -10088,9 +10089,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
|
||||
checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
|
||||
use api::v1::Rows;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
use futures::TryStreamExt;
|
||||
use store_api::region_engine::{PrepareRequest, RegionEngine, RegionScanner};
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::read::scan_region::Scanner;
|
||||
use crate::test_util;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
@@ -147,3 +150,142 @@ async fn test_scan_with_min_sst_sequence() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_series_scan() {
|
||||
let mut env = TestEnv::with_prefix("test_series_scan");
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.time_window", "1h")
|
||||
.build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let put_flush_rows = async |start, end| {
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(start, end),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
};
|
||||
// generates 3 SST files
|
||||
put_flush_rows(0, 3).await;
|
||||
put_flush_rows(2, 6).await;
|
||||
put_flush_rows(3600, 3603).await;
|
||||
// Put to memtable.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(7200, 7203),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let request = ScanRequest {
|
||||
distribution: Some(TimeSeriesDistribution::PerSeries),
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = engine.scanner(region_id, request).unwrap();
|
||||
let Scanner::Series(mut scanner) = scanner else {
|
||||
panic!("Scanner should be series scan");
|
||||
};
|
||||
// 3 partition ranges for 3 time window.
|
||||
assert_eq!(
|
||||
3,
|
||||
scanner.properties().partitions[0].len(),
|
||||
"unexpected ranges: {:?}",
|
||||
scanner.properties().partitions
|
||||
);
|
||||
let raw_ranges: Vec<_> = scanner
|
||||
.properties()
|
||||
.partitions
|
||||
.iter()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect();
|
||||
let mut new_ranges = Vec::with_capacity(3);
|
||||
for range in raw_ranges {
|
||||
new_ranges.push(vec![range]);
|
||||
}
|
||||
scanner
|
||||
.prepare(PrepareRequest {
|
||||
ranges: Some(new_ranges),
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let metrics_set = ExecutionPlanMetricsSet::default();
|
||||
|
||||
let mut partition_batches = vec![vec![]; 3];
|
||||
let mut streams: Vec<_> = (0..3)
|
||||
.map(|partition| {
|
||||
let stream = scanner.scan_partition(&metrics_set, partition).unwrap();
|
||||
Some(stream)
|
||||
})
|
||||
.collect();
|
||||
let mut num_done = 0;
|
||||
let mut schema = None;
|
||||
// Pull streams in round-robin fashion to get the consistent output from the sender.
|
||||
while num_done < 3 {
|
||||
if schema.is_none() {
|
||||
schema = Some(streams[0].as_ref().unwrap().schema().clone());
|
||||
}
|
||||
for i in 0..3 {
|
||||
let Some(mut stream) = streams[i].take() else {
|
||||
continue;
|
||||
};
|
||||
let Some(rb) = stream.try_next().await.unwrap() else {
|
||||
num_done += 1;
|
||||
continue;
|
||||
};
|
||||
partition_batches[i].push(rb);
|
||||
streams[i] = Some(stream);
|
||||
}
|
||||
}
|
||||
|
||||
let mut check_result = |expected| {
|
||||
let batches =
|
||||
RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap();
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
};
|
||||
|
||||
// Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 0 | 0.0 | 1970-01-01T00:00:00 |
|
||||
| 3 | 3.0 | 1970-01-01T00:00:03 |
|
||||
| 3602 | 3602.0 | 1970-01-01T01:00:02 |
|
||||
| 7200 | 7200.0 | 1970-01-01T02:00:00 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 1 | 1.0 | 1970-01-01T00:00:01 |
|
||||
| 3600 | 3600.0 | 1970-01-01T01:00:00 |
|
||||
| 4 | 4.0 | 1970-01-01T00:00:04 |
|
||||
| 7201 | 7201.0 | 1970-01-01T02:00:01 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 2 | 2.0 | 1970-01-01T00:00:02 |
|
||||
| 3601 | 3601.0 | 1970-01-01T01:00:01 |
|
||||
| 5 | 5.0 | 1970-01-01T00:00:05 |
|
||||
| 7202 | 7202.0 | 1970-01-01T02:00:02 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
}
|
||||
|
||||
@@ -712,8 +712,8 @@ pub enum Error {
|
||||
error: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to filter record batch"))]
|
||||
FilterRecordBatch {
|
||||
#[snafu(display("Record batch error"))]
|
||||
RecordBatch {
|
||||
source: common_recordbatch::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
@@ -1023,6 +1023,20 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to scan series"))]
|
||||
ScanSeries {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: Arc<Error>,
|
||||
},
|
||||
|
||||
#[snafu(display("Partition {} scan multiple times", partition))]
|
||||
ScanMultiTimes {
|
||||
partition: usize,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -1145,7 +1159,7 @@ impl ErrorExt for Error {
|
||||
|
||||
External { source, .. } => source.status_code(),
|
||||
|
||||
FilterRecordBatch { source, .. } => source.status_code(),
|
||||
RecordBatch { source, .. } => source.status_code(),
|
||||
|
||||
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
|
||||
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
||||
@@ -1174,6 +1188,10 @@ impl ErrorExt for Error {
|
||||
ManualCompactionOverride {} => StatusCode::Cancelled,
|
||||
|
||||
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ScanSeries { source, .. } => source.status_code(),
|
||||
|
||||
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ pub(crate) mod range;
|
||||
pub(crate) mod scan_region;
|
||||
pub(crate) mod scan_util;
|
||||
pub(crate) mod seq_scan;
|
||||
pub(crate) mod series_scan;
|
||||
pub(crate) mod unordered_scan;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
@@ -21,7 +21,7 @@ use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{FilterRecordBatchSnafu, Result};
|
||||
use crate::error::{RecordBatchSnafu, Result};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||
use crate::read::{Batch, BatchReader};
|
||||
@@ -201,7 +201,7 @@ impl PruneTimeIterator {
|
||||
for filter in filters.iter() {
|
||||
let result = filter
|
||||
.evaluate_vector(batch.timestamps())
|
||||
.context(FilterRecordBatchSnafu)?;
|
||||
.context(RecordBatchSnafu)?;
|
||||
mask = mask.bitand(&result);
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ use crate::read::compat::{self, CompatBatch};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::series_scan::SeriesScan;
|
||||
use crate::read::unordered_scan::UnorderedScan;
|
||||
use crate::read::{Batch, Source};
|
||||
use crate::region::options::MergeMode;
|
||||
@@ -67,6 +68,8 @@ pub(crate) enum Scanner {
|
||||
Seq(SeqScan),
|
||||
/// Unordered scan.
|
||||
Unordered(UnorderedScan),
|
||||
/// Per-series scan.
|
||||
Series(SeriesScan),
|
||||
}
|
||||
|
||||
impl Scanner {
|
||||
@@ -76,6 +79,7 @@ impl Scanner {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
|
||||
Scanner::Series(series_scan) => series_scan.build_stream().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -87,6 +91,7 @@ impl Scanner {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
|
||||
Scanner::Series(series_scan) => series_scan.input().num_files(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,6 +100,7 @@ impl Scanner {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
|
||||
Scanner::Series(series_scan) => series_scan.input().num_memtables(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,6 +109,7 @@ impl Scanner {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
|
||||
Scanner::Series(series_scan) => series_scan.input().file_ids(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +121,7 @@ impl Scanner {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
|
||||
Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -249,7 +257,9 @@ impl ScanRegion {
|
||||
|
||||
/// Returns a [Scanner] to scan the region.
|
||||
pub(crate) fn scanner(self) -> Result<Scanner> {
|
||||
if self.use_unordered_scan() {
|
||||
if self.use_series_scan() {
|
||||
self.series_scan().map(Scanner::Series)
|
||||
} else if self.use_unordered_scan() {
|
||||
// If table is append only and there is no series row selector, we use unordered scan in query.
|
||||
// We still use seq scan in compaction.
|
||||
self.unordered_scan().map(Scanner::Unordered)
|
||||
@@ -261,7 +271,9 @@ impl ScanRegion {
|
||||
/// Returns a [RegionScanner] to scan the region.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
|
||||
if self.use_unordered_scan() {
|
||||
if self.use_series_scan() {
|
||||
self.series_scan().map(|scanner| Box::new(scanner) as _)
|
||||
} else if self.use_unordered_scan() {
|
||||
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
|
||||
} else {
|
||||
self.seq_scan().map(|scanner| Box::new(scanner) as _)
|
||||
@@ -280,6 +292,12 @@ impl ScanRegion {
|
||||
Ok(UnorderedScan::new(input))
|
||||
}
|
||||
|
||||
/// Scans by series.
|
||||
pub(crate) fn series_scan(self) -> Result<SeriesScan> {
|
||||
let input = self.scan_input(true)?;
|
||||
Ok(SeriesScan::new(input))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
|
||||
let input = self.scan_input(false)?;
|
||||
@@ -300,6 +318,11 @@ impl ScanRegion {
|
||||
|| self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
|
||||
}
|
||||
|
||||
/// Returns true if the region can use series scan for current request.
|
||||
fn use_series_scan(&self) -> bool {
|
||||
self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
|
||||
}
|
||||
|
||||
/// Creates a scan input.
|
||||
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
|
||||
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
|
||||
|
||||
@@ -92,6 +92,17 @@ struct ScanMetricsSet {
|
||||
|
||||
/// Elapsed time before the first poll operation.
|
||||
first_poll: Duration,
|
||||
|
||||
/// Number of send timeout in SeriesScan.
|
||||
num_series_send_timeout: usize,
|
||||
/// Number of rows the series distributor scanned.
|
||||
num_distributor_rows: usize,
|
||||
/// Number of batches the series distributor scanned.
|
||||
num_distributor_batches: usize,
|
||||
/// Duration of the series distributor to scan.
|
||||
distributor_scan_cost: Duration,
|
||||
/// Duration of the series distributor to yield.
|
||||
distributor_yield_cost: Duration,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScanMetricsSet {
|
||||
@@ -122,6 +133,11 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
num_sst_batches,
|
||||
num_sst_rows,
|
||||
first_poll,
|
||||
num_series_send_timeout,
|
||||
num_distributor_rows,
|
||||
num_distributor_batches,
|
||||
distributor_scan_cost,
|
||||
distributor_yield_cost,
|
||||
} = self;
|
||||
|
||||
write!(
|
||||
@@ -150,7 +166,12 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
num_sst_record_batches={num_sst_record_batches}, \
|
||||
num_sst_batches={num_sst_batches}, \
|
||||
num_sst_rows={num_sst_rows}, \
|
||||
first_poll={first_poll:?}}}"
|
||||
first_poll={first_poll:?}, \
|
||||
num_series_send_timeout={num_series_send_timeout}, \
|
||||
num_distributor_rows={num_distributor_rows}, \
|
||||
num_distributor_batches={num_distributor_batches}, \
|
||||
distributor_scan_cost={distributor_scan_cost:?}, \
|
||||
distributor_yield_cost={distributor_yield_cost:?}}},"
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -228,6 +249,23 @@ impl ScanMetricsSet {
|
||||
self.num_sst_rows += *num_rows;
|
||||
}
|
||||
|
||||
/// Sets distributor metrics.
|
||||
fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
|
||||
let SeriesDistributorMetrics {
|
||||
num_series_send_timeout,
|
||||
num_rows,
|
||||
num_batches,
|
||||
scan_cost,
|
||||
yield_cost,
|
||||
} = distributor_metrics;
|
||||
|
||||
self.num_series_send_timeout += *num_series_send_timeout;
|
||||
self.num_distributor_rows += *num_rows;
|
||||
self.num_distributor_batches += *num_batches;
|
||||
self.distributor_scan_cost += *scan_cost;
|
||||
self.distributor_yield_cost += *yield_cost;
|
||||
}
|
||||
|
||||
/// Observes metrics.
|
||||
fn observe_metrics(&self) {
|
||||
READ_STAGE_ELAPSED
|
||||
@@ -439,6 +477,12 @@ impl PartitionMetrics {
|
||||
pub(crate) fn on_finish(&self) {
|
||||
self.0.on_finish();
|
||||
}
|
||||
|
||||
/// Sets the distributor metrics.
|
||||
pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
|
||||
let mut metrics_set = self.0.metrics.lock().unwrap();
|
||||
metrics_set.set_distributor_metrics(metrics);
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for PartitionMetrics {
|
||||
@@ -448,6 +492,21 @@ impl fmt::Debug for PartitionMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for the series distributor.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SeriesDistributorMetrics {
|
||||
/// Number of send timeout in SeriesScan.
|
||||
pub(crate) num_series_send_timeout: usize,
|
||||
/// Number of rows the series distributor scanned.
|
||||
pub(crate) num_rows: usize,
|
||||
/// Number of batches the series distributor scanned.
|
||||
pub(crate) num_batches: usize,
|
||||
/// Duration of the series distributor to scan.
|
||||
pub(crate) scan_cost: Duration,
|
||||
/// Duration of the series distributor to yield.
|
||||
pub(crate) yield_cost: Duration,
|
||||
}
|
||||
|
||||
/// Scans memtable ranges at `index`.
|
||||
pub(crate) fn scan_mem_ranges(
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
|
||||
@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
use store_api::storage::TimeSeriesRowSelector;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
||||
@@ -149,7 +149,7 @@ impl SeqScan {
|
||||
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
|
||||
/// if possible.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn build_reader_from_sources(
|
||||
pub(crate) async fn build_reader_from_sources(
|
||||
stream_ctx: &StreamContext,
|
||||
mut sources: Vec<Source>,
|
||||
semaphore: Option<Arc<Semaphore>>,
|
||||
@@ -215,10 +215,6 @@ impl SeqScan {
|
||||
)));
|
||||
}
|
||||
|
||||
if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
|
||||
return self.scan_partition_by_series(metrics_set, partition);
|
||||
}
|
||||
|
||||
let stream_ctx = self.stream_ctx.clone();
|
||||
let semaphore = self.new_semaphore();
|
||||
let partition_ranges = self.properties.partitions[partition].clone();
|
||||
@@ -245,14 +241,14 @@ impl SeqScan {
|
||||
&mut sources,
|
||||
);
|
||||
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
let mut reader =
|
||||
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cache = &stream_ctx.input.cache_strategy;
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
#[cfg(debug_assertions)]
|
||||
let mut checker = crate::read::BatchChecker::default()
|
||||
.with_start(Some(part_range.start))
|
||||
@@ -315,97 +311,6 @@ impl SeqScan {
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
/// Scans all ranges in the given partition and merge by time series.
|
||||
/// Otherwise the returned stream might not contains any data.
|
||||
fn scan_partition_by_series(
|
||||
&self,
|
||||
metrics_set: &ExecutionPlanMetricsSet,
|
||||
partition: usize,
|
||||
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
let stream_ctx = self.stream_ctx.clone();
|
||||
let semaphore = self.new_semaphore();
|
||||
let partition_ranges = self.properties.partitions[partition].clone();
|
||||
let distinguish_range = self.properties.distinguish_partition_range;
|
||||
let part_metrics = self.new_partition_metrics(metrics_set, partition);
|
||||
debug_assert!(!self.compaction);
|
||||
|
||||
let stream = try_stream! {
|
||||
part_metrics.on_first_poll();
|
||||
|
||||
let range_builder_list = Arc::new(RangeBuilderList::new(
|
||||
stream_ctx.input.num_memtables(),
|
||||
stream_ctx.input.num_files(),
|
||||
));
|
||||
// Scans all parts.
|
||||
let mut sources = Vec::with_capacity(partition_ranges.len());
|
||||
for part_range in partition_ranges {
|
||||
build_sources(
|
||||
&stream_ctx,
|
||||
&part_range,
|
||||
false,
|
||||
&part_metrics,
|
||||
range_builder_list.clone(),
|
||||
&mut sources,
|
||||
);
|
||||
}
|
||||
|
||||
// Builds a reader that merge sources from all parts.
|
||||
let mut reader =
|
||||
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cache = &stream_ctx.input.cache_strategy;
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
|
||||
while let Some(batch) = reader
|
||||
.next_batch()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
{
|
||||
metrics.scan_cost += fetch_start.elapsed();
|
||||
metrics.num_batches += 1;
|
||||
metrics.num_rows += batch.num_rows();
|
||||
|
||||
debug_assert!(!batch.is_empty());
|
||||
if batch.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let convert_start = Instant::now();
|
||||
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
|
||||
metrics.convert_cost += convert_start.elapsed();
|
||||
let yield_start = Instant::now();
|
||||
yield record_batch;
|
||||
metrics.yield_cost += yield_start.elapsed();
|
||||
|
||||
fetch_start = Instant::now();
|
||||
}
|
||||
|
||||
// Yields an empty part to indicate this range is terminated.
|
||||
// The query engine can use this to optimize some queries.
|
||||
if distinguish_range {
|
||||
let yield_start = Instant::now();
|
||||
yield stream_ctx.input.mapper.empty_record_batch();
|
||||
metrics.yield_cost += yield_start.elapsed();
|
||||
}
|
||||
|
||||
metrics.scan_cost += fetch_start.elapsed();
|
||||
part_metrics.merge_metrics(&metrics);
|
||||
|
||||
part_metrics.on_finish();
|
||||
};
|
||||
|
||||
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
||||
self.stream_ctx.input.mapper.output_schema(),
|
||||
Box::pin(stream),
|
||||
));
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
|
||||
if self.properties.target_partitions() > self.properties.num_partitions() {
|
||||
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
|
||||
@@ -506,7 +411,7 @@ impl fmt::Debug for SeqScan {
|
||||
}
|
||||
|
||||
/// Builds sources for the partition range and push them to the `sources` vector.
|
||||
fn build_sources(
|
||||
pub(crate) fn build_sources(
|
||||
stream_ctx: &Arc<StreamContext>,
|
||||
part_range: &PartitionRange,
|
||||
compaction: bool,
|
||||
@@ -517,8 +422,8 @@ fn build_sources(
|
||||
// Gets range meta.
|
||||
let range_meta = &stream_ctx.ranges[part_range.identifier];
|
||||
#[cfg(debug_assertions)]
|
||||
if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
|
||||
// Compaction or per series distribution expects input sources are not been split.
|
||||
if compaction {
|
||||
// Compaction expects input sources are not been split.
|
||||
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
|
||||
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
|
||||
// It should scan all row groups.
|
||||
|
||||
547
src/mito2/src/read/series_scan.rs
Normal file
547
src/mito2/src/read/series_scan.rs
Normal file
@@ -0,0 +1,547 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Per-series scan implementation.
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::util::ChainedRecordBatchStream;
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
|
||||
use datatypes::compute::concat_batches;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
||||
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::error::{
|
||||
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
|
||||
ScanMultiTimesSnafu, ScanSeriesSnafu,
|
||||
};
|
||||
use crate::read::range::RangeBuilderList;
|
||||
use crate::read::scan_region::{ScanInput, StreamContext};
|
||||
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
|
||||
use crate::read::seq_scan::{build_sources, SeqScan};
|
||||
use crate::read::{Batch, ScannerMetrics};
|
||||
|
||||
/// Timeout to send a batch to a sender.
|
||||
const SEND_TIMEOUT: Duration = Duration::from_millis(10);
|
||||
|
||||
/// List of receivers.
|
||||
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
|
||||
|
||||
/// Scans a region and returns sorted rows of a series in the same partition.
|
||||
///
|
||||
/// The output order is always order by `(primary key, time index)` inside every
|
||||
/// partition.
|
||||
/// Always returns the same series (primary key) to the same partition.
|
||||
pub struct SeriesScan {
|
||||
/// Properties of the scanner.
|
||||
properties: ScannerProperties,
|
||||
/// Context of streams.
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
/// Receivers of each partition.
|
||||
receivers: Mutex<ReceiverList>,
|
||||
/// Metrics for each partition.
|
||||
/// The scanner only sets in query and keeps it empty during compaction.
|
||||
metrics_list: Arc<PartitionMetricsList>,
|
||||
}
|
||||
|
||||
impl SeriesScan {
|
||||
/// Creates a new [SeriesScan].
|
||||
pub(crate) fn new(input: ScanInput) -> Self {
|
||||
let mut properties = ScannerProperties::default()
|
||||
.with_append_mode(input.append_mode)
|
||||
.with_total_rows(input.total_rows());
|
||||
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
|
||||
properties.partitions = vec![stream_ctx.partition_ranges()];
|
||||
|
||||
Self {
|
||||
properties,
|
||||
stream_ctx,
|
||||
receivers: Mutex::new(Vec::new()),
|
||||
metrics_list: Arc::new(PartitionMetricsList::default()),
|
||||
}
|
||||
}
|
||||
|
||||
fn scan_partition_impl(
|
||||
&self,
|
||||
metrics_set: &ExecutionPlanMetricsSet,
|
||||
partition: usize,
|
||||
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
if partition >= self.properties.num_partitions() {
|
||||
return Err(BoxedError::new(
|
||||
PartitionOutOfRangeSnafu {
|
||||
given: partition,
|
||||
all: self.properties.num_partitions(),
|
||||
}
|
||||
.build(),
|
||||
));
|
||||
}
|
||||
|
||||
self.maybe_start_distributor(metrics_set, &self.metrics_list);
|
||||
|
||||
let part_metrics =
|
||||
new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
|
||||
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
|
||||
let stream_ctx = self.stream_ctx.clone();
|
||||
|
||||
let stream = try_stream! {
|
||||
part_metrics.on_first_poll();
|
||||
|
||||
let cache = &stream_ctx.input.cache_strategy;
|
||||
let mut df_record_batches = Vec::new();
|
||||
let mut fetch_start = Instant::now();
|
||||
while let Some(result) = receiver.recv().await {
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
|
||||
metrics.scan_cost += fetch_start.elapsed();
|
||||
fetch_start = Instant::now();
|
||||
|
||||
let convert_start = Instant::now();
|
||||
df_record_batches.reserve(series.batches.len());
|
||||
for batch in series.batches {
|
||||
metrics.num_batches += 1;
|
||||
metrics.num_rows += batch.num_rows();
|
||||
|
||||
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
|
||||
df_record_batches.push(record_batch.into_df_record_batch());
|
||||
}
|
||||
|
||||
let output_schema = stream_ctx.input.mapper.output_schema();
|
||||
let df_record_batch =
|
||||
concat_batches(output_schema.arrow_schema(), &df_record_batches)
|
||||
.context(ComputeArrowSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
df_record_batches.clear();
|
||||
let record_batch =
|
||||
RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
|
||||
metrics.convert_cost += convert_start.elapsed();
|
||||
|
||||
let yield_start = Instant::now();
|
||||
yield record_batch;
|
||||
metrics.yield_cost += yield_start.elapsed();
|
||||
|
||||
part_metrics.merge_metrics(&metrics);
|
||||
}
|
||||
};
|
||||
|
||||
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
||||
self.stream_ctx.input.mapper.output_schema(),
|
||||
Box::pin(stream),
|
||||
));
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
/// Takes the receiver for the partition.
|
||||
fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
|
||||
let mut rx_list = self.receivers.lock().unwrap();
|
||||
rx_list[partition]
|
||||
.take()
|
||||
.context(ScanMultiTimesSnafu { partition })
|
||||
}
|
||||
|
||||
/// Starts the distributor if the receiver list is empty.
|
||||
fn maybe_start_distributor(
|
||||
&self,
|
||||
metrics_set: &ExecutionPlanMetricsSet,
|
||||
metrics_list: &Arc<PartitionMetricsList>,
|
||||
) {
|
||||
let mut rx_list = self.receivers.lock().unwrap();
|
||||
if !rx_list.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let (senders, receivers) = new_channel_list(self.properties.num_partitions());
|
||||
let mut distributor = SeriesDistributor {
|
||||
stream_ctx: self.stream_ctx.clone(),
|
||||
semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
|
||||
partitions: self.properties.partitions.clone(),
|
||||
senders,
|
||||
metrics_set: metrics_set.clone(),
|
||||
metrics_list: metrics_list.clone(),
|
||||
};
|
||||
common_runtime::spawn_global(async move {
|
||||
distributor.execute().await;
|
||||
});
|
||||
|
||||
*rx_list = receivers;
|
||||
}
|
||||
|
||||
/// Scans the region and returns a stream.
|
||||
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
let part_num = self.properties.num_partitions();
|
||||
let metrics_set = ExecutionPlanMetricsSet::default();
|
||||
let streams = (0..part_num)
|
||||
.map(|i| self.scan_partition(&metrics_set, i))
|
||||
.collect::<Result<Vec<_>, BoxedError>>()?;
|
||||
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
|
||||
Ok(Box::pin(chained_stream))
|
||||
}
|
||||
}
|
||||
|
||||
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
|
||||
let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
|
||||
.map(|_| {
|
||||
let (sender, receiver) = mpsc::channel(1);
|
||||
(Some(sender), Some(receiver))
|
||||
})
|
||||
.unzip();
|
||||
(SenderList::new(senders), receivers)
|
||||
}
|
||||
|
||||
impl RegionScanner for SeriesScan {
|
||||
fn properties(&self) -> &ScannerProperties {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.stream_ctx.input.mapper.output_schema()
|
||||
}
|
||||
|
||||
fn metadata(&self) -> RegionMetadataRef {
|
||||
self.stream_ctx.input.mapper.metadata().clone()
|
||||
}
|
||||
|
||||
fn scan_partition(
|
||||
&self,
|
||||
metrics_set: &ExecutionPlanMetricsSet,
|
||||
partition: usize,
|
||||
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||
self.scan_partition_impl(metrics_set, partition)
|
||||
}
|
||||
|
||||
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||
self.properties.prepare(request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_predicate(&self) -> bool {
|
||||
let predicate = self.stream_ctx.input.predicate();
|
||||
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
|
||||
}
|
||||
|
||||
fn set_logical_region(&mut self, logical_region: bool) {
|
||||
self.properties.set_logical_region(logical_region);
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SeriesScan {
|
||||
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"SeriesScan: region={}, ",
|
||||
self.stream_ctx.input.mapper.metadata().region_id
|
||||
)?;
|
||||
match t {
|
||||
DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
|
||||
DisplayFormatType::Verbose => {
|
||||
self.stream_ctx.format_for_explain(true, f)?;
|
||||
self.metrics_list.format_verbose_metrics(f)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SeriesScan {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("SeriesScan")
|
||||
.field("num_ranges", &self.stream_ctx.ranges.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl SeriesScan {
|
||||
/// Returns the input.
|
||||
pub(crate) fn input(&self) -> &ScanInput {
|
||||
&self.stream_ctx.input
|
||||
}
|
||||
}
|
||||
|
||||
/// The distributor scans series and distributes them to different partitions.
|
||||
struct SeriesDistributor {
|
||||
/// Context for the scan stream.
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
/// Optional semaphore for limiting the number of concurrent scans.
|
||||
semaphore: Option<Arc<Semaphore>>,
|
||||
/// Partition ranges to scan.
|
||||
partitions: Vec<Vec<PartitionRange>>,
|
||||
/// Senders of all partitions.
|
||||
senders: SenderList,
|
||||
/// Metrics set to report.
|
||||
/// The distributor report the metrics as an additional partition.
|
||||
/// This may double the scan cost of the [SeriesScan] metrics. We can
|
||||
/// get per-partition metrics in verbose mode to see the metrics of the
|
||||
/// distributor.
|
||||
metrics_set: ExecutionPlanMetricsSet,
|
||||
metrics_list: Arc<PartitionMetricsList>,
|
||||
}
|
||||
|
||||
impl SeriesDistributor {
|
||||
/// Executes the distributor.
|
||||
async fn execute(&mut self) {
|
||||
if let Err(e) = self.scan_partitions().await {
|
||||
self.senders.send_error(e).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Scans all parts.
|
||||
async fn scan_partitions(&mut self) -> Result<()> {
|
||||
let part_metrics = new_partition_metrics(
|
||||
&self.stream_ctx,
|
||||
&self.metrics_set,
|
||||
self.partitions.len(),
|
||||
&self.metrics_list,
|
||||
);
|
||||
part_metrics.on_first_poll();
|
||||
|
||||
let range_builder_list = Arc::new(RangeBuilderList::new(
|
||||
self.stream_ctx.input.num_memtables(),
|
||||
self.stream_ctx.input.num_files(),
|
||||
));
|
||||
// Scans all parts.
|
||||
let mut sources = Vec::with_capacity(self.partitions.len());
|
||||
for partition in &self.partitions {
|
||||
sources.reserve(partition.len());
|
||||
for part_range in partition {
|
||||
build_sources(
|
||||
&self.stream_ctx,
|
||||
part_range,
|
||||
false,
|
||||
&part_metrics,
|
||||
range_builder_list.clone(),
|
||||
&mut sources,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Builds a reader that merge sources from all parts.
|
||||
let mut reader =
|
||||
SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
|
||||
.await?;
|
||||
let mut metrics = SeriesDistributorMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
|
||||
let mut current_series = SeriesBatch::default();
|
||||
while let Some(batch) = reader.next_batch().await? {
|
||||
metrics.scan_cost += fetch_start.elapsed();
|
||||
fetch_start = Instant::now();
|
||||
metrics.num_batches += 1;
|
||||
metrics.num_rows += batch.num_rows();
|
||||
|
||||
debug_assert!(!batch.is_empty());
|
||||
if batch.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(last_key) = current_series.current_key() else {
|
||||
current_series.push(batch);
|
||||
continue;
|
||||
};
|
||||
|
||||
if last_key == batch.primary_key() {
|
||||
current_series.push(batch);
|
||||
continue;
|
||||
}
|
||||
|
||||
// We find a new series, send the current one.
|
||||
let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
|
||||
let yield_start = Instant::now();
|
||||
self.senders.send_batch(to_send).await?;
|
||||
metrics.yield_cost += yield_start.elapsed();
|
||||
}
|
||||
|
||||
if !current_series.is_empty() {
|
||||
let yield_start = Instant::now();
|
||||
self.senders.send_batch(current_series).await?;
|
||||
metrics.yield_cost += yield_start.elapsed();
|
||||
}
|
||||
|
||||
metrics.scan_cost += fetch_start.elapsed();
|
||||
metrics.num_series_send_timeout = self.senders.num_timeout;
|
||||
part_metrics.set_distributor_metrics(&metrics);
|
||||
|
||||
part_metrics.on_finish();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Batches of the same series.
|
||||
#[derive(Default)]
|
||||
struct SeriesBatch {
|
||||
batches: SmallVec<[Batch; 4]>,
|
||||
}
|
||||
|
||||
impl SeriesBatch {
|
||||
/// Creates a new [SeriesBatch] from a single [Batch].
|
||||
fn single(batch: Batch) -> Self {
|
||||
Self {
|
||||
batches: smallvec![batch],
|
||||
}
|
||||
}
|
||||
|
||||
fn current_key(&self) -> Option<&[u8]> {
|
||||
self.batches.first().map(|batch| batch.primary_key())
|
||||
}
|
||||
|
||||
fn push(&mut self, batch: Batch) {
|
||||
self.batches.push(batch);
|
||||
}
|
||||
|
||||
/// Returns true if there is no batch.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.batches.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// List of senders.
|
||||
struct SenderList {
|
||||
senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
|
||||
/// Number of None senders.
|
||||
num_nones: usize,
|
||||
/// Index of the current partition to send.
|
||||
sender_idx: usize,
|
||||
/// Number of timeout.
|
||||
num_timeout: usize,
|
||||
}
|
||||
|
||||
impl SenderList {
|
||||
fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
|
||||
let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
|
||||
Self {
|
||||
senders,
|
||||
num_nones,
|
||||
sender_idx: 0,
|
||||
num_timeout: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds a partition and tries to send the batch to the partition.
|
||||
/// Returns None if it sends successfully.
|
||||
fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
|
||||
for _ in 0..self.senders.len() {
|
||||
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
|
||||
|
||||
let sender_idx = self.fetch_add_sender_idx();
|
||||
let Some(sender) = &self.senders[sender_idx] else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match sender.try_send(Ok(batch)) {
|
||||
Ok(()) => return Ok(None),
|
||||
Err(TrySendError::Full(res)) => {
|
||||
// Safety: we send Ok.
|
||||
batch = res.unwrap();
|
||||
}
|
||||
Err(TrySendError::Closed(res)) => {
|
||||
self.senders[sender_idx] = None;
|
||||
self.num_nones += 1;
|
||||
// Safety: we send Ok.
|
||||
batch = res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(batch))
|
||||
}
|
||||
|
||||
/// Finds a partition and sends the batch to the partition.
|
||||
async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
|
||||
// Sends the batch without blocking first.
|
||||
match self.try_send_batch(batch)? {
|
||||
Some(b) => {
|
||||
// Unable to send batch to partition.
|
||||
batch = b;
|
||||
}
|
||||
None => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
|
||||
|
||||
let sender_idx = self.fetch_add_sender_idx();
|
||||
let Some(sender) = &self.senders[sender_idx] else {
|
||||
continue;
|
||||
};
|
||||
// Adds a timeout to avoid blocking indefinitely and sending
|
||||
// the batch in a round-robin fashion when some partitions
|
||||
// don't poll their inputs. This may happen if we have a
|
||||
// node like sort merging. But it is rare when we are using SeriesScan.
|
||||
match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
|
||||
Ok(()) => break,
|
||||
Err(SendTimeoutError::Timeout(res)) => {
|
||||
self.num_timeout += 1;
|
||||
// Safety: we send Ok.
|
||||
batch = res.unwrap();
|
||||
}
|
||||
Err(SendTimeoutError::Closed(res)) => {
|
||||
self.senders[sender_idx] = None;
|
||||
self.num_nones += 1;
|
||||
// Safety: we send Ok.
|
||||
batch = res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_error(&self, error: Error) {
|
||||
let error = Arc::new(error);
|
||||
for sender in self.senders.iter().flatten() {
|
||||
let result = Err(error.clone()).context(ScanSeriesSnafu);
|
||||
let _ = sender.send(result).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn fetch_add_sender_idx(&mut self) -> usize {
|
||||
let sender_idx = self.sender_idx;
|
||||
self.sender_idx = (self.sender_idx + 1) % self.senders.len();
|
||||
sender_idx
|
||||
}
|
||||
}
|
||||
|
||||
fn new_partition_metrics(
|
||||
stream_ctx: &StreamContext,
|
||||
metrics_set: &ExecutionPlanMetricsSet,
|
||||
partition: usize,
|
||||
metrics_list: &PartitionMetricsList,
|
||||
) -> PartitionMetrics {
|
||||
let metrics = PartitionMetrics::new(
|
||||
stream_ctx.input.mapper.metadata().region_id,
|
||||
partition,
|
||||
"SeriesScan",
|
||||
stream_ctx.query_start,
|
||||
metrics_set,
|
||||
);
|
||||
|
||||
metrics_list.set(partition, metrics.clone());
|
||||
metrics
|
||||
}
|
||||
@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::TimeSeriesRowSelector;
|
||||
|
||||
use crate::error::{
|
||||
DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
|
||||
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
|
||||
};
|
||||
use crate::read::compat::CompatBatch;
|
||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||
@@ -294,7 +294,7 @@ impl RangeBase {
|
||||
};
|
||||
if filter
|
||||
.evaluate_scalar(&pk_value)
|
||||
.context(FilterRecordBatchSnafu)?
|
||||
.context(RecordBatchSnafu)?
|
||||
{
|
||||
continue;
|
||||
} else {
|
||||
@@ -311,11 +311,11 @@ impl RangeBase {
|
||||
let field_col = &input.fields()[field_index].data;
|
||||
filter
|
||||
.evaluate_vector(field_col)
|
||||
.context(FilterRecordBatchSnafu)?
|
||||
.context(RecordBatchSnafu)?
|
||||
}
|
||||
SemanticType::Timestamp => filter
|
||||
.evaluate_vector(input.timestamps())
|
||||
.context(FilterRecordBatchSnafu)?,
|
||||
.context(RecordBatchSnafu)?,
|
||||
};
|
||||
|
||||
mask = mask.bitand(&result);
|
||||
|
||||
@@ -979,7 +979,8 @@ pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api
|
||||
}
|
||||
}
|
||||
|
||||
/// Build rows with schema (string, f64, ts_millis).
|
||||
/// Build rows with schema (string, f64, ts_millis) in range `[start, end)`.
|
||||
/// `start`, `end` are in second resolution.
|
||||
pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
|
||||
(start..end)
|
||||
.map(|i| api::v1::Row {
|
||||
|
||||
@@ -61,13 +61,13 @@ impl ParallelizeScan {
|
||||
} else if let Some(region_scan_exec) =
|
||||
plan.as_any().downcast_ref::<RegionScanExec>()
|
||||
{
|
||||
let expected_partition_num = config.execution.target_partitions;
|
||||
if region_scan_exec.is_partition_set() {
|
||||
return Ok(Transformed::no(plan));
|
||||
}
|
||||
|
||||
let ranges = region_scan_exec.get_partition_ranges();
|
||||
let total_range_num = ranges.len();
|
||||
let expected_partition_num = config.execution.target_partitions;
|
||||
|
||||
// assign ranges to each partition
|
||||
let mut partition_ranges =
|
||||
|
||||
@@ -23,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -45,7 +45,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -66,7 +66,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -89,7 +89,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -120,11 +120,11 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -151,7 +151,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_ProjectionExec: expr=[j@0 as j, prom_rate(j_range,i,test.j,Int64(10000))@1 as prom_rate(j_range,i,j,Int64(10000)), k@2 as k, l@3 as l] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
@@ -160,7 +160,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -21,7 +21,7 @@ tql analyze (1, 3, '1s') t1{ a = "a" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
+-+-+-+
|
||||
@@ -41,7 +41,7 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 6_|
|
||||
+-+-+-+
|
||||
@@ -61,7 +61,7 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" };
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[1000..3000], lookback=[300000], interval=[1000], time index=[b] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["a"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -35,7 +35,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -86,7 +86,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|
||||
@@ -94,7 +94,7 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -146,10 +146,10 @@ tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 ASC, l@3 ASC, j@1 ASC] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SortPreservingMergeExec: [k@2 ASC, l@3 ASC, j@1 ASC] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -23,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -45,7 +45,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -66,7 +66,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -89,7 +89,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), distribution=PerSeries, projection=["i", "j", "k"], filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(310000, None)], REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -120,11 +120,11 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -150,7 +150,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: prom_rate(j_range,i,j,Int64(10000))@1 IS NOT NULL REDACTED
|
||||
@@ -158,7 +158,7 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
|
||||
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|
||||
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
Reference in New Issue
Block a user