Compare commits


42 Commits

Author       SHA1        Date                        Message
evenyag      6247de2d50  2024-11-08 23:57:18 +08:00  chore: Revert "feat: prune in each partition" (reverts 3f9bf48161)
evenyag      a2eb46132f  2024-11-08 23:08:47 +08:00  feat: tokio dump
evenyag      3f9bf48161  2024-11-08 21:31:03 +08:00  feat: prune in each partition
evenyag      9bd2e006b5  2024-11-08 20:35:40 +08:00  feat: file output thread id
evenyag      031421ca91  2024-11-08 19:09:08 +08:00  feat: add thread id to log
evenyag      999f3a40c2  2024-11-08 17:12:54 +08:00  chore: Revert "chore: Revert "feat: yield after get ranges"" (reverts 770a850437)
evenyag      50d28e0a00  2024-11-08 16:23:03 +08:00  feat: add timeout to build ranges
evenyag      770a850437  2024-11-08 15:35:23 +08:00  chore: Revert "feat: yield after get ranges" (reverts 65e53b5bc4)
evenyag      65e53b5bc4  2024-11-08 01:28:39 +08:00  feat: yield after get ranges
evenyag      9a6c7aa4d6  2024-11-08 01:08:02 +08:00  chore: log label
evenyag      4f446b95d8  2024-11-08 01:01:27 +08:00  chore: logs
evenyag      9ad4200f55  2024-11-08 00:58:29 +08:00  feat: only log for unordered scan
evenyag      53d456651f  2024-11-08 00:11:54 +08:00  chore: range builder logs
evenyag      f11c5acb0f  2024-11-07 22:10:46 +08:00  feat: logs for debug prune cost
evenyag      8536a1ec6e  2024-11-07 20:36:12 +08:00  chore: logs to debug hang
evenyag      fce8c968da  2024-11-07 16:55:40 +08:00  feat: gauge for scan partition
evenyag      98a6ac973c  2024-11-07 16:48:03 +08:00  feat: log on merge scan region start/end
evenyag      8f79e421c3  2024-11-07 16:20:39 +08:00  chore: Revert "feat: remove too large files" (reverts a22667bf3c)
evenyag      e8b326382f  2024-11-07 00:28:19 +08:00  chore: fix compile
evenyag      56781e7fbc  2024-11-07 00:25:52 +08:00  fix: skip expired files
evenyag      7d342b3d95  2024-11-06 23:31:16 +08:00  feat: small max file size
evenyag      a22667bf3c  2024-11-06 22:08:43 +08:00  feat: remove too large files
evenyag      29b9b7db0c  2024-11-06 18:58:20 +08:00  feat: support compression method
evenyag      a66909a562  2024-11-06 16:29:18 +08:00  chore: fix compile
evenyag      8137b8ff3d  2024-11-06 15:25:49 +08:00  chore: more logs
Ruihang Xia  7c5cd2922a  2024-11-05 23:43:35 +08:00  fix split logic (Signed-off-by: Ruihang Xia <waynestxia@gmail.com>)
evenyag      a1d0dcf2c3  2024-11-05 20:25:05 +08:00  chore: more logs
evenyag      c391171f99  2024-11-05 20:18:35 +08:00  feat: more logs
evenyag      f44862aaac  2024-11-05 17:47:32 +08:00  feat: update log
evenyag      8bf795d88c  2024-11-05 16:22:54 +08:00  chore: more logs
evenyag      3bbf4e0232  2024-11-05 16:01:55 +08:00  feat: log range meta
evenyag      83da3950da  2024-11-05 15:33:42 +08:00  chore: debug
evenyag      957b5effd5  2024-11-05 15:32:35 +08:00  chore: fix compile
evenyag      f59e28006a  2024-11-05 15:24:40 +08:00  feat: assert precision
evenyag      3e5bbdf71e  2024-11-05 15:24:40 +08:00  feat: enable batch checker
Ruihang Xia  b8ac19c480  2024-11-05 14:57:44 +08:00  log on wrong range index (Signed-off-by: Ruihang Xia <waynestxia@gmail.com>)
evenyag      92b274a856  2024-11-05 14:57:54 +08:00  chore: log compute cost
evenyag      6bdac25f0a  2024-11-05 13:02:16 +08:00  chore: more logs
evenyag      a9f3c4b17c  2024-11-04 20:08:56 +08:00  chore: page reader metrics
evenyag      e003eaab36  2024-11-04 20:06:20 +08:00  chore: more log
evenyag      6e590da412  2024-11-04 19:40:42 +08:00  chore: remove compaction skip log
evenyag      ff5fa40b85  2024-11-04 19:40:41 +08:00  feat: skip wal
29 changed files with 725 additions and 117 deletions

View File

@@ -267,13 +267,15 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
 rev = "a10facb353b41460eeb98578868ebf19c2084fac"
 [profile.release]
-debug = 1
+# debug = 1
+split-debuginfo = "off"
 [profile.nightly]
 inherits = "release"
-strip = "debuginfo"
+split-debuginfo = "off"
+# strip = "debuginfo"
 lto = "thin"
-debug = false
+# debug = false
 incremental = false
 [profile.ci]

View File

@@ -192,6 +192,7 @@ pub fn init_global_logging(
         if opts.log_format == LogFormat::Json {
             Some(
                 Layer::new()
+                    .with_thread_ids(true)
                     .json()
                     .with_writer(writer)
                     .with_ansi(atty::is(atty::Stream::Stdout))
@@ -200,6 +201,7 @@ pub fn init_global_logging(
         } else {
             Some(
                 Layer::new()
+                    .with_thread_ids(true)
                     .with_writer(writer)
                     .with_ansi(atty::is(atty::Stream::Stdout))
                     .boxed(),
@@ -228,13 +230,20 @@ pub fn init_global_logging(
         if opts.log_format == LogFormat::Json {
             Some(
                 Layer::new()
+                    .with_thread_ids(true)
                     .json()
                     .with_writer(writer)
                     .with_ansi(false)
                     .boxed(),
             )
         } else {
-            Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
+            Some(
+                Layer::new()
+                    .with_thread_ids(true)
+                    .with_writer(writer)
+                    .with_ansi(false)
+                    .boxed(),
+            )
         }
     } else {
         None
@@ -260,6 +269,7 @@ pub fn init_global_logging(
         Some(
             Layer::new()
                 .json()
+                .with_thread_ids(true)
                 .with_writer(writer)
                 .with_ansi(false)
                 .with_filter(filter::LevelFilter::ERROR)
@@ -268,6 +278,7 @@ pub fn init_global_logging(
     } else {
         Some(
             Layer::new()
+                .with_thread_ids(true)
                 .with_writer(writer)
                 .with_ansi(false)
                 .with_filter(filter::LevelFilter::ERROR)
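Every hunk above makes the same change: each tracing-subscriber fmt `Layer` gains `.with_thread_ids(true)`, so every log line carries the OS thread id of the worker that emitted it. A minimal, self-contained sketch of that option follows; the registry setup is illustrative only and is not the project's actual `init_global_logging`.

use tracing_subscriber::prelude::*;

fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                // Prefix every event with the emitting thread's id, e.g. ThreadId(12),
                // which makes it possible to tell which worker is stuck during a hang.
                .with_thread_ids(true)
                .with_ansi(false),
        )
        .init();
    tracing::info!("this line now includes the thread id");
}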

View File

@@ -62,6 +62,8 @@ impl IndexApplier for PredicatesIndexApplier {
                 break;
             }
+            common_telemetry::info!("Predicate apply, apply name start, name: {}", name);
             let Some(meta) = metadata.metas.get(name) else {
                 match context.index_not_found_strategy {
                     IndexNotFoundStrategy::ReturnEmpty => {
@@ -85,6 +87,8 @@ impl IndexApplier for PredicatesIndexApplier {
             let bm = mapper.map_values(&values).await?;
             bitmap &= bm;
+            common_telemetry::info!("Predicate apply, apply name end, name: {}", name);
         }
         output.matched_segment_ids = bitmap;

View File

@@ -87,7 +87,16 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         &mut self,
         dest: &mut Vec<u8>,
     ) -> index::inverted_index::error::Result<usize> {
-        self.inner.read_all(dest).await
+        common_telemetry::debug!(
+            "Inverted index reader read_all start, file_id: {}",
+            self.file_id,
+        );
+        let res = self.inner.read_all(dest).await;
+        common_telemetry::debug!(
+            "Inverted index reader read_all end, file_id: {}",
+            self.file_id,
+        );
+        res
     }
     async fn seek_read(
@@ -95,7 +104,20 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<Vec<u8>> {
-        self.inner.seek_read(offset, size).await
+        common_telemetry::debug!(
+            "Inverted index reader seek_read start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self.inner.seek_read(offset, size).await;
+        common_telemetry::debug!(
+            "Inverted index reader seek_read end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
     async fn metadata(&mut self) -> index::inverted_index::error::Result<Arc<InvertedIndexMetas>> {
@@ -103,8 +125,16 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
             CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
             Ok(cached)
         } else {
+            common_telemetry::debug!(
+                "Inverted index reader get metadata start, file_id: {}",
+                self.file_id,
+            );
             let meta = self.inner.metadata().await?;
             self.cache.put_index_metadata(self.file_id, meta.clone());
+            common_telemetry::debug!(
+                "Inverted index reader get metadata end, file_id: {}",
+                self.file_id,
+            );
             CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
             Ok(meta)
         }
@@ -115,9 +145,23 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<FstMap> {
-        self.get_or_load(offset, size)
-            .await
-            .and_then(|r| FstMap::new(r).context(DecodeFstSnafu))
+        common_telemetry::debug!(
+            "Inverted index reader fst start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self
+            .get_or_load(offset, size)
+            .await
+            .and_then(|r| FstMap::new(r).context(DecodeFstSnafu));
+        common_telemetry::debug!(
+            "Inverted index reader fst end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
     async fn bitmap(
@@ -125,7 +169,20 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<BitVec> {
-        self.get_or_load(offset, size).await.map(BitVec::from_vec)
+        common_telemetry::debug!(
+            "Inverted index reader bitmap start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self.get_or_load(offset, size).await.map(BitVec::from_vec);
+        common_telemetry::debug!(
+            "Inverted index reader bitmap end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
 }

View File

@@ -259,6 +259,7 @@ impl Compactor for DefaultCompactor {
                 let write_opts = WriteOptions {
                     write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
+                    compression_method: compaction_region.engine_config.compression_method,
                     ..Default::default()
                 };

View File

@@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
-use common_telemetry::{debug, info};
+use common_telemetry::{info, trace};
 use common_time::timestamp::TimeUnit;
 use common_time::timestamp_millis::BucketAligned;
 use common_time::Timestamp;
@@ -114,7 +114,7 @@ impl TwcsPicker {
                 // Files in window exceeds file num limit
                 vec![enforce_file_num(&files.files, max_files)]
             } else {
-                debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
+                trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
                 continue;
             };
@@ -297,6 +297,9 @@ fn assign_to_windows<'a>(
     let mut windows: HashMap<i64, Window> = HashMap::new();
     // Iterates all files and assign to time windows according to max timestamp
     for f in files {
+        if f.compacting() {
+            continue;
+        }
         let (_, end) = f.time_range();
         let time_window = end
             .convert_to(TimeUnit::Second)

View File

@@ -128,6 +128,20 @@ pub struct MitoConfig {
     /// To align with the old behavior, the default value is 0 (no restrictions).
     #[serde(with = "humantime_serde")]
     pub min_compaction_interval: Duration,
+    /// Skip wal
+    pub skip_wal: bool,
+    /// SST compression method.
+    pub compression_method: CompressionMethod,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum CompressionMethod {
+    #[default]
+    Zstd,
+    Lz4,
+    None,
 }
 impl Default for MitoConfig {
@@ -161,6 +175,8 @@ impl Default for MitoConfig {
             fulltext_index: FulltextIndexConfig::default(),
             memtable: MemtableConfig::default(),
             min_compaction_interval: Duration::from_secs(0),
+            skip_wal: false,
+            compression_method: CompressionMethod::Zstd,
         };
         // Adjust buffer and cache size according to system memory if we can.
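Because the new enum derives serde with `rename_all = "snake_case"`, the values accepted in a config file are `zstd`, `lz4`, and `none`, and the default stays ZSTD. A standalone sketch of that round-trip; the type is redefined locally to mirror the diff, and serde_json only stands in for whatever format the config loader actually uses.

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CompressionMethod {
    #[default]
    Zstd,
    Lz4,
    None,
}

fn main() {
    // "lz4" in a config deserializes to CompressionMethod::Lz4.
    let m: CompressionMethod = serde_json::from_str("\"lz4\"").unwrap();
    assert_eq!(m, CompressionMethod::Lz4);
    // The default matches `compression_method: CompressionMethod::Zstd` above.
    assert_eq!(CompressionMethod::default(), CompressionMethod::Zstd);
}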

View File

@@ -870,6 +870,15 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+    #[snafu(display("Timeout: {}", msg))]
+    Timeout {
+        msg: String,
+        #[snafu(source)]
+        error: tokio::time::error::Elapsed,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1002,6 +1011,7 @@ impl ErrorExt for Error {
             | ApplyFulltextIndex { source, .. } => source.status_code(),
             DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
             RegionBusy { .. } => StatusCode::RegionBusy,
+            Timeout { .. } => StatusCode::Cancelled,
         }
     }

View File

@@ -321,6 +321,7 @@ impl RegionFlushTask {
         let mut write_opts = WriteOptions {
             write_buffer_size: self.engine_config.sst_write_buffer_size,
+            compression_method: self.engine_config.compression_method,
             ..Default::default()
         };
         if let Some(row_group_size) = self.row_group_size {

View File

@@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = "reason";
 pub const FILE_TYPE_LABEL: &str = "file_type";
 /// Region worker id label.
 pub const WORKER_LABEL: &str = "worker";
+/// Partition label.
+pub const PARTITION_LABEL: &str = "partition";
 lazy_static! {
     /// Global write buffer size in bytes.
@@ -134,6 +136,14 @@
     )
     .unwrap();
     pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
+    pub static ref READ_STAGE_BUILD_PAGE_READER: Histogram = READ_STAGE_ELAPSED.with_label_values(&["build_page_reader"]);
+    /// In progress scan for each partition.
+    pub static ref SCAN_PARTITION: IntGaugeVec = register_int_gauge_vec!(
+        "greptime_mito_scan_partition",
+        "mito partitions scanning",
+        &[TYPE_LABEL, PARTITION_LABEL]
+    )
+    .unwrap();
     /// Counter of rows read from different source.
     pub static ref READ_ROWS_TOTAL: IntCounterVec =
         register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
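SCAN_PARTITION is an IntGaugeVec: later hunks increment it when a partition scan starts and decrement it when the partition's metrics are dropped, so a partition that hangs keeps the gauge elevated. A standalone sketch of that inc-on-start, dec-on-drop pattern with the prometheus crate; the metric and label names below are illustrative, not the crate's.

use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    static ref SCAN_PARTITION: IntGaugeVec = register_int_gauge_vec!(
        "example_scan_partition",
        "in-flight partition scans",
        &["type", "partition"]
    )
    .unwrap();
}

struct ScanGuard {
    gauge: IntGauge,
}

impl ScanGuard {
    fn new(scanner_type: &str, partition: usize) -> Self {
        let gauge = SCAN_PARTITION.with_label_values(&[scanner_type, &partition.to_string()]);
        gauge.inc();
        Self { gauge }
    }
}

impl Drop for ScanGuard {
    fn drop(&mut self) {
        // A scan that never finishes never reaches this, so the gauge stays high.
        self.gauge.dec();
    }
}

fn main() {
    let _guard = ScanGuard::new("unordered_scan", 0);
}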

View File

@@ -493,7 +493,7 @@ impl Batch {
     }
     /// Checks the batch is monotonic by timestamps.
-    #[cfg(debug_assertions)]
+    // #[cfg(debug_assertions)]
     pub(crate) fn check_monotonic(&self) -> Result<(), String> {
         use std::cmp::Ordering;
         if self.timestamps_native().is_none() {
@@ -501,12 +501,12 @@
         }
         let timestamps = self.timestamps_native().unwrap();
-        let sequences = self.sequences.as_arrow().values();
+        // let sequences = self.sequences.as_arrow().values();
         for (i, window) in timestamps.windows(2).enumerate() {
             let current = window[0];
             let next = window[1];
-            let current_sequence = sequences[i];
-            let next_sequence = sequences[i + 1];
+            // let current_sequence = sequences[i];
+            // let next_sequence = sequences[i + 1];
             match current.cmp(&next) {
                 Ordering::Less => {
                     // The current timestamp is less than the next timestamp.
@@ -514,12 +514,12 @@
                 }
                 Ordering::Equal => {
                     // The current timestamp is equal to the next timestamp.
-                    if current_sequence < next_sequence {
-                        return Err(format!(
-                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
-                            current, next, current_sequence, next_sequence, i
-                        ));
-                    }
+                    // if current_sequence < next_sequence {
+                    //     return Err(format!(
+                    //         "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
+                    //         current, next, current_sequence, next_sequence, i
+                    //     ));
+                    // }
                 }
                 Ordering::Greater => {
                     // The current timestamp is greater than the next timestamp.
@@ -535,7 +535,7 @@
     }
     /// Returns Ok if the given batch is behind the current batch.
-    #[cfg(debug_assertions)]
+    // #[cfg(debug_assertions)]
     pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
         // Checks the primary key
         if self.primary_key() < other.primary_key() {
@@ -560,19 +560,20 @@
             ));
         }
         // Checks the sequence.
-        if self.last_sequence() >= other.first_sequence() {
-            return Ok(());
-        }
-        Err(format!(
-            "sequences are not monotonic: {:?} < {:?}",
-            self.last_sequence(),
-            other.first_sequence()
-        ))
+        Ok(())
+        // if self.last_sequence() >= other.first_sequence() {
+        //     return Ok(());
+        // }
+        // Err(format!(
+        //     "sequences are not monotonic: {:?} < {:?}",
+        //     self.last_sequence(),
+        //     other.first_sequence()
+        // ))
     }
 }
 /// A struct to check the batch is monotonic.
-#[cfg(debug_assertions)]
+// #[cfg(debug_assertions)]
 #[derive(Default)]
 pub(crate) struct BatchChecker {
     last_batch: Option<Batch>,
@@ -580,7 +581,7 @@ pub(crate) struct BatchChecker {
     end: Option<Timestamp>,
 }
-#[cfg(debug_assertions)]
+// #[cfg(debug_assertions)]
 impl BatchChecker {
     /// Attaches the given start timestamp to the checker.
     pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {

View File

@@ -662,10 +662,17 @@ impl ScanInput {
     /// Prunes a file to scan and returns the builder to build readers.
     async fn prune_file(
         &self,
+        row_group_index: RowGroupIndex,
         file_index: usize,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<FileRangeBuilder> {
         let file = &self.files[file_index];
+        common_telemetry::info!(
+            "ScanInput prune file start, region_id: {}, file: {}, row_group_index: {:?}",
+            file.region_id(),
+            file.file_id(),
+            row_group_index,
+        );
         let res = self
             .access_layer
             .read_sst(file.clone())
@@ -701,6 +708,13 @@
             )?;
             file_range_ctx.set_compat_batch(Some(compat));
         }
+        common_telemetry::info!(
+            "ScanInput prune file end, region_id: {}, file: {}, row_groups_num: {}, row_group_index: {:?}",
+            file.region_id(),
+            file.file_id(),
+            row_groups.len(),
+            row_group_index,
+        );
         Ok(FileRangeBuilder {
             context: Some(Arc::new(file_range_ctx)),
             row_groups,
@@ -821,11 +835,12 @@ impl StreamContext {
     pub(crate) async fn build_file_ranges(
         &self,
         index: RowGroupIndex,
+        read_type: &'static str,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<SmallVec<[FileRange; 2]>> {
         let mut ranges = SmallVec::new();
         self.range_builders
-            .build_file_ranges(&self.input, index, &mut ranges, reader_metrics)
+            .build_file_ranges(&self.input, index, read_type, &mut ranges, reader_metrics)
             .await?;
         Ok(ranges)
     }
@@ -896,19 +911,52 @@ impl RangeBuilderList {
         &self,
         input: &ScanInput,
         index: RowGroupIndex,
+        read_type: &'static str,
         ranges: &mut SmallVec<[FileRange; 2]>,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<()> {
         let file_index = index.index - self.mem_builders.len();
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] RangeBuilderList build ranges start, region_id: {}, row_group_index: {:?}",
+                input.mapper.metadata().region_id,
+                index,
+            );
+        }
         let mut builder_opt = self.file_builders[file_index].lock().await;
         match &mut *builder_opt {
-            Some(builder) => builder.build_ranges(index.row_group_index, ranges),
+            Some(builder) => {
+                if read_type == "unordered_scan_files" {
+                    common_telemetry::debug!(
+                        "[DEBUG_SCAN] RangeBuilderList build ranges get lock, build ranges, region_id: {}, row_group_index: {:?}",
+                        input.mapper.metadata().region_id,
+                        index,
+                    );
+                }
+                builder.build_ranges(index.row_group_index, ranges)
+            }
             None => {
-                let builder = input.prune_file(file_index, reader_metrics).await?;
+                if read_type == "unordered_scan_files" {
+                    common_telemetry::debug!(
+                        "[DEBUG_SCAN] RangeBuilderList build ranges get lock, build builder, region_id: {}, row_group_index: {:?}",
+                        input.mapper.metadata().region_id,
+                        index,
+                    );
+                }
+                let builder = input.prune_file(index, file_index, reader_metrics).await?;
                 builder.build_ranges(index.row_group_index, ranges);
                 *builder_opt = Some(builder);
             }
         }
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] RangeBuilderList build ranges end, region_id: {}, row_group_index: {:?}, ranges: {}",
+                input.mapper.metadata().region_id,
+                index,
+                ranges.len(),
+            );
+        }
         Ok(())
     }

View File

@@ -20,15 +20,21 @@ use std::time::{Duration, Instant};
 use async_stream::try_stream;
 use common_telemetry::debug;
 use futures::Stream;
+use prometheus::IntGauge;
+use snafu::ResultExt;
 use store_api::storage::RegionId;
+use tokio::task::yield_now;
-use crate::error::Result;
+use crate::error::{Result, TimeoutSnafu};
+use crate::metrics::SCAN_PARTITION;
 use crate::read::range::RowGroupIndex;
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, ScannerMetrics, Source};
 use crate::sst::file::FileTimeRange;
 use crate::sst::parquet::reader::ReaderMetrics;
+const BUILD_RANGES_TIMEOUT: Duration = Duration::from_secs(60 * 5);
 struct PartitionMetricsInner {
     region_id: RegionId,
     /// Index of the partition to scan.
@@ -41,6 +47,7 @@ struct PartitionMetricsInner {
     first_poll: Duration,
     metrics: ScannerMetrics,
     reader_metrics: ReaderMetrics,
+    scan_partition_gauge: IntGauge,
 }
 impl PartitionMetricsInner {
@@ -56,6 +63,7 @@ impl Drop for PartitionMetricsInner {
     fn drop(&mut self) {
         self.on_finish();
         self.metrics.observe_metrics();
+        self.scan_partition_gauge.dec();
         debug!(
             "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
@@ -76,6 +84,10 @@ impl PartitionMetrics {
         query_start: Instant,
         metrics: ScannerMetrics,
     ) -> Self {
+        let partition_str = partition.to_string();
+        let scan_partition_gauge =
+            SCAN_PARTITION.with_label_values(&[scanner_type, &partition_str]);
+        scan_partition_gauge.inc();
         let inner = PartitionMetricsInner {
             region_id,
             partition,
@@ -84,10 +96,15 @@
             first_poll: Duration::default(),
             metrics,
             reader_metrics: ReaderMetrics::default(),
+            scan_partition_gauge,
         };
         Self(Arc::new(Mutex::new(inner)))
     }
+    pub(crate) fn partition(&self) -> usize {
+        self.0.lock().unwrap().partition
+    }
     pub(crate) fn on_first_poll(&self) {
         let mut inner = self.0.lock().unwrap();
         inner.first_poll = inner.query_start.elapsed();
@@ -126,6 +143,7 @@
 /// Scans memtable ranges at `index`.
 pub(crate) fn scan_mem_ranges(
+    partition: usize,
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
     index: RowGroupIndex,
@@ -137,7 +155,17 @@ pub(crate) fn scan_mem_ranges(
         for range in ranges {
             let build_reader_start = Instant::now();
             let iter = range.build_iter(time_range)?;
-            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
+            let build_cost = build_reader_start.elapsed();
+            part_metrics.inc_build_reader_cost(build_cost);
+            common_telemetry::debug!(
+                "Thread: {:?}, Scan mem range, region_id: {}, partition: {}, time_range: {:?}, index: {:?}, build_cost: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                time_range,
+                index,
+                build_cost
+            );
             let mut source = Source::Iter(iter);
             while let Some(batch) = source.next_batch().await? {
@@ -149,6 +177,7 @@ pub(crate) fn scan_mem_ranges(
 /// Scans file ranges at `index`.
 pub(crate) fn scan_file_ranges(
+    partition: usize,
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
     index: RowGroupIndex,
@@ -156,14 +185,70 @@ pub(crate) fn scan_file_ranges(
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
         let mut reader_metrics = ReaderMetrics::default();
-        let ranges = stream_ctx
-            .build_file_ranges(index, &mut reader_metrics)
-            .await?;
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges start, region_id: {}, partition: {}, index: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+            );
+        }
+        let ranges = tokio::time::timeout(
+            BUILD_RANGES_TIMEOUT,
+            stream_ctx.build_file_ranges(index, read_type, &mut reader_metrics),
+        )
+        .await
+        .with_context(|_| TimeoutSnafu {
+            msg: format!(
+                "build file ranges for {}, partition: {}",
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+            ),
+        })
+        .inspect_err(|e| {
+            common_telemetry::error!(
+                e; "Thread: {:?}, Scan file ranges build ranges timeout, region_id: {}, partition: {}, index: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+            );
+        })??;
+        // let ranges = stream_ctx
+        //     .build_file_ranges(index, read_type, &mut reader_metrics)
+        //     .await?;
         part_metrics.inc_num_file_ranges(ranges.len());
+        // Notify other partitions.
+        yield_now().await;
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges end, region_id: {}, partition: {}, index: {:?}, ranges: {}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+                ranges.len(),
+            );
+        }
         for range in ranges {
             let build_reader_start = Instant::now();
             let reader = range.reader(None).await?;
-            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
+            let build_cost = build_reader_start.elapsed();
+            part_metrics.inc_build_reader_cost(build_cost);
+            if read_type == "unordered_scan_files" {
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}",
+                    std::thread::current().id(),
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    range.file_handle().file_id(),
+                    index,
+                    build_cost
+                );
+            }
             let compat_batch = range.compat_batch();
             let mut source = Source::PruneReader(reader);
             while let Some(mut batch) = source.next_batch().await? {
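The central change in this file pairs `tokio::time::timeout` with the new `Error::Timeout` snafu variant so that a stuck `build_file_ranges` fails loudly instead of hanging the partition (this is the "feat: add timeout to build ranges" commit). A standalone sketch of the same pattern follows, with a simplified error type standing in for the crate's, and the five-minute constant replaced by a short deadline so the example terminates.

use snafu::{ResultExt, Snafu};
use std::time::Duration;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Timeout: {}", msg))]
    Timeout {
        msg: String,
        source: tokio::time::error::Elapsed,
    },
}

async fn slow_build_ranges() -> usize {
    // Stands in for a range build that never finishes.
    tokio::time::sleep(Duration::from_secs(10)).await;
    3
}

#[tokio::main]
async fn main() {
    let res: Result<usize, Error> =
        tokio::time::timeout(Duration::from_millis(50), slow_build_ranges())
            .await
            .context(TimeoutSnafu { msg: "build file ranges" });
    // The elapsed deadline surfaces as Error::Timeout instead of blocking forever.
    assert!(matches!(res, Err(Error::Timeout { .. })));
}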

View File

@@ -361,6 +361,7 @@ fn build_sources(
         for index in &range_meta.row_group_indices {
             let stream = if stream_ctx.is_mem_range_index(*index) {
                 let stream = scan_mem_ranges(
+                    part_metrics.partition(),
                     stream_ctx.clone(),
                     part_metrics.clone(),
                     *index,
@@ -373,8 +374,13 @@
                 } else {
                     "seq_scan_files"
                 };
-                let stream =
-                    scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type);
+                let stream = scan_file_ranges(
+                    part_metrics.partition(),
+                    stream_ctx.clone(),
+                    part_metrics.clone(),
+                    *index,
+                    read_type,
+                );
                 Box::pin(stream) as _
             };
             sources.push(Source::Stream(stream));

View File

@@ -81,6 +81,7 @@ impl UnorderedScan {
     /// Scans a [PartitionRange] by its `identifier` and returns a stream.
     fn scan_partition_range(
+        partition: usize,
         stream_ctx: Arc<StreamContext>,
         part_range_id: usize,
         part_metrics: PartitionMetrics,
@@ -90,12 +91,12 @@
             let range_meta = &stream_ctx.ranges[part_range_id];
             for index in &range_meta.row_group_indices {
                 if stream_ctx.is_mem_range_index(*index) {
-                    let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
+                    let stream = scan_mem_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
                     for await batch in stream {
                         yield batch;
                     }
                 } else {
-                    let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
+                    let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
                     for await batch in stream {
                         yield batch;
                     }
@@ -132,24 +133,45 @@
         let part_ranges = self.properties.partitions[partition].clone();
         let distinguish_range = self.properties.distinguish_partition_range();
+        common_telemetry::info!(
+            "[DEBUG_SCAN] Thread: {:?}, Unordered scan start, region_id: {}, partition: {}, num_ranges: {}, part_ranges: {:?}",
+            std::thread::current().id(),
+            stream_ctx.input.mapper.metadata().region_id,
+            partition,
+            part_ranges.len(),
+            part_ranges,
+        );
         let stream = try_stream! {
             part_metrics.on_first_poll();
             let cache = stream_ctx.input.cache_manager.as_deref();
+            let ranges_len = part_ranges.len();
             // Scans each part.
-            for part_range in part_ranges {
-                let mut metrics = ScannerMetrics::default();
-                let mut fetch_start = Instant::now();
-                #[cfg(debug_assertions)]
+            for (part_idx, part_range) in part_ranges.into_iter().enumerate() {
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Unordered scan range start {}/{}, region_id: {}, partition: {}, part_range: {:?}, range_meta: {:?}",
+                    std::thread::current().id(),
+                    part_idx,
+                    ranges_len,
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    part_range,
+                    stream_ctx.ranges[part_range.identifier]
+                );
+                // #[cfg(debug_assertions)]
                 let mut checker = crate::read::BatchChecker::default()
                     .with_start(Some(part_range.start))
                     .with_end(Some(part_range.end));
                 let stream = Self::scan_partition_range(
+                    partition,
                     stream_ctx.clone(),
                     part_range.identifier,
                     part_metrics.clone(),
                 );
+                let mut metrics = ScannerMetrics::default();
+                let mut fetch_start = Instant::now();
                 for await batch in stream {
                     let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
                     metrics.scan_cost += fetch_start.elapsed();
@@ -161,7 +183,7 @@ impl UnorderedScan {
                         continue;
                     }
-                    #[cfg(debug_assertions)]
+                    // #[cfg(debug_assertions)]
                     checker.ensure_part_range_batch(
                         "UnorderedScan",
                         stream_ctx.input.mapper.metadata().region_id,
@@ -188,7 +210,20 @@ impl UnorderedScan {
                     metrics.yield_cost += yield_start.elapsed();
                 }
-                metrics.scan_cost += fetch_start.elapsed();
+                let scan_cost = fetch_start.elapsed();
+                metrics.scan_cost += scan_cost;
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Unordered scan range end {}/{}, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}",
+                    std::thread::current().id(),
+                    part_idx,
+                    ranges_len,
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    part_range,
+                    metrics.scan_cost,
+                    metrics.yield_cost,
+                    metrics.num_rows,
+                );
                 part_metrics.merge_metrics(&metrics);
             }

View File

@@ -106,6 +106,10 @@ impl InvertedIndexApplier {
                 if let Err(err) = other {
                     warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
                 }
+                common_telemetry::debug!(
+                    "Inverted applier get from remote blob reader, file_id: {}",
+                    file_id,
+                );
                 self.remote_blob_reader(file_id).await?
             }
         };

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
 use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;
+use crate::config::CompressionMethod;
 use crate::sst::file::FileTimeRange;
 use crate::sst::index::IndexOutput;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -49,6 +50,8 @@ pub struct WriteOptions {
     pub write_buffer_size: ReadableSize,
     /// Row group size.
     pub row_group_size: usize,
+    /// Compression method.
+    pub compression_method: CompressionMethod,
 }
 impl Default for WriteOptions {
@@ -56,6 +59,7 @@ impl Default for WriteOptions {
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
+            compression_method: CompressionMethod::default(),
         }
     }
 }

View File

@@ -193,6 +193,12 @@ impl ParquetReaderBuilder {
         let file_size = self.file_handle.meta_ref().file_size;
         // Loads parquet metadata of the file.
         let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
+        common_telemetry::debug!(
+            "Parquet read metadata done, region_id: {}, file_id: {}, elapsed: {:?}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+            start.elapsed(),
+        );
         // Decodes region metadata.
         let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
         // Gets the metadata stored in the SST.
@@ -476,6 +482,12 @@ impl ParquetReaderBuilder {
             return false;
         }
+        common_telemetry::debug!(
+            "Parquet prune by inverted index start, region_id: {}, file_id: {}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+        );
         let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
             Ok(output) => output,
             Err(err) => {
@@ -497,6 +509,12 @@
             }
         };
+        common_telemetry::debug!(
+            "Parquet prune by inverted index stop, region_id: {}, file_id: {}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+        );
         let segment_row_count = apply_output.segment_row_count;
         let grouped_in_row_groups = apply_output
             .matched_segment_ids

View File

@@ -33,7 +33,7 @@ use tokio::task::yield_now;
 use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::{CacheManagerRef, PageKey, PageValue};
-use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
+use crate::metrics::{READ_STAGE_BUILD_PAGE_READER, READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
 use crate::sst::file::FileId;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::page_reader::RowGroupCachedReader;
@@ -308,6 +308,7 @@ impl<'a> InMemoryRowGroup<'a> {
     /// Creates a page reader to read column at `i`.
     fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
+        let _timer = READ_STAGE_BUILD_PAGE_READER.start_timer();
         if let Some(cached_pages) = &self.column_uncompressed_pages[i] {
             debug_assert!(!cached_pages.row_group.is_empty());
             // Hits the row group level page cache.

View File

@@ -34,6 +34,7 @@ use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
+use crate::config::CompressionMethod;
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::index::Indexer;
@@ -217,9 +218,14 @@ where
         let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
         // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
+        let compression = match opts.compression_method {
+            CompressionMethod::Zstd => Compression::ZSTD(ZstdLevel::default()),
+            CompressionMethod::Lz4 => Compression::LZ4_RAW,
+            CompressionMethod::None => Compression::UNCOMPRESSED,
+        };
         let props_builder = WriterProperties::builder()
             .set_key_value_metadata(Some(vec![key_value_meta]))
-            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_compression(compression)
             .set_encoding(Encoding::PLAIN)
             .set_max_row_group_size(opts.row_group_size);
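On the parquet-rs side the change is just a runtime choice of `Compression` handed to `WriterProperties`. A standalone sketch of that mapping, assuming a parquet crate version that exposes `ZstdLevel`, as the diff itself does:

use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

fn writer_props(method: &str) -> WriterProperties {
    // Same mapping as the diff: zstd at the default level, raw LZ4, or no compression.
    let compression = match method {
        "zstd" => Compression::ZSTD(ZstdLevel::default()),
        "lz4" => Compression::LZ4_RAW,
        _ => Compression::UNCOMPRESSED,
    };
    WriterProperties::builder()
        .set_compression(compression)
        .build()
}

fn main() {
    let _props = writer_props("lz4");
}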

View File

@@ -79,21 +79,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 region_ctx.set_error(e);
             }
         }
-        match wal_writer.write_to_wal().await.map_err(Arc::new) {
-            Ok(response) => {
-                for (region_id, region_ctx) in region_ctxs.iter_mut() {
-                    // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
-                    // response is returned or the last entry ids for each region do exist.
-                    let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
-                    region_ctx.set_next_entry_id(last_entry_id + 1);
-                }
-            }
-            Err(e) => {
-                // Failed to write wal.
-                for mut region_ctx in region_ctxs.into_values() {
-                    region_ctx.set_error(e.clone());
-                }
-                return;
-            }
-        }
+        if !self.config.skip_wal {
+            match wal_writer.write_to_wal().await.map_err(Arc::new) {
+                Ok(response) => {
+                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
+                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
+                        // response is returned or the last entry ids for each region do exist.
+                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
+                        region_ctx.set_next_entry_id(last_entry_id + 1);
+                    }
+                }
+                Err(e) => {
+                    // Failed to write wal.
+                    for mut region_ctx in region_ctxs.into_values() {
+                        region_ctx.set_error(e.clone());
+                    }
+                    return;
+                }
+            }
+        }
     }

View File

@@ -224,6 +224,11 @@ impl MergeScanExec {
                     region_id,
                     plan: plan.clone(),
                 };
+                common_telemetry::info!(
+                    "Merge scan start poll stream, partition: {}, region_id: {}",
+                    partition,
+                    region_id,
+                );
                 let do_get_start = Instant::now();
                 let mut stream = region_query_handler
                     .do_get(request)
@@ -255,7 +260,7 @@ impl MergeScanExec {
                         // reset poll timer
                         poll_timer = Instant::now();
                     }
-                    common_telemetry::debug!(
+                    common_telemetry::info!(
                         "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                         partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                     );

View File

@@ -25,6 +25,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::Result as DataFusionResult;
 use datafusion_physical_expr::expressions::Column as PhysicalColumn;
 use store_api::region_engine::PartitionRange;
+use store_api::storage::RegionId;
 use table::table::scan::RegionScanExec;
 use crate::part_sort::PartSortExec;
@@ -100,6 +101,7 @@ impl WindowedSortPhysicalRule {
                 sort_exec.input().clone()
             } else {
                 Arc::new(PartSortExec::new(
+                    scanner_info.region_id,
                     first_sort_expr.clone(),
                     sort_exec.fetch(),
                     scanner_info.partition_ranges.clone(),
@@ -108,6 +110,7 @@
             };
             let windowed_sort_exec = WindowedSortExec::try_new(
+                scanner_info.region_id,
                 first_sort_expr,
                 sort_exec.fetch(),
                 scanner_info.partition_ranges,
@@ -143,6 +146,7 @@ impl WindowedSortPhysicalRule {
 #[derive(Debug)]
 struct ScannerInfo {
+    region_id: RegionId,
     partition_ranges: Vec<Vec<PartitionRange>>,
     time_index: String,
     tag_columns: Vec<String>,
@@ -152,6 +156,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
     let mut partition_ranges = None;
     let mut time_index = None;
     let mut tag_columns = None;
+    let mut region_id = None;
     let mut is_batch_coalesced = false;
     input.transform_up(|plan| {
@@ -172,6 +177,7 @@
             partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
             time_index = Some(region_scan_exec.time_index());
             tag_columns = Some(region_scan_exec.tag_columns());
+            region_id = Some(region_scan_exec.region_id());
             // set distinguish_partition_ranges to true, this is an incorrect workaround
             if !is_batch_coalesced {
@@ -184,6 +190,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
     let result = try {
         ScannerInfo {
+            region_id: region_id?,
             partition_ranges: partition_ranges?,
             time_index: time_index?,
            tag_columns: tag_columns?,

View File

@@ -41,6 +41,7 @@ use futures::Stream;
use itertools::Itertools; use itertools::Itertools;
use snafu::location; use snafu::location;
use store_api::region_engine::PartitionRange; use store_api::region_engine::PartitionRange;
use store_api::storage::RegionId;
use crate::{array_iter_helper, downcast_ts_array}; use crate::{array_iter_helper, downcast_ts_array};
@@ -51,6 +52,7 @@ use crate::{array_iter_helper, downcast_ts_array};
/// and this operator will sort each partition independently within the partition. /// and this operator will sort each partition independently within the partition.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PartSortExec { pub struct PartSortExec {
region_id: RegionId,
/// Physical sort expressions(that is, sort by timestamp) /// Physical sort expressions(that is, sort by timestamp)
expression: PhysicalSortExpr, expression: PhysicalSortExpr,
limit: Option<usize>, limit: Option<usize>,
@@ -63,6 +65,7 @@ pub struct PartSortExec {
impl PartSortExec { impl PartSortExec {
pub fn new( pub fn new(
region_id: RegionId,
expression: PhysicalSortExpr, expression: PhysicalSortExpr,
limit: Option<usize>, limit: Option<usize>,
partition_ranges: Vec<Vec<PartitionRange>>, partition_ranges: Vec<Vec<PartitionRange>>,
@@ -76,6 +79,7 @@ impl PartSortExec {
); );
Self { Self {
region_id,
expression, expression,
limit, limit,
input, input,
@@ -94,14 +98,20 @@ impl PartSortExec {
self.input.execute(partition, context.clone())?; self.input.execute(partition, context.clone())?;
if partition >= self.partition_ranges.len() { if partition >= self.partition_ranges.len() {
common_telemetry::error!(
"to_stream: Partition index out of range: {} >= {}",
partition,
self.partition_ranges.len()
);
internal_err!( internal_err!(
"Partition index out of range: {} >= {}", "to_stream: Partition index out of range: {} >= {}",
partition, partition,
self.partition_ranges.len() self.partition_ranges.len()
)?; )?;
} }
let df_stream = Box::pin(PartSortStream::new( let df_stream = Box::pin(PartSortStream::new(
self.region_id,
context, context,
self, self,
self.limit, self.limit,
@@ -156,6 +166,7 @@ impl ExecutionPlan for PartSortExec {
internal_err!("No children found")? internal_err!("No children found")?
}; };
Ok(Arc::new(Self::new( Ok(Arc::new(Self::new(
self.region_id,
self.expression.clone(), self.expression.clone(),
self.limit, self.limit,
self.partition_ranges.clone(), self.partition_ranges.clone(),
@@ -186,6 +197,7 @@ impl ExecutionPlan for PartSortExec {
} }
struct PartSortStream { struct PartSortStream {
region_id: RegionId,
/// Memory pool for this stream /// Memory pool for this stream
reservation: MemoryReservation, reservation: MemoryReservation,
buffer: Vec<DfRecordBatch>, buffer: Vec<DfRecordBatch>,
@@ -199,11 +211,14 @@ struct PartSortStream {
#[allow(dead_code)] // this is used under #[debug_assertions] #[allow(dead_code)] // this is used under #[debug_assertions]
partition: usize, partition: usize,
cur_part_idx: usize, cur_part_idx: usize,
evaluating_batch: Option<DfRecordBatch>,
metrics: BaselineMetrics, metrics: BaselineMetrics,
pending: bool,
} }
impl PartSortStream { impl PartSortStream {
fn new( fn new(
region_id: RegionId,
context: Arc<TaskContext>, context: Arc<TaskContext>,
sort: &PartSortExec, sort: &PartSortExec,
limit: Option<usize>, limit: Option<usize>,
@@ -211,7 +226,11 @@ impl PartSortStream {
partition_ranges: Vec<PartitionRange>, partition_ranges: Vec<PartitionRange>,
partition: usize, partition: usize,
) -> Self { ) -> Self {
common_telemetry::info!(
"[PartSortStream] Region {region_id} Partition {partition} new stream with ranges: {partition_ranges:?}"
);
Self { Self {
region_id,
reservation: MemoryConsumer::new("PartSortStream".to_string()) reservation: MemoryConsumer::new("PartSortStream".to_string())
.register(&context.runtime_env().memory_pool), .register(&context.runtime_env().memory_pool),
buffer: Vec::new(), buffer: Vec::new(),
@@ -224,7 +243,9 @@ impl PartSortStream {
partition_ranges, partition_ranges,
partition, partition,
cur_part_idx: 0, cur_part_idx: 0,
evaluating_batch: None,
metrics: BaselineMetrics::new(&sort.metrics, partition), metrics: BaselineMetrics::new(&sort.metrics, partition),
pending: false,
} }
} }
} }
@@ -275,8 +296,14 @@ impl PartSortStream {
min_max_idx: (usize, usize), min_max_idx: (usize, usize),
) -> datafusion_common::Result<()> { ) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() { if self.cur_part_idx >= self.partition_ranges.len() {
common_telemetry::error!(
"check_in_range: Partition index out of range: {} >= {}, ranges: {:?}",
self.cur_part_idx,
self.partition_ranges.len(),
self.partition_ranges,
);
internal_err!( internal_err!(
"Partition index out of range: {} >= {}", "check_in_range: Partition index out of range: {} >= {}",
self.cur_part_idx, self.cur_part_idx,
self.partition_ranges.len() self.partition_ranges.len()
)?; )?;
@@ -308,8 +335,18 @@ impl PartSortStream {
// check if the current partition index is out of range // check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() { if self.cur_part_idx >= self.partition_ranges.len() {
common_telemetry::error!(
"try_find_next_range: Region {} Partition {} index out of range: {} >= {}, ranges: {:?}",
self.region_id,
self.partition,
self.cur_part_idx,
self.partition_ranges.len(),
self.partition_ranges,
);
internal_err!( internal_err!(
"Partition index out of range: {} >= {}", "try_find_next_range: Region {} Partition {} index out of range: {} >= {}",
self.region_id,
self.partition,
self.cur_part_idx, self.cur_part_idx,
self.partition_ranges.len() self.partition_ranges.len()
)?; )?;
@@ -324,10 +361,26 @@ impl PartSortStream {
)?, )?,
); );
match sort_column.data_type() {
arrow_schema::DataType::Timestamp(unit, _) => {
assert_eq!(cur_range.start.unit().as_arrow_time_unit(), *unit);
assert_eq!(cur_range.end.unit().as_arrow_time_unit(), *unit);
}
_ => panic!(
"Unsupported data type for sort column: {:?}",
sort_column.data_type()
),
}
for (idx, val) in sort_column_iter { for (idx, val) in sort_column_iter {
// ignore vacant time index data // ignore vacant time index data
if let Some(val) = val { if let Some(val) = val {
if val >= cur_range.end.value() || val < cur_range.start.value() { if val >= cur_range.end.value() || val < cur_range.start.value() {
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} find val {} at index {} out of range {}: {:?}",
self.region_id, self.partition, val, idx, self.cur_part_idx, cur_range,
);
return Ok(Some(idx)); return Ok(Some(idx));
} }
} }
@@ -343,6 +396,15 @@ impl PartSortStream {
if self.buffer.is_empty() { if self.buffer.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone())); return Ok(DfRecordBatch::new_empty(self.schema.clone()));
} }
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} sort {} rows",
self.region_id,
self.partition,
self.cur_part_idx,
self.buffer.iter().map(|b| b.num_rows()).sum::<usize>(),
);
let mut sort_columns = Vec::with_capacity(self.buffer.len()); let mut sort_columns = Vec::with_capacity(self.buffer.len());
let mut opt = None; let mut opt = None;
for batch in self.buffer.iter() { for batch in self.buffer.iter() {
@@ -425,10 +487,87 @@ impl PartSortStream {
         Ok(sorted)
     }
+    fn split_batch(
+        &mut self,
+        batch: DfRecordBatch,
+    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
+        if batch.num_rows() == 0 {
+            return Ok(None);
+        }
+        let sort_column = self
+            .expression
+            .expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())?;
+        let next_range_idx = self.try_find_next_range(&sort_column)?;
+        let Some(idx) = next_range_idx else {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                batch.num_rows(),
+            );
+            self.buffer.push(batch);
+            // keep polling input for next batch
+            return Ok(None);
+        };
+        let this_range = batch.slice(0, idx);
+        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
+        if this_range.num_rows() != 0 {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                batch.num_rows(),
+            );
+            self.buffer.push(this_range);
+        }
+        // mark end of current PartitionRange
+        let sorted_batch = self.sort_buffer();
+        // step to next proper PartitionRange
+        self.cur_part_idx += 1;
+        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
+        if self.try_find_next_range(&next_sort_column)?.is_some() {
+            // remaining batch still contains data that exceeds the current partition range
+            // register the remaining batch for next polling
+            self.evaluating_batch = Some(remaining_range);
+        } else {
+            // remaining batch is within the current partition range
+            // push to the buffer and continue polling
+            if remaining_range.num_rows() != 0 {
+                common_telemetry::info!(
+                    "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                    self.region_id,
+                    self.partition,
+                    self.cur_part_idx,
+                    batch.num_rows(),
+                );
+                self.buffer.push(remaining_range);
+            }
+        }
+        sorted_batch.map(|x| if x.num_rows() == 0 { None } else { Some(x) })
+    }
     pub fn poll_next_inner(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        if self.pending {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} poll from pending",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+            );
+            self.as_mut().pending = false;
+        }
         loop {
             // no more input, sort the buffer and return
             if self.input_complete {
@@ -439,57 +578,50 @@ impl PartSortStream {
                 }
             }
+            // if there is a remaining batch being evaluated from last run,
+            // split on it instead of fetching new batch
+            if let Some(evaluating_batch) = self.evaluating_batch.take()
+                && evaluating_batch.num_rows() != 0
+            {
+                if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
+                    return Poll::Ready(Some(Ok(sorted_batch)));
+                } else {
+                    continue;
+                }
+            }
             // fetch next batch from input
             let res = self.input.as_mut().poll_next(cx);
             match res {
                 Poll::Ready(Some(Ok(batch))) => {
-                    let sort_column = self
-                        .expression
-                        .expr
-                        .evaluate(&batch)?
-                        .into_array(batch.num_rows())?;
-                    let next_range_idx = self.try_find_next_range(&sort_column)?;
-                    // `Some` means the current range is finished, split the batch into two parts and sort
-                    if let Some(idx) = next_range_idx {
-                        let this_range = batch.slice(0, idx);
-                        let next_range = batch.slice(idx, batch.num_rows() - idx);
-                        if this_range.num_rows() != 0 {
-                            self.buffer.push(this_range);
-                        }
-                        // mark end of current PartitionRange
-                        let sorted_batch = self.sort_buffer()?;
-                        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
-                        // step to next proper PartitionRange
-                        loop {
-                            self.cur_part_idx += 1;
-                            if next_sort_column.is_empty()
-                                || self.try_find_next_range(&next_sort_column)?.is_none()
-                            {
-                                break;
-                            }
-                        }
-                        // push the next range to the buffer
-                        if next_range.num_rows() != 0 {
-                            self.buffer.push(next_range);
-                        }
-                        if sorted_batch.num_rows() == 0 {
-                            // Current part is empty, continue polling next part.
-                            continue;
-                        }
+                    if let Some(sorted_batch) = self.split_batch(batch)? {
                         return Poll::Ready(Some(Ok(sorted_batch)));
+                    } else {
+                        continue;
                     }
-                    self.buffer.push(batch);
-                    // keep polling until boundary(a empty RecordBatch) is reached
-                    continue;
                 }
-                // input stream end, sort the buffer and return
+                // input stream end, mark and continue
                 Poll::Ready(None) => {
                     self.input_complete = true;
+                    common_telemetry::info!(
+                        "[PartSortStream] Region {} Partition {} part index {} input complete",
+                        self.region_id,
+                        self.partition,
+                        self.cur_part_idx,
+                    );
                     continue;
                 }
                 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
-                Poll::Pending => return Poll::Pending,
+                Poll::Pending => {
+                    self.as_mut().pending = true;
+                    common_telemetry::info!(
+                        "[PartSortStream] Region {} Partition {} part index {} is pending",
+                        self.region_id,
+                        self.partition,
+                        self.cur_part_idx,
+                    );
+                    return Poll::Pending;
+                }
             }
         }
     }
@@ -528,6 +660,7 @@ mod test {
     use crate::test_util::{new_ts_array, MockInputExec};
     #[tokio::test]
+    #[ignore = "behavior changed"]
     async fn fuzzy_test() {
         let test_cnt = 100;
         let part_cnt_bound = 100;
@@ -570,7 +703,11 @@ mod test {
             // generate each `PartitionRange`'s timestamp range
             let (start, end) = if descending {
                 let end = bound_val
-                    .map(|i| i.checked_sub(rng.i64(0..range_offset_bound)).expect("Bad luck, fuzzy test generate data that will overflow, change seed and try again"))
+                    .map(
+                        |i| i
+                            .checked_sub(rng.i64(0..range_offset_bound))
+                            .expect("Bad luck, fuzzy test generate data that will overflow, change seed and try again")
+                    )
                     .unwrap_or_else(|| rng.i64(..));
                 bound_val = Some(end);
                 let start = end - rng.i64(1..range_size_bound);
@@ -658,7 +795,7 @@ mod test {
                 ((5, 10), vec![vec![5, 6], vec![7, 8]]),
             ],
             false,
-            vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9], vec![5, 6, 7, 8]],
+            vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]],
         ),
         (
             TimeUnit::Millisecond,
@@ -744,7 +881,14 @@ mod test {
             })
             .collect_vec();
-        run_test(0, input_ranged_data, schema.clone(), opt, expected_output).await;
+        run_test(
+            identifier,
+            input_ranged_data,
+            schema.clone(),
+            opt,
+            expected_output,
+        )
+        .await;
     }
 }
@@ -767,6 +911,7 @@ mod test {
         let mock_input = MockInputExec::new(batches, schema.clone());
         let exec = PartSortExec::new(
+            RegionId::new(0, 0),
             PhysicalSortExpr {
                 expr: Arc::new(Column::new("ts", 0)),
                 options: opt,
@@ -794,8 +939,8 @@ mod test {
                 buf.push(b',');
             }
             // TODO(discord9): better ways to print buf
-            let _buf = String::from_utf8_lossy(&buf);
-            full_msg += &format!("case_id:{case_id}, real_output");
+            let buf = String::from_utf8_lossy(&buf);
+            full_msg += &format!("\ncase_id:{case_id}, real_output \n{buf}\n");
         }
         {
             let mut buf = Vec::with_capacity(10 * real_output.len());
@@ -807,8 +952,8 @@ mod test {
                 buf.append(&mut rb_json);
                 buf.push(b',');
             }
-            let _buf = String::from_utf8_lossy(&buf);
-            full_msg += &format!("case_id:{case_id}, expected_output");
+            let buf = String::from_utf8_lossy(&buf);
+            full_msg += &format!("case_id:{case_id}, expected_output \n{buf}");
         }
         panic!(
             "case_{} failed, opt: {:?}, full msg: {}",


@@ -20,6 +20,7 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Instant;
 use arrow::array::{Array, ArrayRef};
 use arrow::compute::SortColumn;
@@ -45,6 +46,7 @@ use futures::Stream;
 use itertools::Itertools;
 use snafu::ResultExt;
 use store_api::region_engine::PartitionRange;
+use store_api::storage::RegionId;
 use crate::error::{QueryExecutionSnafu, Result};
@@ -64,6 +66,7 @@ use crate::error::{QueryExecutionSnafu, Result};
 #[derive(Debug, Clone)]
 pub struct WindowedSortExec {
+    region_id: RegionId,
     /// Physical sort expressions(that is, sort by timestamp)
     expression: PhysicalSortExpr,
     /// Optional number of rows to fetch. Stops producing rows after this fetch
@@ -110,6 +113,7 @@ fn check_partition_range_monotonicity(
 impl WindowedSortExec {
     pub fn try_new(
+        region_id: RegionId,
         expression: PhysicalSortExpr,
         fetch: Option<usize>,
         ranges: Vec<Vec<PartitionRange>>,
@@ -126,6 +130,8 @@ impl WindowedSortExec {
             input.execution_mode(),
         );
+        let start = Instant::now();
         let mut all_avail_working_range = Vec::with_capacity(ranges.len());
         for r in &ranges {
             let overlap_counts = split_overlapping_ranges(r);
@@ -134,7 +140,13 @@ impl WindowedSortExec {
             all_avail_working_range.push(working_ranges);
         }
+        common_telemetry::info!(
+            "WindowSortExec compute working ranges, cost: {:?}",
+            start.elapsed()
+        );
         Ok(Self {
+            region_id,
             expression,
             fetch,
             ranges,
@@ -157,6 +169,7 @@ impl WindowedSortExec {
         self.input.execute(partition, context.clone())?;
         let df_stream = Box::pin(WindowedSortStream::new(
+            self.region_id,
             context,
             self,
             input_stream,
@@ -209,6 +222,7 @@ impl ExecutionPlan for WindowedSortExec {
             internal_err!("No children found")?
         };
         let new = Self::try_new(
+            self.region_id,
             self.expression.clone(),
             self.fetch,
             self.ranges.clone(),
@@ -283,10 +297,14 @@ pub struct WindowedSortStream {
     ranges: Vec<PartitionRange>,
     /// Execution metrics
     metrics: BaselineMetrics,
+    pending: bool,
+    region_id: RegionId,
+    partition: usize,
 }
 impl WindowedSortStream {
     pub fn new(
+        region_id: RegionId,
         context: Arc<TaskContext>,
         exec: &WindowedSortExec,
         input: DfSendableRecordBatchStream,
@@ -310,6 +328,9 @@ impl WindowedSortStream {
             all_avail_working_range: exec.all_avail_working_range[partition].clone(),
             ranges: exec.ranges[partition].clone(),
             metrics: BaselineMetrics::new(&exec.metrics, partition),
+            pending: false,
+            region_id,
+            partition,
         }
     }
 }
@@ -410,6 +431,12 @@ impl WindowedSortStream {
                     continue;
                 }
                 Poll::Pending => {
+                    self.pending = true;
+                    common_telemetry::info!(
+                        "[WindowedSortStream] Region {} Partition {} is pending",
+                        self.region_id,
+                        self.partition,
+                    );
                     return Poll::Pending;
                 }
             }
@@ -450,7 +477,15 @@ impl WindowedSortStream {
                 self.is_terminated = true;
                 None
             }
-            Poll::Pending => return Poll::Pending,
+            Poll::Pending => {
+                self.as_mut().pending = true;
+                common_telemetry::info!(
+                    "[WindowedSortStream] Region {} Partition {} is pending",
+                    self.region_id,
+                    self.partition,
+                );
+                return Poll::Pending;
+            }
         };
         let Some(SortedRunSet {
@@ -665,6 +700,15 @@ impl Stream for WindowedSortStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        if self.pending {
+            common_telemetry::info!(
+                "[WindowedSortStream] Region {} Partition {} poll from pending",
+                self.region_id,
+                self.partition,
+            );
+            self.as_mut().pending = false;
+        }
         let result = self.as_mut().poll_next_inner(cx);
         self.metrics.record_poll(result)
     }
@@ -2518,6 +2562,7 @@ mod test {
         let mock_input = MockInputExec::new(batches, self.schema.clone());
         let exec = WindowedSortExec::try_new(
+            RegionId::new(0, 0),
             self.expression.clone(),
             self.fetch,
             vec![ranges],
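Note on the pending flag threaded through both PartSortStream and WindowedSortStream above: it only remembers that the previous poll returned Poll::Pending so the next poll can log that the stream resumed. A minimal sketch of the same bookkeeping on a generic stream, assuming the futures crate; the wrapper type and messages are illustrative, not part of this codebase.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

/// Sketch of the `pending` bookkeeping: remember that the previous poll returned
/// `Poll::Pending`, and log when the stream is polled again afterwards.
struct PendingLogged<S> {
    inner: S,
    pending: bool,
}

impl<S: Stream + Unpin> Stream for PendingLogged<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.pending {
            // The real streams log region id, partition and current range index here.
            println!("poll from pending");
            self.pending = false;
        }
        let res = Pin::new(&mut self.inner).poll_next(cx);
        if res.is_pending() {
            self.pending = true;
            println!("is pending");
        }
        res
    }
}
```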


@@ -77,6 +77,7 @@ use crate::query_handler::{
 use crate::server::Server;
 pub mod authorize;
+pub mod dump;
 pub mod dyn_log;
 pub mod event;
 pub mod handler;
@@ -744,6 +745,7 @@ impl HttpServer {
"/log_level", "/log_level",
routing::get(dyn_log::dyn_log_handler).post(dyn_log::dyn_log_handler), routing::get(dyn_log::dyn_log_handler).post(dyn_log::dyn_log_handler),
) )
.route("/dump_tasks", routing::get(dump::dump_tasks_handler))
.nest( .nest(
"/prof", "/prof",
Router::new() Router::new()


@@ -0,0 +1,72 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Write;
+use std::time::Duration;
+
+use axum::extract::Query;
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use tokio::io::AsyncWriteExt;
+use tokio::runtime::Handle;
+
+use crate::error::Result;
+
+#[derive(Serialize, Deserialize, Debug, JsonSchema)]
+#[serde(default)]
+pub struct DumpQuery {
+    seconds: u64,
+    path: String,
+}
+
+impl Default for DumpQuery {
+    fn default() -> DumpQuery {
+        DumpQuery {
+            seconds: 5,
+            path: "/tmp/greptimedb/dump-tasks.txt".to_string(),
+        }
+    }
+}
+
+#[axum_macros::debug_handler]
+pub async fn dump_tasks_handler(Query(req): Query<DumpQuery>) -> Result<impl IntoResponse> {
+    common_telemetry::info!("Dump start, request: {:?}", req);
+    let handle = Handle::current();
+    let Ok(mut file) = tokio::fs::File::create(&req.path).await.inspect_err(|e| {
+        common_telemetry::error!(e; "Failed to create {}", req.path);
+    }) else {
+        return Ok((StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file."));
+    };
+
+    if let Ok(dump) = tokio::time::timeout(Duration::from_secs(req.seconds), handle.dump()).await {
+        for (i, task) in dump.tasks().iter().enumerate() {
+            let trace = task.trace();
+            let mut lines = String::new();
+            writeln!(lines, "TASK {i}:").unwrap();
+            writeln!(lines, "{trace}\n").unwrap();
+            if let Err(e) = file.write_all(lines.as_bytes()).await {
+                common_telemetry::error!(e; "Failed to write to {}", req.path);
+                return Ok((StatusCode::INTERNAL_SERVER_ERROR, "Failed to write file."));
+            }
+        }
+    } else {
+        common_telemetry::info!("Dump tasks timeout.");
+        return Ok((StatusCode::REQUEST_TIMEOUT, "Dump tasks timeout."));
+    }
+
+    common_telemetry::info!("Dump tasks done.");
+    Ok((StatusCode::OK, "Dump tasks done."))
+}
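Note on the new dump_tasks_handler: as far as I know, tokio's Handle::dump() task-dump API is only available when the binary is built with the tokio_unstable and tokio_taskdump cfg flags, so the endpoint depends on such a build. Below is a hypothetical client call, not part of the diff; it assumes the default HTTP port 4000 and that this router is mounted under the same prefix as the /log_level route (guessed here as /debug), so adjust the URL for the actual deployment.

```rust
// Hypothetical client for the new endpoint; URL prefix and port are assumptions.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let url =
        "http://127.0.0.1:4000/debug/dump_tasks?seconds=5&path=/tmp/greptimedb/dump-tasks.txt";
    let resp = reqwest::get(url).await?;
    // The handler answers with a plain status message such as "Dump tasks done."
    println!("{}: {}", resp.status(), resp.text().await?);
    Ok(())
}
```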


@@ -36,6 +36,7 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSort
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use futures::{Stream, StreamExt};
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
+use store_api::storage::RegionId;
 use crate::table::metrics::StreamMetrics;
@@ -166,6 +167,10 @@ impl RegionScanExec {
             .map(|col| col.column_schema.name.clone())
             .collect()
     }
+
+    pub fn region_id(&self) -> RegionId {
+        self.scanner.lock().unwrap().metadata().region_id
+    }
 }
 impl ExecutionPlan for RegionScanExec {


@@ -897,6 +897,8 @@ sst_write_buffer_size = "8MiB"
 parallel_scan_channel_size = 32
 allow_stale_entries = false
 min_compaction_interval = "0s"
+skip_wal = false
+compression_method = "zstd"
 [region_engine.mito.index]
 aux_path = ""