perf: reduce lock scope and improve log (#4453)

* feat: refine logs for scan

* feat: improve build parts and unordered scan metrics

* feat: change to debug log

* fix: release lock before reading part

* test: replace region id

* test: fix sqlness

* chore: add todo

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Yingwen
2024-07-31 12:07:34 +08:00
committed by GitHub
parent 1ea43da9ea
commit f382a7695f
15 changed files with 160 additions and 60 deletions

View File

@@ -738,10 +738,14 @@ pub(crate) struct ScannerMetrics {
prepare_scan_cost: Duration,
/// Duration to build parts.
build_parts_cost: Duration,
/// Duration to build the (merge) reader.
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of batches returned.
@@ -766,12 +770,18 @@ impl ScannerMetrics {
/// Observes metrics on scanner finish.
fn observe_metrics_on_finish(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(self.build_reader_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(self.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(self.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["yield"])
.observe(self.yield_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(self.total_cost.as_secs_f64());

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
@@ -607,7 +607,9 @@ impl ScanInput {
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
let mut file_prune_cost = Duration::ZERO;
for file in &self.files {
let prune_start = Instant::now();
let res = self
.access_layer
.read_sst(file.clone())
@@ -620,6 +622,7 @@ impl ScanInput {
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
Ok(x) => x,
Err(e) => {
@@ -655,6 +658,13 @@ impl ScanInput {
READ_SST_COUNT.observe(self.files.len() as f64);
common_telemetry::debug!(
"Region {} prune {} files, cost is {:?}",
self.mapper.metadata().region_id,
self.files.len(),
file_prune_cost
);
Ok(())
}
@@ -713,7 +723,7 @@ pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
/// A partition of a scanner to read.
/// It contains memtables and file ranges to scan.
#[derive(Default)]
#[derive(Clone, Default)]
pub(crate) struct ScanPart {
/// Memtable ranges to scan.
pub(crate) memtable_ranges: Vec<MemtableRange>,
@@ -845,10 +855,10 @@ impl ScanPartList {
pub(crate) struct StreamContext {
/// Input memtables and files.
pub(crate) input: ScanInput,
/// Parts to scan.
/// Parts to scan and the cost to build parts.
/// The scanner builds parts to scan from the input lazily.
/// The mutex is used to ensure the parts are only built once.
pub(crate) parts: Mutex<ScanPartList>,
pub(crate) parts: Mutex<(ScanPartList, Duration)>,
// Metrics:
/// The start time of the query.
@@ -862,7 +872,7 @@ impl StreamContext {
Self {
input,
parts: Mutex::new(ScanPartList::default()),
parts: Mutex::new((ScanPartList::default(), Duration::default())),
query_start,
}
}
@@ -878,11 +888,11 @@ impl StreamContext {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtable ranges, {} file ranges)",
inner.len(),
inner.num_mem_ranges(),
inner.num_file_ranges()
inner.0.len(),
inner.0.num_mem_ranges(),
inner.0.num_file_ranges()
)?,
DisplayFormatType::Verbose => write!(f, "{:?}", &*inner)?,
DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?,
},
Err(_) => write!(f, "<locked>")?,
}

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_error::ext::BoxedError;
@@ -162,11 +162,11 @@ impl SeqScan {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let parts_len = parts.len();
let parts_len = parts.0.len();
let mut sources = Vec::with_capacity(parts_len);
for id in 0..parts_len {
let Some(part) = parts.get_part(id) else {
let Some(part) = parts.0.get_part(id) else {
return Ok(None);
};
@@ -185,17 +185,32 @@ impl SeqScan {
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
) -> Result<Option<BoxedBatchReader>> {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let mut sources = Vec::new();
let Some(part) = parts.get_part(range_id) else {
return Ok(None);
let build_start = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let Some(part) = parts.0.get_part(range_id) else {
return Ok(None);
};
let build_start = Instant::now();
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
build_start
};
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await;
let build_reader_cost = build_start.elapsed();
metrics.build_reader_cost += build_reader_cost;
common_telemetry::debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}",
stream_ctx.input.mapper.metadata().region_id,
range_id,
build_reader_cost
);
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
maybe_reader
}
async fn build_reader_from_sources(
@@ -290,7 +305,9 @@ impl SeqScan {
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
@@ -350,7 +367,7 @@ impl SeqScan {
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
parts.len()
parts.0.len()
};
for id in (0..parts_len).skip(partition).step_by(num_partitions) {
@@ -381,7 +398,9 @@ impl SeqScan {
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
@@ -389,12 +408,13 @@ impl SeqScan {
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
common_telemetry::debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}",
stream_ctx.input.mapper.metadata().region_id,
partition,
id,
metrics,
first_poll
first_poll,
);
}
};
@@ -410,10 +430,10 @@ impl SeqScan {
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut ScanPartList,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
@@ -422,9 +442,16 @@ impl SeqScan {
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
metrics.observe_init_part(now.elapsed());
metrics.observe_init_part(build_part_cost);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
}
Ok(())
}
@@ -451,7 +478,11 @@ impl RegionScanner for SeqScan {
impl DisplayAs for SeqScan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SeqScan: ")?;
write!(
f,
"SeqScan: region={}, ",
self.stream_ctx.input.mapper.metadata().region_id
)?;
self.stream_ctx.format_for_explain(t, f)
}
}

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
@@ -115,11 +115,11 @@ impl UnorderedScan {
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
metrics.scan_cost += start.elapsed();
let convert_start = Instant::now();
let record_batch = mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
metrics.scan_cost += start.elapsed();
Ok(Some(record_batch))
}
@@ -148,15 +148,21 @@ impl RegionScanner for UnorderedScan {
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(part) = parts.get_part(partition) else {
return;
let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// Clones the part and releases the lock.
// TODO(yingwen): We might wrap the part in an Arc in the future if cloning is too expensive.
let Some(part) = parts.0.get_part(partition).cloned() else {
return;
};
part
};
let build_reader_start = Instant::now();
let mapper = &stream_ctx.input.mapper;
let memtable_sources = part
.memtable_ranges
@@ -168,6 +174,7 @@ impl RegionScanner for UnorderedScan {
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost = build_reader_start.elapsed();
let query_start = stream_ctx.query_start;
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans memtables first.
@@ -175,20 +182,26 @@ impl RegionScanner for UnorderedScan {
while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, None, &mut metrics).await? {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
}
// Then scans file ranges.
let mut reader_metrics = ReaderMetrics::default();
// Safety: UnorderedDistributor::build_parts() ensures this.
for file_range in &part.file_ranges[0] {
let build_reader_start = Instant::now();
let reader = file_range.reader(None).await.map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.build_reader_cost += build_reader_start.elapsed();
let compat_batch = file_range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
@@ -213,7 +226,11 @@ impl RegionScanner for UnorderedScan {
impl DisplayAs for UnorderedScan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "UnorderedScan: ")?;
write!(
f,
"UnorderedScan: region={}, ",
self.stream_ctx.input.mapper.metadata().region_id
)?;
self.stream_ctx.format_for_explain(t, f)
}
}
@@ -236,11 +253,11 @@ impl UnorderedScan {
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
part_list: &mut ScanPartList,
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
@@ -249,9 +266,16 @@ async fn maybe_init_parts(
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
metrics.observe_init_part(now.elapsed());
metrics.observe_init_part(build_part_cost);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
}
Ok(())
}

View File

@@ -198,7 +198,7 @@ impl MergeScanExec {
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;
let sub_sgate_metrics_moved = self.sub_stage_metrics.clone();
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
let stream = Box::pin(stream!({
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -226,6 +226,7 @@ impl MergeScanExec {
region_id,
plan: plan.clone(),
};
let do_get_start = Instant::now();
let mut stream = region_query_handler
.do_get(request)
.await
@@ -234,11 +235,11 @@ impl MergeScanExec {
BoxedError::new(e)
})
.context(ExternalSnafu)?;
let do_get_cost = do_get_start.elapsed();
ready_timer.stop();
let mut poll_duration = Duration::new(0, 0);
let mut poll_duration = Duration::ZERO;
let mut poll_timer = Instant::now();
while let Some(batch) = stream.next().await {
let poll_elapsed = poll_timer.elapsed();
@@ -249,13 +250,17 @@ impl MergeScanExec {
// to remove metadata and correct column name
let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
metric.record_output_batch_rows(batch.num_rows());
if let Some(first_consume_timer) = first_consume_timer.as_mut().take() {
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
}
yield Ok(batch);
// reset poll timer
poll_timer = Instant::now();
}
common_telemetry::debug!(
"Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
);
// process metrics after all data is drained.
if let Some(metrics) = stream.metrics() {
@@ -271,7 +276,7 @@ impl MergeScanExec {
metric.record_greptime_exec_cost(value as usize);
// record metrics from sub stages
sub_sgate_metrics_moved.lock().unwrap().push(metrics);
sub_stage_metrics_moved.lock().unwrap().push(metrics);
}
MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());

View File

@@ -24,6 +24,7 @@ Affected Rows: 3
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze SELECT count(*) FROM system_metrics;
+-+-+-+
@@ -35,7 +36,7 @@ explain analyze SELECT count(*) FROM system_metrics;
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 1_|
+-+-+-+

View File

@@ -20,6 +20,7 @@ VALUES
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze SELECT count(*) FROM system_metrics;
drop table system_metrics;

View File

@@ -18,6 +18,7 @@ Affected Rows: 0
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t group by host;
@@ -33,7 +34,7 @@ select sum(val) from t group by host;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
| 1_| 1_|_ProjectionExec: expr=[SUM(t.val)@1 as SUM(t.val)] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[host@0 as host], aggr=[SUM(t.val)] REDACTED
@@ -42,7 +43,7 @@ select sum(val) from t group by host;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
@@ -52,6 +53,7 @@ select sum(val) from t group by host;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t;
@@ -64,9 +66,9 @@ select sum(val) from t;
|_|_|_ProjectionExec: expr=[val@1 as val] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
| 1_| 1_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 1_|
+-+-+-+
@@ -77,6 +79,7 @@ select sum(val) from t;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t group by idc;
@@ -92,9 +95,9 @@ select sum(val) from t group by idc;
|_|_|_ProjectionExec: expr=[val@1 as val, idc@3 as idc] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
| 1_| 1_|_SeqScan: partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

View File

@@ -16,6 +16,7 @@ partition on columns (host) (
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t group by host;
@@ -24,6 +25,7 @@ select sum(val) from t group by host;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t;
@@ -33,6 +35,7 @@ select sum(val) from t;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select sum(val) from t group by idc;

View File

@@ -67,6 +67,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
+-+-+-+
@@ -76,7 +77,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 10_|
+-+-+-+

View File

@@ -35,6 +35,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
DROP TABLE host;

View File

@@ -13,6 +13,7 @@ Affected Rows: 3
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 10, '5s') test;
+-+-+-+
@@ -31,7 +32,7 @@ TQL ANALYZE (0, 10, '5s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -43,6 +44,7 @@ TQL ANALYZE (0, 10, '5s') test;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 10, '1s', '2s') test;
+-+-+-+
@@ -61,7 +63,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -72,6 +74,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '5s') test;
+-+-+-+
@@ -90,7 +93,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -103,6 +106,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (Duration.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE VERBOSE (0, 10, '5s') test;
+-+-+-+
@@ -121,7 +125,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+

View File

@@ -9,6 +9,7 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 10, '5s') test;
-- 'lookback' parameter is not fully supported, the test has to be updated
@@ -18,6 +19,7 @@ TQL ANALYZE (0, 10, '5s') test;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 10, '1s', '2s') test;
-- analyze at 0s, 5s and 10s. No point at 0s.
@@ -26,6 +28,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '5s') test;
-- analyze verbose at 0s, 5s and 10s. No point at 0s.
@@ -36,6 +39,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (Duration.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE VERBOSE (0, 10, '5s') test;
DROP TABLE test;

View File

@@ -27,6 +27,7 @@ Affected Rows: 9
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select
last_value(host order by ts),
@@ -47,7 +48,7 @@ explain analyze
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[last_value(t.host) ORDER BY [t.ts ASC NULLS LAST], last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST], last_value(t.val) ORDER BY [t.ts ASC NULLS LAST]] REDACTED
|_|_|_RepartitionExec: REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges), selector=LastRow REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges), selector=LastRow REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+

View File

@@ -23,6 +23,7 @@ insert into t values
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
explain analyze
select
last_value(host order by ts),