Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-23 22:49:58 +00:00.

Compare commits: async_deco...transform- (55 commits)
| SHA1 |
|---|
| 6247de2d50 |
| a2eb46132f |
| 3f9bf48161 |
| 9bd2e006b5 |
| 031421ca91 |
| 999f3a40c2 |
| 50d28e0a00 |
| 770a850437 |
| 65e53b5bc4 |
| 9a6c7aa4d6 |
| 4f446b95d8 |
| 9ad4200f55 |
| 53d456651f |
| f11c5acb0f |
| 8536a1ec6e |
| fce8c968da |
| 98a6ac973c |
| 8f79e421c3 |
| e8b326382f |
| 56781e7fbc |
| 7d342b3d95 |
| a22667bf3c |
| 29b9b7db0c |
| a66909a562 |
| 8137b8ff3d |
| 7c5cd2922a |
| a1d0dcf2c3 |
| c391171f99 |
| f44862aaac |
| 8bf795d88c |
| 3bbf4e0232 |
| 83da3950da |
| 957b5effd5 |
| f59e28006a |
| 3e5bbdf71e |
| b8ac19c480 |
| 92b274a856 |
| 6bdac25f0a |
| a9f3c4b17c |
| e003eaab36 |
| 6e590da412 |
| ff5fa40b85 |
| d4aa4159d4 |
| 960f6d821b |
| 9c5d044238 |
| 70c354eed6 |
| 23bf663d58 |
| 817648eac5 |
| 03b29439e2 |
| 712f4ca0ef |
| 60bacff57e |
| 6208772ba4 |
| 67184c0498 |
| 1dd908fdf7 |
| 8179b4798e |
@@ -267,13 +267,15 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
 rev = "a10facb353b41460eeb98578868ebf19c2084fac"
 
 [profile.release]
-debug = 1
+# debug = 1
+split-debuginfo = "off"
 
 [profile.nightly]
 inherits = "release"
-strip = "debuginfo"
+split-debuginfo = "off"
+# strip = "debuginfo"
 lto = "thin"
-debug = false
+# debug = false
 incremental = false
 
 [profile.ci]
@@ -192,6 +192,7 @@ pub fn init_global_logging(
     if opts.log_format == LogFormat::Json {
         Some(
             Layer::new()
+                .with_thread_ids(true)
                 .json()
                 .with_writer(writer)
                 .with_ansi(atty::is(atty::Stream::Stdout))
@@ -200,6 +201,7 @@ pub fn init_global_logging(
     } else {
         Some(
             Layer::new()
+                .with_thread_ids(true)
                 .with_writer(writer)
                 .with_ansi(atty::is(atty::Stream::Stdout))
                 .boxed(),
@@ -228,13 +230,20 @@ pub fn init_global_logging(
     if opts.log_format == LogFormat::Json {
         Some(
             Layer::new()
+                .with_thread_ids(true)
                 .json()
                 .with_writer(writer)
                 .with_ansi(false)
                 .boxed(),
         )
     } else {
-        Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
+        Some(
+            Layer::new()
+                .with_thread_ids(true)
+                .with_writer(writer)
+                .with_ansi(false)
+                .boxed(),
+        )
     }
 } else {
     None
@@ -260,6 +269,7 @@ pub fn init_global_logging(
     Some(
         Layer::new()
             .json()
+            .with_thread_ids(true)
            .with_writer(writer)
            .with_ansi(false)
            .with_filter(filter::LevelFilter::ERROR)
@@ -268,6 +278,7 @@ pub fn init_global_logging(
     } else {
         Some(
             Layer::new()
+                .with_thread_ids(true)
                 .with_writer(writer)
                 .with_ansi(false)
                 .with_filter(filter::LevelFilter::ERROR)
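The hunks above only add `.with_thread_ids(true)` to each `tracing-subscriber` fmt layer so every log event carries the emitting thread id. A minimal, self-contained sketch of the same idea, assuming a plain stdout writer rather than the project's `init_global_logging` plumbing:

```rust
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;

fn main() {
    // JSON fmt layer that also records the emitting thread id,
    // mirroring the `.with_thread_ids(true)` calls added in the diff.
    let layer = fmt::Layer::new()
        .with_thread_ids(true)
        .json()
        .with_writer(std::io::stdout)
        .with_ansi(false)
        .boxed();

    tracing_subscriber::registry().with(layer).init();

    tracing::info!("thread id is now part of every event");
}
```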
@@ -62,6 +62,8 @@ impl IndexApplier for PredicatesIndexApplier {
                 break;
             }
 
+            common_telemetry::info!("Predicate apply, apply name start, name: {}", name);
+
             let Some(meta) = metadata.metas.get(name) else {
                 match context.index_not_found_strategy {
                     IndexNotFoundStrategy::ReturnEmpty => {
@@ -85,6 +87,8 @@ impl IndexApplier for PredicatesIndexApplier {
             let bm = mapper.map_values(&values).await?;
 
             bitmap &= bm;
+
+            common_telemetry::info!("Predicate apply, apply name end, name: {}", name);
         }
 
         output.matched_segment_ids = bitmap;
@@ -17,6 +17,7 @@ use std::any::Any;
 use common_error::ext::ErrorExt;
 use common_macro::stack_trace_debug;
 use common_runtime::error::Error as RuntimeError;
+use common_runtime::JoinError;
 use serde_json::error::Error as JsonError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
@@ -306,6 +307,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Join error"))]
+    Join {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: JoinError,
+    },
 }
 
 impl ErrorExt for Error {
@@ -31,8 +31,8 @@ use store_api::storage::RegionId;
 
 use crate::error::{
     AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
-    IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu,
-    RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
+    IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, JoinSnafu,
+    OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
 };
 use crate::metrics;
 use crate::raft_engine::backend::SYSTEM_NAMESPACE;
@@ -250,6 +250,12 @@ impl LogStore for RaftEngineLogStore {
             .engine
             .write(&mut batch, sync)
             .context(RaftEngineSnafu)?;
+        let engine = self.engine.clone();
+        let _ = common_runtime::spawn_blocking_global(move || {
+            engine.write(&mut batch, sync).context(RaftEngineSnafu)
+        })
+        .await
+        .context(JoinSnafu)?;
 
         Ok(AppendBatchResponse { last_entry_ids })
     }
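The new `Join` variant exists so the blocking raft-engine write can be moved onto a blocking thread pool and the task's `JoinError` surfaced through snafu. A rough sketch of the same pattern using plain `tokio::task::spawn_blocking` (the `common_runtime::spawn_blocking_global` wrapper above is project-internal; the file path and error enum below are illustrative):

```rust
use tokio::task::JoinError;

#[derive(Debug)]
enum WriteError {
    Join(JoinError),
    Io(std::io::Error),
}

async fn write_on_blocking_pool(payload: Vec<u8>) -> Result<usize, WriteError> {
    // Offload the synchronous write so it does not stall the async executor,
    // then map a panicked or cancelled task into an explicit error variant.
    tokio::task::spawn_blocking(move || -> Result<usize, WriteError> {
        // Hypothetical destination path, only for the sketch.
        std::fs::write("/tmp/wal-sketch.bin", &payload).map_err(WriteError::Io)?;
        Ok(payload.len())
    })
    .await
    .map_err(WriteError::Join)?
}

#[tokio::main]
async fn main() {
    match write_on_blocking_pool(b"hello".to_vec()).await {
        Ok(n) => println!("wrote {n} bytes"),
        Err(e) => eprintln!("write failed: {e:?}"),
    }
}
```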
src/mito2/src/cache/index.rs (67 changed lines)
@@ -87,7 +87,16 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         &mut self,
         dest: &mut Vec<u8>,
     ) -> index::inverted_index::error::Result<usize> {
-        self.inner.read_all(dest).await
+        common_telemetry::debug!(
+            "Inverted index reader read_all start, file_id: {}",
+            self.file_id,
+        );
+        let res = self.inner.read_all(dest).await;
+        common_telemetry::debug!(
+            "Inverted index reader read_all end, file_id: {}",
+            self.file_id,
+        );
+        res
     }
 
     async fn seek_read(
@@ -95,7 +104,20 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<Vec<u8>> {
-        self.inner.seek_read(offset, size).await
+        common_telemetry::debug!(
+            "Inverted index reader seek_read start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self.inner.seek_read(offset, size).await;
+        common_telemetry::debug!(
+            "Inverted index reader seek_read end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
 
     async fn metadata(&mut self) -> index::inverted_index::error::Result<Arc<InvertedIndexMetas>> {
@@ -103,8 +125,16 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
             CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
             Ok(cached)
         } else {
+            common_telemetry::debug!(
+                "Inverted index reader get metadata start, file_id: {}",
+                self.file_id,
+            );
             let meta = self.inner.metadata().await?;
             self.cache.put_index_metadata(self.file_id, meta.clone());
+            common_telemetry::debug!(
+                "Inverted index reader get metadata end, file_id: {}",
+                self.file_id,
+            );
             CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
             Ok(meta)
         }
@@ -115,9 +145,23 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<FstMap> {
-        self.get_or_load(offset, size)
+        common_telemetry::debug!(
+            "Inverted index reader fst start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self
+            .get_or_load(offset, size)
             .await
-            .and_then(|r| FstMap::new(r).context(DecodeFstSnafu))
+            .and_then(|r| FstMap::new(r).context(DecodeFstSnafu));
+        common_telemetry::debug!(
+            "Inverted index reader fst end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
 
     async fn bitmap(
@@ -125,7 +169,20 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
         offset: u64,
         size: u32,
     ) -> index::inverted_index::error::Result<BitVec> {
-        self.get_or_load(offset, size).await.map(BitVec::from_vec)
+        common_telemetry::debug!(
+            "Inverted index reader bitmap start, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        let res = self.get_or_load(offset, size).await.map(BitVec::from_vec);
+        common_telemetry::debug!(
+            "Inverted index reader bitmap end, file_id: {}, offset: {}, size: {}",
+            self.file_id,
+            offset,
+            size,
+        );
+        res
     }
 }
 
@@ -259,6 +259,7 @@ impl Compactor for DefaultCompactor {
 
             let write_opts = WriteOptions {
                 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
+                compression_method: compaction_region.engine_config.compression_method,
                 ..Default::default()
             };
 
@@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 
-use common_telemetry::{debug, info};
+use common_telemetry::{info, trace};
 use common_time::timestamp::TimeUnit;
 use common_time::timestamp_millis::BucketAligned;
 use common_time::Timestamp;
@@ -114,7 +114,7 @@ impl TwcsPicker {
                 // Files in window exceeds file num limit
                 vec![enforce_file_num(&files.files, max_files)]
             } else {
-                debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
+                trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
                 continue;
             };
 
@@ -297,6 +297,9 @@ fn assign_to_windows<'a>(
     let mut windows: HashMap<i64, Window> = HashMap::new();
     // Iterates all files and assign to time windows according to max timestamp
     for f in files {
+        if f.compacting() {
+            continue;
+        }
         let (_, end) = f.time_range();
         let time_window = end
             .convert_to(TimeUnit::Second)
@@ -128,6 +128,20 @@ pub struct MitoConfig {
     /// To align with the old behavior, the default value is 0 (no restrictions).
     #[serde(with = "humantime_serde")]
     pub min_compaction_interval: Duration,
+
+    /// Skip wal
+    pub skip_wal: bool,
+    /// SST compression method.
+    pub compression_method: CompressionMethod,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum CompressionMethod {
+    #[default]
+    Zstd,
+    Lz4,
+    None,
 }
 
 impl Default for MitoConfig {
@@ -161,6 +175,8 @@ impl Default for MitoConfig {
             fulltext_index: FulltextIndexConfig::default(),
             memtable: MemtableConfig::default(),
             min_compaction_interval: Duration::from_secs(0),
+            skip_wal: false,
+            compression_method: CompressionMethod::Zstd,
         };
 
         // Adjust buffer and cache size according to system memory if we can.
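Because the new enum derives `Serialize`/`Deserialize` with `rename_all = "snake_case"` and defaults to `Zstd`, the option should round-trip through a TOML config as lowercase strings. A small sketch of that assumption (the section struct and key layout here are illustrative, not taken from the diff):

```rust
use serde::Deserialize;

#[derive(Debug, Default, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum CompressionMethod {
    #[default]
    Zstd,
    Lz4,
    None,
}

// Hypothetical config section holding the two new options.
#[derive(Debug, Deserialize)]
struct MitoSection {
    #[serde(default)]
    skip_wal: bool,
    #[serde(default)]
    compression_method: CompressionMethod,
}

fn main() {
    // "lz4" and "none" map onto the snake_case variant names;
    // omitting the key falls back to the Zstd default.
    let section: MitoSection = toml::from_str(
        r#"
        skip_wal = false
        compression_method = "lz4"
        "#,
    )
    .unwrap();
    assert_eq!(section.compression_method, CompressionMethod::Lz4);
    println!("{section:?}");
}
```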
@@ -870,6 +870,15 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Timeout: {}", msg))]
+    Timeout {
+        msg: String,
+        #[snafu(source)]
+        error: tokio::time::error::Elapsed,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1002,6 +1011,7 @@ impl ErrorExt for Error {
             | ApplyFulltextIndex { source, .. } => source.status_code(),
             DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
             RegionBusy { .. } => StatusCode::RegionBusy,
+            Timeout { .. } => StatusCode::Cancelled,
         }
     }
 
@@ -321,6 +321,7 @@ impl RegionFlushTask {
 
         let mut write_opts = WriteOptions {
             write_buffer_size: self.engine_config.sst_write_buffer_size,
+            compression_method: self.engine_config.compression_method,
             ..Default::default()
         };
         if let Some(row_group_size) = self.row_group_size {
@@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = "reason";
 pub const FILE_TYPE_LABEL: &str = "file_type";
 /// Region worker id label.
 pub const WORKER_LABEL: &str = "worker";
+/// Partition label.
+pub const PARTITION_LABEL: &str = "partition";
 
 lazy_static! {
     /// Global write buffer size in bytes.
@@ -134,6 +136,14 @@ lazy_static! {
     )
     .unwrap();
     pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
+    pub static ref READ_STAGE_BUILD_PAGE_READER: Histogram = READ_STAGE_ELAPSED.with_label_values(&["build_page_reader"]);
+    /// In progress scan for each partition.
+    pub static ref SCAN_PARTITION: IntGaugeVec = register_int_gauge_vec!(
+        "greptime_mito_scan_partition",
+        "mito partitions scanning",
+        &[TYPE_LABEL, PARTITION_LABEL]
+    )
+    .unwrap();
     /// Counter of rows read from different source.
     pub static ref READ_ROWS_TOTAL: IntCounterVec =
         register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
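The `SCAN_PARTITION` gauge above is labelled by scanner type and partition, incremented when a partition scan starts and decremented when the partition metrics are dropped. A stripped-down sketch of that gauge lifecycle with the `prometheus` crate (metric and label names below are placeholders, not the project's):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    // Number of in-flight partition scans, by scanner type and partition.
    static ref SCAN_PARTITION_SKETCH: IntGaugeVec = register_int_gauge_vec!(
        "example_scan_partition",
        "partitions currently being scanned",
        &["type", "partition"]
    )
    .unwrap();
}

struct PartitionScanGuard {
    gauge: IntGauge,
}

impl PartitionScanGuard {
    fn new(scanner_type: &str, partition: usize) -> Self {
        let gauge =
            SCAN_PARTITION_SKETCH.with_label_values(&[scanner_type, &partition.to_string()]);
        gauge.inc(); // scan started
        Self { gauge }
    }
}

impl Drop for PartitionScanGuard {
    fn drop(&mut self) {
        self.gauge.dec(); // scan finished
    }
}

fn main() {
    let _guard = PartitionScanGuard::new("unordered_scan", 0);
    // ... scanning work would happen here ...
}
```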
@@ -493,7 +493,7 @@ impl Batch {
     }
 
     /// Checks the batch is monotonic by timestamps.
-    #[cfg(debug_assertions)]
+    // #[cfg(debug_assertions)]
     pub(crate) fn check_monotonic(&self) -> Result<(), String> {
         use std::cmp::Ordering;
         if self.timestamps_native().is_none() {
@@ -501,12 +501,12 @@ impl Batch {
         }
 
         let timestamps = self.timestamps_native().unwrap();
-        let sequences = self.sequences.as_arrow().values();
+        // let sequences = self.sequences.as_arrow().values();
         for (i, window) in timestamps.windows(2).enumerate() {
             let current = window[0];
             let next = window[1];
-            let current_sequence = sequences[i];
-            let next_sequence = sequences[i + 1];
+            // let current_sequence = sequences[i];
+            // let next_sequence = sequences[i + 1];
             match current.cmp(&next) {
                 Ordering::Less => {
                     // The current timestamp is less than the next timestamp.
@@ -514,12 +514,12 @@ impl Batch {
                 }
                 Ordering::Equal => {
                     // The current timestamp is equal to the next timestamp.
-                    if current_sequence < next_sequence {
-                        return Err(format!(
-                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
-                            current, next, current_sequence, next_sequence, i
-                        ));
-                    }
+                    // if current_sequence < next_sequence {
+                    //     return Err(format!(
+                    //         "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
+                    //         current, next, current_sequence, next_sequence, i
+                    //     ));
+                    // }
                 }
                 Ordering::Greater => {
                     // The current timestamp is greater than the next timestamp.
@@ -535,7 +535,7 @@ impl Batch {
     }
 
     /// Returns Ok if the given batch is behind the current batch.
-    #[cfg(debug_assertions)]
+    // #[cfg(debug_assertions)]
     pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
         // Checks the primary key
         if self.primary_key() < other.primary_key() {
@@ -560,19 +560,20 @@ impl Batch {
             ));
         }
         // Checks the sequence.
-        if self.last_sequence() >= other.first_sequence() {
-            return Ok(());
-        }
-        Err(format!(
-            "sequences are not monotonic: {:?} < {:?}",
-            self.last_sequence(),
-            other.first_sequence()
-        ))
+        Ok(())
+        // if self.last_sequence() >= other.first_sequence() {
+        //     return Ok(());
+        // }
+        // Err(format!(
+        //     "sequences are not monotonic: {:?} < {:?}",
+        //     self.last_sequence(),
+        //     other.first_sequence()
+        // ))
     }
 }
 
 /// A struct to check the batch is monotonic.
-#[cfg(debug_assertions)]
+// #[cfg(debug_assertions)]
 #[derive(Default)]
 pub(crate) struct BatchChecker {
     last_batch: Option<Batch>,
@@ -580,7 +581,7 @@ pub(crate) struct BatchChecker {
     end: Option<Timestamp>,
 }
 
-#[cfg(debug_assertions)]
+// #[cfg(debug_assertions)]
 impl BatchChecker {
     /// Attaches the given start timestamp to the checker.
     pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
@@ -662,10 +662,17 @@ impl ScanInput {
     /// Prunes a file to scan and returns the builder to build readers.
     async fn prune_file(
         &self,
+        row_group_index: RowGroupIndex,
         file_index: usize,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<FileRangeBuilder> {
         let file = &self.files[file_index];
+        common_telemetry::info!(
+            "ScanInput prune file start, region_id: {}, file: {}, row_group_index: {:?}",
+            file.region_id(),
+            file.file_id(),
+            row_group_index,
+        );
         let res = self
             .access_layer
             .read_sst(file.clone())
@@ -701,6 +708,13 @@ impl ScanInput {
             )?;
             file_range_ctx.set_compat_batch(Some(compat));
         }
+        common_telemetry::info!(
+            "ScanInput prune file end, region_id: {}, file: {}, row_groups_num: {}, row_group_index: {:?}",
+            file.region_id(),
+            file.file_id(),
+            row_groups.len(),
+            row_group_index,
+        );
         Ok(FileRangeBuilder {
             context: Some(Arc::new(file_range_ctx)),
             row_groups,
@@ -821,11 +835,12 @@ impl StreamContext {
     pub(crate) async fn build_file_ranges(
         &self,
         index: RowGroupIndex,
+        read_type: &'static str,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<SmallVec<[FileRange; 2]>> {
         let mut ranges = SmallVec::new();
         self.range_builders
-            .build_file_ranges(&self.input, index, &mut ranges, reader_metrics)
+            .build_file_ranges(&self.input, index, read_type, &mut ranges, reader_metrics)
             .await?;
         Ok(ranges)
     }
@@ -896,19 +911,52 @@ impl RangeBuilderList {
         &self,
         input: &ScanInput,
         index: RowGroupIndex,
+        read_type: &'static str,
         ranges: &mut SmallVec<[FileRange; 2]>,
         reader_metrics: &mut ReaderMetrics,
     ) -> Result<()> {
         let file_index = index.index - self.mem_builders.len();
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] RangeBuilderList build ranges start, region_id: {}, row_group_index: {:?}",
+                input.mapper.metadata().region_id,
+                index,
+            );
+        }
         let mut builder_opt = self.file_builders[file_index].lock().await;
         match &mut *builder_opt {
-            Some(builder) => builder.build_ranges(index.row_group_index, ranges),
+            Some(builder) => {
+                if read_type == "unordered_scan_files" {
+                    common_telemetry::debug!(
+                        "[DEBUG_SCAN] RangeBuilderList build ranges get lock, build ranges, region_id: {}, row_group_index: {:?}",
+                        input.mapper.metadata().region_id,
+                        index,
+                    );
+                }
+                builder.build_ranges(index.row_group_index, ranges)
+            }
            None => {
-                let builder = input.prune_file(file_index, reader_metrics).await?;
+                if read_type == "unordered_scan_files" {
+                    common_telemetry::debug!(
+                        "[DEBUG_SCAN] RangeBuilderList build ranges get lock, build builder, region_id: {}, row_group_index: {:?}",
+                        input.mapper.metadata().region_id,
+                        index,
+                    );
+                }
+                let builder = input.prune_file(index, file_index, reader_metrics).await?;
                 builder.build_ranges(index.row_group_index, ranges);
                 *builder_opt = Some(builder);
            }
        }
 
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] RangeBuilderList build ranges end, region_id: {}, row_group_index: {:?}, ranges: {}",
+                input.mapper.metadata().region_id,
+                index,
+                ranges.len(),
+            );
+        }
        Ok(())
     }
 
@@ -20,15 +20,21 @@ use std::time::{Duration, Instant};
 use async_stream::try_stream;
 use common_telemetry::debug;
 use futures::Stream;
+use prometheus::IntGauge;
+use snafu::ResultExt;
 use store_api::storage::RegionId;
+use tokio::task::yield_now;
 
-use crate::error::Result;
+use crate::error::{Result, TimeoutSnafu};
+use crate::metrics::SCAN_PARTITION;
 use crate::read::range::RowGroupIndex;
 use crate::read::scan_region::StreamContext;
 use crate::read::{Batch, ScannerMetrics, Source};
 use crate::sst::file::FileTimeRange;
 use crate::sst::parquet::reader::ReaderMetrics;
 
+const BUILD_RANGES_TIMEOUT: Duration = Duration::from_secs(60 * 5);
+
 struct PartitionMetricsInner {
     region_id: RegionId,
     /// Index of the partition to scan.
@@ -41,6 +47,7 @@ struct PartitionMetricsInner {
     first_poll: Duration,
     metrics: ScannerMetrics,
     reader_metrics: ReaderMetrics,
+    scan_partition_gauge: IntGauge,
 }
 
 impl PartitionMetricsInner {
@@ -56,6 +63,7 @@ impl Drop for PartitionMetricsInner {
     fn drop(&mut self) {
         self.on_finish();
         self.metrics.observe_metrics();
+        self.scan_partition_gauge.dec();
 
         debug!(
             "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
@@ -76,6 +84,10 @@ impl PartitionMetrics {
         query_start: Instant,
         metrics: ScannerMetrics,
     ) -> Self {
+        let partition_str = partition.to_string();
+        let scan_partition_gauge =
+            SCAN_PARTITION.with_label_values(&[scanner_type, &partition_str]);
+        scan_partition_gauge.inc();
         let inner = PartitionMetricsInner {
             region_id,
             partition,
@@ -84,10 +96,15 @@ impl PartitionMetrics {
             first_poll: Duration::default(),
             metrics,
             reader_metrics: ReaderMetrics::default(),
+            scan_partition_gauge,
         };
         Self(Arc::new(Mutex::new(inner)))
     }
 
+    pub(crate) fn partition(&self) -> usize {
+        self.0.lock().unwrap().partition
+    }
+
     pub(crate) fn on_first_poll(&self) {
         let mut inner = self.0.lock().unwrap();
         inner.first_poll = inner.query_start.elapsed();
@@ -126,6 +143,7 @@ impl PartitionMetrics {
 
 /// Scans memtable ranges at `index`.
 pub(crate) fn scan_mem_ranges(
+    partition: usize,
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
     index: RowGroupIndex,
@@ -137,7 +155,17 @@ pub(crate) fn scan_mem_ranges(
         for range in ranges {
             let build_reader_start = Instant::now();
             let iter = range.build_iter(time_range)?;
-            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
+            let build_cost = build_reader_start.elapsed();
+            part_metrics.inc_build_reader_cost(build_cost);
+            common_telemetry::debug!(
+                "Thread: {:?}, Scan mem range, region_id: {}, partition: {}, time_range: {:?}, index: {:?}, build_cost: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                time_range,
+                index,
+                build_cost
+            );
 
             let mut source = Source::Iter(iter);
             while let Some(batch) = source.next_batch().await? {
@@ -149,6 +177,7 @@ pub(crate) fn scan_mem_ranges(
 
 /// Scans file ranges at `index`.
 pub(crate) fn scan_file_ranges(
+    partition: usize,
     stream_ctx: Arc<StreamContext>,
     part_metrics: PartitionMetrics,
     index: RowGroupIndex,
@@ -156,14 +185,70 @@ pub(crate) fn scan_file_ranges(
 ) -> impl Stream<Item = Result<Batch>> {
     try_stream! {
         let mut reader_metrics = ReaderMetrics::default();
-        let ranges = stream_ctx
-            .build_file_ranges(index, &mut reader_metrics)
-            .await?;
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges start, region_id: {}, partition: {}, index: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+            );
+        }
+        let ranges = tokio::time::timeout(
+            BUILD_RANGES_TIMEOUT,
+            stream_ctx.build_file_ranges(index, read_type, &mut reader_metrics),
+        )
+        .await
+        .with_context(|_| TimeoutSnafu {
+            msg: format!(
+                "build file ranges for {}, partition: {}",
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+            ),
+        })
+        .inspect_err(|e| {
+            common_telemetry::error!(
+                e; "Thread: {:?}, Scan file ranges build ranges timeout, region_id: {}, partition: {}, index: {:?}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+            );
+        })??;
+        // let ranges = stream_ctx
+        //     .build_file_ranges(index, read_type, &mut reader_metrics)
+        //     .await?;
         part_metrics.inc_num_file_ranges(ranges.len());
 
+        // Notify other partitions.
+        yield_now().await;
+
+        if read_type == "unordered_scan_files" {
+            common_telemetry::debug!(
+                "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges end, region_id: {}, partition: {}, index: {:?}, ranges: {}",
+                std::thread::current().id(),
+                stream_ctx.input.mapper.metadata().region_id,
+                partition,
+                index,
+                ranges.len(),
+            );
+        }
         for range in ranges {
             let build_reader_start = Instant::now();
             let reader = range.reader(None).await?;
-            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
+            let build_cost = build_reader_start.elapsed();
+            part_metrics.inc_build_reader_cost(build_cost);
+            if read_type == "unordered_scan_files" {
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}",
+                    std::thread::current().id(),
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    range.file_handle().file_id(),
+                    index,
+                    build_cost
+                );
+            }
             let compat_batch = range.compat_batch();
             let mut source = Source::PruneReader(reader);
             while let Some(mut batch) = source.next_batch().await? {
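The change above wraps `build_file_ranges` in `tokio::time::timeout` and converts the `Elapsed` error into the new `Timeout` variant via snafu. A minimal sketch of the same wrap-and-convert pattern with a plain error enum (the five-minute constant mirrors `BUILD_RANGES_TIMEOUT`; the function and names below are illustrative):

```rust
use std::time::Duration;
use tokio::time::error::Elapsed;

const BUILD_RANGES_TIMEOUT: Duration = Duration::from_secs(60 * 5);

#[derive(Debug)]
enum ScanError {
    Timeout { msg: String, source: Elapsed },
}

async fn build_ranges_slowly() -> Vec<u64> {
    // Placeholder for the real range-building work.
    tokio::time::sleep(Duration::from_millis(10)).await;
    vec![1, 2, 3]
}

async fn build_ranges_with_timeout(region: &str, partition: usize) -> Result<Vec<u64>, ScanError> {
    // If the future does not finish within the deadline, surface an error
    // that carries a human-readable message plus the `Elapsed` source.
    tokio::time::timeout(BUILD_RANGES_TIMEOUT, build_ranges_slowly())
        .await
        .map_err(|source| ScanError::Timeout {
            msg: format!("build file ranges for {region}, partition: {partition}"),
            source,
        })
}

#[tokio::main]
async fn main() {
    match build_ranges_with_timeout("region-0", 3).await {
        Ok(ranges) => println!("built {} ranges", ranges.len()),
        Err(e) => eprintln!("{e:?}"),
    }
}
```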
@@ -361,6 +361,7 @@ fn build_sources(
     for index in &range_meta.row_group_indices {
         let stream = if stream_ctx.is_mem_range_index(*index) {
             let stream = scan_mem_ranges(
+                part_metrics.partition(),
                 stream_ctx.clone(),
                 part_metrics.clone(),
                 *index,
@@ -373,8 +374,13 @@ fn build_sources(
         } else {
             "seq_scan_files"
         };
-        let stream =
-            scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type);
+        let stream = scan_file_ranges(
+            part_metrics.partition(),
+            stream_ctx.clone(),
+            part_metrics.clone(),
+            *index,
+            read_type,
+        );
         Box::pin(stream) as _
     };
     sources.push(Source::Stream(stream));
@@ -81,6 +81,7 @@ impl UnorderedScan {
 
     /// Scans a [PartitionRange] by its `identifier` and returns a stream.
     fn scan_partition_range(
+        partition: usize,
         stream_ctx: Arc<StreamContext>,
         part_range_id: usize,
         part_metrics: PartitionMetrics,
@@ -90,12 +91,12 @@ impl UnorderedScan {
             let range_meta = &stream_ctx.ranges[part_range_id];
             for index in &range_meta.row_group_indices {
                 if stream_ctx.is_mem_range_index(*index) {
-                    let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
+                    let stream = scan_mem_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
                     for await batch in stream {
                         yield batch;
                     }
                 } else {
-                    let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
+                    let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
                     for await batch in stream {
                         yield batch;
                     }
@@ -132,24 +133,45 @@ impl UnorderedScan {
         let part_ranges = self.properties.partitions[partition].clone();
         let distinguish_range = self.properties.distinguish_partition_range();
 
+        common_telemetry::info!(
+            "[DEBUG_SCAN] Thread: {:?}, Unordered scan start, region_id: {}, partition: {}, num_ranges: {}, part_ranges: {:?}",
+            std::thread::current().id(),
+            stream_ctx.input.mapper.metadata().region_id,
+            partition,
+            part_ranges.len(),
+            part_ranges,
+        );
+
         let stream = try_stream! {
             part_metrics.on_first_poll();
 
             let cache = stream_ctx.input.cache_manager.as_deref();
+            let ranges_len = part_ranges.len();
             // Scans each part.
-            for part_range in part_ranges {
-                let mut metrics = ScannerMetrics::default();
-                let mut fetch_start = Instant::now();
-                #[cfg(debug_assertions)]
+            for (part_idx, part_range) in part_ranges.into_iter().enumerate() {
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Unordered scan range start {}/{}, region_id: {}, partition: {}, part_range: {:?}, range_meta: {:?}",
+                    std::thread::current().id(),
+                    part_idx,
+                    ranges_len,
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    part_range,
+                    stream_ctx.ranges[part_range.identifier]
+                );
+                // #[cfg(debug_assertions)]
                 let mut checker = crate::read::BatchChecker::default()
                     .with_start(Some(part_range.start))
                     .with_end(Some(part_range.end));
 
                 let stream = Self::scan_partition_range(
+                    partition,
                     stream_ctx.clone(),
                     part_range.identifier,
                     part_metrics.clone(),
                 );
+                let mut metrics = ScannerMetrics::default();
+                let mut fetch_start = Instant::now();
                 for await batch in stream {
                     let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
                     metrics.scan_cost += fetch_start.elapsed();
@@ -161,7 +183,7 @@ impl UnorderedScan {
                         continue;
                     }
 
-                    #[cfg(debug_assertions)]
+                    // #[cfg(debug_assertions)]
                     checker.ensure_part_range_batch(
                         "UnorderedScan",
                         stream_ctx.input.mapper.metadata().region_id,
@@ -188,7 +210,20 @@ impl UnorderedScan {
                     metrics.yield_cost += yield_start.elapsed();
                 }
 
-                metrics.scan_cost += fetch_start.elapsed();
+                let scan_cost = fetch_start.elapsed();
+                metrics.scan_cost += scan_cost;
+                common_telemetry::debug!(
+                    "[DEBUG_SCAN] Thread: {:?}, Unordered scan range end {}/{}, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}",
+                    std::thread::current().id(),
+                    part_idx,
+                    ranges_len,
+                    stream_ctx.input.mapper.metadata().region_id,
+                    partition,
+                    part_range,
+                    metrics.scan_cost,
+                    metrics.yield_cost,
+                    metrics.num_rows,
+                );
                 part_metrics.merge_metrics(&metrics);
             }
 
@@ -106,6 +106,10 @@ impl InvertedIndexApplier {
                 if let Err(err) = other {
                     warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
                 }
+                common_telemetry::debug!(
+                    "Inverted applier get from remote blob reader, file_id: {}",
+                    file_id,
+                );
                 self.remote_blob_reader(file_id).await?
             }
         };
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;
 
+use crate::config::CompressionMethod;
 use crate::sst::file::FileTimeRange;
 use crate::sst::index::IndexOutput;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -49,6 +50,8 @@ pub struct WriteOptions {
     pub write_buffer_size: ReadableSize,
     /// Row group size.
     pub row_group_size: usize,
+    /// Compression method.
+    pub compression_method: CompressionMethod,
 }
 
 impl Default for WriteOptions {
@@ -56,6 +59,7 @@ impl Default for WriteOptions {
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
+            compression_method: CompressionMethod::default(),
         }
     }
 }
@@ -193,6 +193,12 @@ impl ParquetReaderBuilder {
         let file_size = self.file_handle.meta_ref().file_size;
         // Loads parquet metadata of the file.
         let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
+        common_telemetry::debug!(
+            "Parquet read metadata done, region_id: {}, file_id: {}, elapsed: {:?}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+            start.elapsed(),
+        );
         // Decodes region metadata.
         let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
         // Gets the metadata stored in the SST.
@@ -476,6 +482,12 @@ impl ParquetReaderBuilder {
             return false;
         }
 
+        common_telemetry::debug!(
+            "Parquet prune by inverted index start, region_id: {}, file_id: {}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+        );
+
         let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
             Ok(output) => output,
             Err(err) => {
@@ -497,6 +509,12 @@ impl ParquetReaderBuilder {
             }
         };
 
+        common_telemetry::debug!(
+            "Parquet prune by inverted index stop, region_id: {}, file_id: {}",
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+        );
+
         let segment_row_count = apply_output.segment_row_count;
         let grouped_in_row_groups = apply_output
             .matched_segment_ids
@@ -33,7 +33,7 @@ use tokio::task::yield_now;
 
 use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::{CacheManagerRef, PageKey, PageValue};
-use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
+use crate::metrics::{READ_STAGE_BUILD_PAGE_READER, READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
 use crate::sst::file::FileId;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::page_reader::RowGroupCachedReader;
@@ -308,6 +308,7 @@ impl<'a> InMemoryRowGroup<'a> {
 
     /// Creates a page reader to read column at `i`.
     fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
+        let _timer = READ_STAGE_BUILD_PAGE_READER.start_timer();
         if let Some(cached_pages) = &self.column_uncompressed_pages[i] {
             debug_assert!(!cached_pages.row_group.is_empty());
             // Hits the row group level page cache.
@@ -34,6 +34,7 @@ use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
+use crate::config::CompressionMethod;
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::index::Indexer;
@@ -217,9 +218,14 @@ where
     let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
 
     // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
+    let compression = match opts.compression_method {
+        CompressionMethod::Zstd => Compression::ZSTD(ZstdLevel::default()),
+        CompressionMethod::Lz4 => Compression::LZ4_RAW,
+        CompressionMethod::None => Compression::UNCOMPRESSED,
+    };
     let props_builder = WriterProperties::builder()
         .set_key_value_metadata(Some(vec![key_value_meta]))
-        .set_compression(Compression::ZSTD(ZstdLevel::default()))
+        .set_compression(compression)
         .set_encoding(Encoding::PLAIN)
         .set_max_row_group_size(opts.row_group_size);
 
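The writer change maps the engine-level `CompressionMethod` onto the `parquet` crate's `Compression` codec before building `WriterProperties`. A small sketch of just that mapping and builder call, assuming a standalone enum and an arbitrary row group size of 4096:

```rust
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::WriterProperties;

#[derive(Clone, Copy)]
enum CompressionMethod {
    Zstd,
    Lz4,
    None,
}

fn writer_properties(method: CompressionMethod) -> WriterProperties {
    // Translate the engine-level enum into parquet's compression codec.
    let compression = match method {
        CompressionMethod::Zstd => Compression::ZSTD(ZstdLevel::default()),
        CompressionMethod::Lz4 => Compression::LZ4_RAW,
        CompressionMethod::None => Compression::UNCOMPRESSED,
    };
    WriterProperties::builder()
        .set_compression(compression)
        .set_encoding(Encoding::PLAIN)
        .set_max_row_group_size(4096)
        .build()
}

fn main() {
    let _props = writer_properties(CompressionMethod::Lz4);
    println!("writer properties configured");
}
```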
@@ -79,21 +79,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 region_ctx.set_error(e);
             }
         }
-        match wal_writer.write_to_wal().await.map_err(Arc::new) {
-            Ok(response) => {
-                for (region_id, region_ctx) in region_ctxs.iter_mut() {
-                    // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
-                    // response is returned or the last entry ids for each region do exist.
-                    let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
-                    region_ctx.set_next_entry_id(last_entry_id + 1);
-                }
-            }
-            Err(e) => {
-                // Failed to write wal.
-                for mut region_ctx in region_ctxs.into_values() {
-                    region_ctx.set_error(e.clone());
-                }
-                return;
-            }
-        }
+        if !self.config.skip_wal {
+            match wal_writer.write_to_wal().await.map_err(Arc::new) {
+                Ok(response) => {
+                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
+                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
+                        // response is returned or the last entry ids for each region do exist.
+                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
+                        region_ctx.set_next_entry_id(last_entry_id + 1);
+                    }
+                }
+                Err(e) => {
+                    // Failed to write wal.
+                    for mut region_ctx in region_ctxs.into_values() {
+                        region_ctx.set_error(e.clone());
+                    }
+                    return;
+                }
+            }
+        }
     }
@@ -274,7 +274,7 @@ impl<'a> RuleChecker<'a> {
     fn check_axis(&self) -> Result<()> {
         for (col_index, axis) in self.axis.iter().enumerate() {
             for (val, split_point) in axis {
-                if split_point.less_than_counter != 0 || !split_point.is_equal {
+                if !split_point.is_equal {
                     UnclosedValueSnafu {
                         value: format!("{val:?}"),
                         column: self.rule.partition_columns[col_index].clone(),
@@ -410,6 +410,7 @@ mod tests {
     /// b <= h     b >= s
     /// ```
     #[test]
+    #[ignore = "don't check unmatched `>` and `<` for now"]
     fn empty_expr_case_1() {
         // PARTITION ON COLUMNS (b) (
         //     b <= 'h',
@@ -451,6 +452,7 @@ mod tests {
     ///     10    20
     /// ```
     #[test]
+    #[ignore = "don't check unmatched `>` and `<` for now"]
     fn empty_expr_case_2() {
         // PARTITION ON COLUMNS (b) (
         //     a >= 100 AND b <= 10 OR a > 100 AND a <= 200 AND b <= 10 OR a >= 200 AND b > 10 AND b <= 20 OR a > 200 AND b <= 20
@@ -580,6 +582,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "don't check unmatched `>` and `<` for now"]
     fn duplicate_expr_case_1() {
         // PARTITION ON COLUMNS (a) (
         //     a <= 20,
@@ -15,8 +15,11 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use datafusion::functions_aggregate::sum::Sum;
+use datafusion_expr::aggregate_function::AggregateFunction as BuiltInAggregateFunction;
+use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition};
 use datafusion_expr::utils::exprlist_to_columns;
-use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, UserDefinedLogicalNode};
 use promql::extension_plan::{
     EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
 };
@@ -25,21 +28,91 @@ use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan}
 use crate::dist_plan::MergeScanLogicalPlan;
 
 #[allow(dead_code)]
-pub enum Commutativity {
+pub enum Commutativity<T> {
     Commutative,
     PartialCommutative,
-    ConditionalCommutative(Option<Transformer>),
-    TransformedCommutative(Option<Transformer>),
+    ConditionalCommutative(Option<Transformer<T>>),
+    TransformedCommutative(Option<Transformer<T>>),
     NonCommutative,
     Unimplemented,
     /// For unrelated plans like DDL
     Unsupported,
 }
 
+impl<T> Commutativity<T> {
+    /// Check if self is stricter than `lhs`
+    fn is_stricter_than(&self, lhs: &Self) -> bool {
+        match (lhs, self) {
+            (Commutativity::Commutative, Commutativity::Commutative) => false,
+            (Commutativity::Commutative, _) => true,
+
+            (
+                Commutativity::PartialCommutative,
+                Commutativity::Commutative | Commutativity::PartialCommutative,
+            ) => false,
+            (Commutativity::PartialCommutative, _) => true,
+
+            (
+                Commutativity::ConditionalCommutative(_),
+                Commutativity::Commutative
+                | Commutativity::PartialCommutative
+                | Commutativity::ConditionalCommutative(_),
+            ) => false,
+            (Commutativity::ConditionalCommutative(_), _) => true,
+
+            (
+                Commutativity::TransformedCommutative(_),
+                Commutativity::Commutative
+                | Commutativity::PartialCommutative
+                | Commutativity::ConditionalCommutative(_)
+                | Commutativity::TransformedCommutative(_),
+            ) => false,
+            (Commutativity::TransformedCommutative(_), _) => true,
+
+            (
+                Commutativity::NonCommutative
+                | Commutativity::Unimplemented
+                | Commutativity::Unsupported,
+                _,
+            ) => false,
+        }
+    }
+
+    /// Return a bare commutative level without any transformer
+    fn bare_level<To>(&self) -> Commutativity<To> {
+        match self {
+            Commutativity::Commutative => Commutativity::Commutative,
+            Commutativity::PartialCommutative => Commutativity::PartialCommutative,
+            Commutativity::ConditionalCommutative(_) => Commutativity::ConditionalCommutative(None),
+            Commutativity::TransformedCommutative(_) => Commutativity::TransformedCommutative(None),
+            Commutativity::NonCommutative => Commutativity::NonCommutative,
+            Commutativity::Unimplemented => Commutativity::Unimplemented,
|
Commutativity::Unsupported => Commutativity::Unsupported,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> std::fmt::Debug for Commutativity<T> {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Commutativity::Commutative => write!(f, "Commutative"),
|
||||||
|
Commutativity::PartialCommutative => write!(f, "PartialCommutative"),
|
||||||
|
Commutativity::ConditionalCommutative(_) => write!(f, "ConditionalCommutative"),
|
||||||
|
Commutativity::TransformedCommutative(_) => write!(f, "TransformedCommutative"),
|
||||||
|
Commutativity::NonCommutative => write!(f, "NonCommutative"),
|
||||||
|
Commutativity::Unimplemented => write!(f, "Unimplemented"),
|
||||||
|
Commutativity::Unsupported => write!(f, "Unsupported"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Categorizer {}
|
pub struct Categorizer {}
|
||||||
|
|
||||||
impl Categorizer {
|
impl Categorizer {
|
||||||
pub fn check_plan(plan: &LogicalPlan, partition_cols: Option<Vec<String>>) -> Commutativity {
|
pub fn check_plan(
|
||||||
|
plan: &LogicalPlan,
|
||||||
|
partition_cols: Option<Vec<String>>,
|
||||||
|
) -> Commutativity<LogicalPlan> {
|
||||||
let partition_cols = partition_cols.unwrap_or_default();
|
let partition_cols = partition_cols.unwrap_or_default();
|
||||||
|
|
||||||
match plan {
|
match plan {
|
||||||
@@ -47,21 +120,104 @@ impl Categorizer {
|
|||||||
for expr in &proj.expr {
|
for expr in &proj.expr {
|
||||||
let commutativity = Self::check_expr(expr);
|
let commutativity = Self::check_expr(expr);
|
||||||
if !matches!(commutativity, Commutativity::Commutative) {
|
if !matches!(commutativity, Commutativity::Commutative) {
|
||||||
return commutativity;
|
return commutativity.bare_level();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Commutativity::Commutative
|
Commutativity::Commutative
|
||||||
}
|
}
|
||||||
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
|
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
|
||||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate).bare_level(),
|
||||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||||
LogicalPlan::Aggregate(aggr) => {
|
LogicalPlan::Aggregate(aggr) => {
|
||||||
|
// fast path: if the group_expr is a subset of partition_cols
|
||||||
if Self::check_partition(&aggr.group_expr, &partition_cols) {
|
if Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||||
return Commutativity::Commutative;
|
return Commutativity::Commutative;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check all children exprs and uses the strictest level
|
common_telemetry::info!("[DEBUG] aggregate plan expr: {:?}", aggr.aggr_expr);
|
||||||
Commutativity::Unimplemented
|
|
||||||
|
// get all commutativity levels of aggregate exprs and find the strictest one
|
||||||
|
let aggr_expr_comm = aggr
|
||||||
|
.aggr_expr
|
||||||
|
.iter()
|
||||||
|
.map(Self::check_expr)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let mut strictest = Commutativity::Commutative;
|
||||||
|
for comm in &aggr_expr_comm {
|
||||||
|
if comm.is_stricter_than(&strictest) {
|
||||||
|
strictest = comm.bare_level();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
common_telemetry::info!("[DEBUG] aggr_expr_comm: {:?}", aggr_expr_comm);
|
||||||
|
common_telemetry::info!("[DEBUG] strictest: {:?}", strictest);
|
||||||
|
|
||||||
|
// fast path: if any expr is commutative or non-commutative
|
||||||
|
if matches!(
|
||||||
|
strictest,
|
||||||
|
Commutativity::Commutative
|
||||||
|
| Commutativity::NonCommutative
|
||||||
|
| Commutativity::Unimplemented
|
||||||
|
| Commutativity::Unsupported
|
||||||
|
) {
|
||||||
|
return strictest.bare_level();
|
||||||
|
}
|
||||||
|
|
||||||
|
common_telemetry::info!("[DEBUG] continue for strictest",);
|
||||||
|
|
||||||
|
// collect expr transformers
|
||||||
|
let mut expr_transformer = Vec::with_capacity(aggr.aggr_expr.len());
|
||||||
|
for expr_comm in aggr_expr_comm {
|
||||||
|
match expr_comm {
|
||||||
|
Commutativity::Commutative => expr_transformer.push(None),
|
||||||
|
Commutativity::ConditionalCommutative(transformer) => {
|
||||||
|
expr_transformer.push(transformer.clone());
|
||||||
|
}
|
||||||
|
Commutativity::PartialCommutative => expr_transformer
|
||||||
|
.push(Some(Arc::new(expr_partial_commutative_transformer))),
|
||||||
|
_ => expr_transformer.push(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// build plan transformer
|
||||||
|
let transformer = Arc::new(move |plan: &LogicalPlan| {
|
||||||
|
if let LogicalPlan::Aggregate(aggr) = plan {
|
||||||
|
let mut new_plan = aggr.clone();
|
||||||
|
|
||||||
|
// transform aggr exprs
|
||||||
|
for (expr, transformer) in
|
||||||
|
new_plan.aggr_expr.iter_mut().zip(&expr_transformer)
|
||||||
|
{
|
||||||
|
if let Some(transformer) = transformer {
|
||||||
|
let new_expr = transformer(expr)?;
|
||||||
|
*expr = new_expr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// transform group exprs
|
||||||
|
for expr in new_plan.group_expr.iter_mut() {
|
||||||
|
// if let Some(transformer) = transformer {
|
||||||
|
// let new_expr = transformer(expr)?;
|
||||||
|
// *expr = new_expr;
|
||||||
|
// }
|
||||||
|
let expr_name = expr.name_for_alias().expect("not a sort expr");
|
||||||
|
*expr = Expr::Column(expr_name.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
common_telemetry::info!(
|
||||||
|
"[DEBUG] new plan aggr expr: {:?}, group expr: {:?}",
|
||||||
|
new_plan.aggr_expr,
|
||||||
|
new_plan.group_expr
|
||||||
|
);
|
||||||
|
Some(LogicalPlan::Aggregate(new_plan))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
common_telemetry::info!("[DEBUG] done TransformedCommutative for aggr plan ");
|
||||||
|
|
||||||
|
Commutativity::TransformedCommutative(Some(transformer))
|
||||||
}
|
}
|
||||||
LogicalPlan::Sort(_) => {
|
LogicalPlan::Sort(_) => {
|
||||||
if partition_cols.is_empty() {
|
if partition_cols.is_empty() {
|
||||||
@@ -113,7 +269,7 @@ impl Categorizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity {
|
pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity<LogicalPlan> {
|
||||||
match plan.name() {
|
match plan.name() {
|
||||||
name if name == EmptyMetric::name()
|
name if name == EmptyMetric::name()
|
||||||
|| name == InstantManipulate::name()
|
|| name == InstantManipulate::name()
|
||||||
@@ -129,7 +285,7 @@ impl Categorizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_expr(expr: &Expr) -> Commutativity {
|
pub fn check_expr(expr: &Expr) -> Commutativity<Expr> {
|
||||||
match expr {
|
match expr {
|
||||||
Expr::Column(_)
|
Expr::Column(_)
|
||||||
| Expr::ScalarVariable(_, _)
|
| Expr::ScalarVariable(_, _)
|
||||||
@@ -155,13 +311,14 @@ impl Categorizer {
|
|||||||
| Expr::Case(_)
|
| Expr::Case(_)
|
||||||
| Expr::Cast(_)
|
| Expr::Cast(_)
|
||||||
| Expr::TryCast(_)
|
| Expr::TryCast(_)
|
||||||
| Expr::AggregateFunction(_)
|
|
||||||
| Expr::WindowFunction(_)
|
| Expr::WindowFunction(_)
|
||||||
| Expr::InList(_)
|
| Expr::InList(_)
|
||||||
| Expr::InSubquery(_)
|
| Expr::InSubquery(_)
|
||||||
| Expr::ScalarSubquery(_)
|
| Expr::ScalarSubquery(_)
|
||||||
| Expr::Wildcard { .. } => Commutativity::Unimplemented,
|
| Expr::Wildcard { .. } => Commutativity::Unimplemented,
|
||||||
|
|
||||||
|
Expr::AggregateFunction(aggr_fn) => Self::check_aggregate_fn(aggr_fn),
|
||||||
|
|
||||||
Expr::Alias(_)
|
Expr::Alias(_)
|
||||||
| Expr::Unnest(_)
|
| Expr::Unnest(_)
|
||||||
| Expr::GroupingSet(_)
|
| Expr::GroupingSet(_)
|
||||||
@@ -170,6 +327,59 @@ impl Categorizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_aggregate_fn(aggr_fn: &AggregateFunction) -> Commutativity<Expr> {
|
||||||
|
common_telemetry::info!("[DEBUG] checking aggr_fn: {:?}", aggr_fn);
|
||||||
|
match &aggr_fn.func_def {
|
||||||
|
AggregateFunctionDefinition::BuiltIn(func_def) => match func_def {
|
||||||
|
BuiltInAggregateFunction::Max | BuiltInAggregateFunction::Min => {
|
||||||
|
// Commutativity::PartialCommutative
|
||||||
|
common_telemetry::info!("[DEBUG] checking min/max: {:?}", aggr_fn);
|
||||||
|
let mut new_fn = aggr_fn.clone();
|
||||||
|
let col_name = Expr::AggregateFunction(aggr_fn.clone())
|
||||||
|
.name_for_alias()
|
||||||
|
.expect("not a sort expr");
|
||||||
|
let alias = col_name.clone();
|
||||||
|
new_fn.args = vec![Expr::Column(col_name.into())];
|
||||||
|
|
||||||
|
// new_fn.func_def =
|
||||||
|
// AggregateFunctionDefinition::BuiltIn(BuiltInAggregateFunction::Sum);
|
||||||
|
Commutativity::ConditionalCommutative(Some(Arc::new(move |_| {
|
||||||
|
common_telemetry::info!("[DEBUG] transforming min/max fn: {:?}", new_fn);
|
||||||
|
Some(Expr::AggregateFunction(new_fn.clone()).alias(alias.clone()))
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
BuiltInAggregateFunction::Count => {
|
||||||
|
common_telemetry::info!("[DEBUG] checking count_fn: {:?}", aggr_fn);
|
||||||
|
let col_name = Expr::AggregateFunction(aggr_fn.clone())
|
||||||
|
.name_for_alias()
|
||||||
|
.expect("not a sort expr");
|
||||||
|
let sum_udf = Arc::new(AggregateUDF::new_from_impl(Sum::new()));
|
||||||
|
let alias = col_name.clone();
|
||||||
|
// let sum_func = Arc::new(AggregateFunction::new_udf(
|
||||||
|
// sum_udf,
|
||||||
|
// vec![Expr::Column(col_name.into())],
|
||||||
|
// false,
|
||||||
|
// None,
|
||||||
|
// None,
|
||||||
|
// None,
|
||||||
|
// ));
|
||||||
|
let mut sum_expr = aggr_fn.clone();
|
||||||
|
sum_expr.func_def = AggregateFunctionDefinition::UDF(sum_udf);
|
||||||
|
sum_expr.args = vec![Expr::Column(col_name.into())];
|
||||||
|
// let mut sum_fn = aggr_fn.clone();
|
||||||
|
// sum_fn.func_def =
|
||||||
|
// AggregateFunctionDefinition::BuiltIn(BuiltInAggregateFunction::Sum);
|
||||||
|
Commutativity::ConditionalCommutative(Some(Arc::new(move |_| {
|
||||||
|
common_telemetry::info!("[DEBUG] transforming sum_fn: {:?}", sum_expr);
|
||||||
|
Some(Expr::AggregateFunction(sum_expr.clone()).alias(alias.clone()))
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
_ => Commutativity::Unimplemented,
|
||||||
|
},
|
||||||
|
AggregateFunctionDefinition::UDF(_) => Commutativity::Unimplemented,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Return true if the given expr and partition cols satisfied the rule.
|
/// Return true if the given expr and partition cols satisfied the rule.
|
||||||
/// In this case the plan can be treated as fully commutative.
|
/// In this case the plan can be treated as fully commutative.
|
||||||
fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool {
|
fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool {
|
||||||
@@ -191,12 +401,16 @@ impl Categorizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
|
pub type Transformer<T> = Arc<dyn for<'a> Fn(&'a T) -> Option<T>>;
|
||||||
|
|
||||||
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
|
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
|
||||||
Some(plan.clone())
|
Some(plan.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn expr_partial_commutative_transformer(expr: &Expr) -> Option<Expr> {
|
||||||
|
Some(expr.clone())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use datafusion_expr::{LogicalPlanBuilder, Sort};
|
use datafusion_expr::{LogicalPlanBuilder, Sort};
|
||||||
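The new `is_stricter_than` helper defines an ordering over commutativity levels that `check_plan` folds aggregate expressions through. A minimal, self-contained sketch of that ordering (not the crate's code; the names here are illustrative, and the three non-pushable levels are collapsed into one `Terminal` variant):

// Strictness order by declaration: Commutative is the loosest level, Terminal
// (NonCommutative/Unimplemented/Unsupported) the strictest; nothing is stricter
// than Terminal because nothing sits above it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Commutative,
    PartialCommutative,
    ConditionalCommutative,
    TransformedCommutative,
    Terminal,
}

// Mirrors `Commutativity::is_stricter_than`: `this` is stricter than `lhs`
// exactly when it sits higher in the declared ordering.
fn is_stricter_than(this: Level, lhs: Level) -> bool {
    this > lhs
}

fn main() {
    // Fold to the strictest level, the way `check_plan` scans `aggr.aggr_expr`.
    let exprs = [
        Level::Commutative,
        Level::ConditionalCommutative,
        Level::PartialCommutative,
    ];
    let mut strictest = Level::Commutative;
    for level in exprs {
        if is_stricter_than(level, strictest) {
            strictest = level;
        }
    }
    assert_eq!(strictest, Level::ConditionalCommutative);
    println!("strictest = {strictest:?}");
}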
@@ -224,6 +224,11 @@ impl MergeScanExec {
                     region_id,
                     plan: plan.clone(),
                 };
+                common_telemetry::info!(
+                    "Merge scan start poll stream, partition: {}, region_id: {}",
+                    partition,
+                    region_id,
+                );
                 let do_get_start = Instant::now();
                 let mut stream = region_query_handler
                     .do_get(request)
@@ -255,7 +260,7 @@ impl MergeScanExec {
                         // reset poll timer
                         poll_timer = Instant::now();
                     }
-                    common_telemetry::debug!(
+                    common_telemetry::info!(
                         "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                         partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                     );
@@ -19,11 +19,13 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::Result as DataFusionResult;
 use datafusion_physical_expr::expressions::Column as PhysicalColumn;
 use store_api::region_engine::PartitionRange;
+use store_api::storage::RegionId;
 use table::table::scan::RegionScanExec;

 use crate::part_sort::PartSortExec;
@@ -67,10 +69,12 @@ impl WindowedSortPhysicalRule {
             .transform_down(|plan| {
                 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
                     // TODO: support multiple expr in windowed sort
-                    if !sort_exec.preserve_partitioning() || sort_exec.expr().len() != 1 {
+                    if sort_exec.expr().len() != 1 {
                         return Ok(Transformed::no(plan));
                     }

+                    let preserve_partitioning = sort_exec.preserve_partitioning();
+
                     let Some(scanner_info) = fetch_partition_range(sort_exec.input().clone())?
                     else {
                         return Ok(Transformed::no(plan));
@@ -97,6 +101,7 @@ impl WindowedSortPhysicalRule {
                         sort_exec.input().clone()
                     } else {
                         Arc::new(PartSortExec::new(
+                            scanner_info.region_id,
                             first_sort_expr.clone(),
                             sort_exec.fetch(),
                             scanner_info.partition_ranges.clone(),
@@ -105,17 +110,30 @@ impl WindowedSortPhysicalRule {
                     };

                     let windowed_sort_exec = WindowedSortExec::try_new(
+                        scanner_info.region_id,
                         first_sort_expr,
                         sort_exec.fetch(),
                         scanner_info.partition_ranges,
                         new_input,
                     )?;

-                    return Ok(Transformed {
-                        data: Arc::new(windowed_sort_exec),
-                        transformed: true,
-                        tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
-                    });
+                    if !preserve_partitioning {
+                        let order_preserving_merge = SortPreservingMergeExec::new(
+                            sort_exec.expr().to_vec(),
+                            Arc::new(windowed_sort_exec),
+                        );
+                        return Ok(Transformed {
+                            data: Arc::new(order_preserving_merge),
+                            transformed: true,
+                            tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
+                        });
+                    } else {
+                        return Ok(Transformed {
+                            data: Arc::new(windowed_sort_exec),
+                            transformed: true,
+                            tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
+                        });
+                    }
                 }

                 Ok(Transformed::no(plan))
@@ -126,7 +144,9 @@ impl WindowedSortPhysicalRule {
     }
 }

+#[derive(Debug)]
 struct ScannerInfo {
+    region_id: RegionId,
     partition_ranges: Vec<Vec<PartitionRange>>,
     time_index: String,
     tag_columns: Vec<String>,
@@ -136,11 +156,12 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
     let mut partition_ranges = None;
     let mut time_index = None;
     let mut tag_columns = None;
+    let mut region_id = None;
+    let mut is_batch_coalesced = false;

     input.transform_up(|plan| {
         // Unappliable case, reset the state.
         if plan.as_any().is::<RepartitionExec>()
-            || plan.as_any().is::<CoalesceBatchesExec>()
             || plan.as_any().is::<CoalescePartitionsExec>()
             || plan.as_any().is::<SortExec>()
             || plan.as_any().is::<WindowedSortExec>()
@@ -148,13 +169,20 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
             partition_ranges = None;
         }

+        if plan.as_any().is::<CoalesceBatchesExec>() {
+            is_batch_coalesced = true;
+        }
+
         if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
             partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
             time_index = Some(region_scan_exec.time_index());
             tag_columns = Some(region_scan_exec.tag_columns());
+            region_id = Some(region_scan_exec.region_id());
+
             // set distinguish_partition_ranges to true, this is an incorrect workaround
-            region_scan_exec.with_distinguish_partition_range(true);
+            if !is_batch_coalesced {
+                region_scan_exec.with_distinguish_partition_range(true);
+            }
         }

         Ok(Transformed::no(plan))
@@ -162,6 +190,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti

     let result = try {
         ScannerInfo {
+            region_id: region_id?,
             partition_ranges: partition_ranges?,
             time_index: time_index?,
             tag_columns: tag_columns?,
@@ -12,6 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+//! Module for sorting input data within each [`PartitionRange`].
+//!
+//! This module defines the [`PartSortExec`] execution plan, which sorts each
+//! partition ([`PartitionRange`]) independently based on the provided physical
+//! sort expressions.
+
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -35,8 +41,9 @@ use futures::Stream;
 use itertools::Itertools;
 use snafu::location;
 use store_api::region_engine::PartitionRange;
+use store_api::storage::RegionId;

-use crate::downcast_ts_array;
+use crate::{array_iter_helper, downcast_ts_array};

 /// Sort input within given PartitionRange
 ///
@@ -45,6 +52,7 @@ use crate::downcast_ts_array;
 /// and this operator will sort each partition independently within the partition.
 #[derive(Debug, Clone)]
 pub struct PartSortExec {
+    region_id: RegionId,
     /// Physical sort expressions(that is, sort by timestamp)
     expression: PhysicalSortExpr,
     limit: Option<usize>,
@@ -57,6 +65,7 @@ pub struct PartSortExec {

 impl PartSortExec {
     pub fn new(
+        region_id: RegionId,
         expression: PhysicalSortExpr,
         limit: Option<usize>,
         partition_ranges: Vec<Vec<PartitionRange>>,
@@ -70,6 +79,7 @@ impl PartSortExec {
         );

         Self {
+            region_id,
             expression,
             limit,
             input,
@@ -88,14 +98,20 @@ impl PartSortExec {
             self.input.execute(partition, context.clone())?;

         if partition >= self.partition_ranges.len() {
+            common_telemetry::error!(
+                "to_stream: Partition index out of range: {} >= {}",
+                partition,
+                self.partition_ranges.len()
+            );
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "to_stream: Partition index out of range: {} >= {}",
                 partition,
                 self.partition_ranges.len()
             )?;
         }

         let df_stream = Box::pin(PartSortStream::new(
+            self.region_id,
             context,
             self,
             self.limit,
@@ -150,6 +166,7 @@ impl ExecutionPlan for PartSortExec {
             internal_err!("No children found")?
         };
         Ok(Arc::new(Self::new(
+            self.region_id,
             self.expression.clone(),
             self.limit,
             self.partition_ranges.clone(),
@@ -180,6 +197,7 @@ impl ExecutionPlan for PartSortExec {
 }

 struct PartSortStream {
+    region_id: RegionId,
     /// Memory pool for this stream
     reservation: MemoryReservation,
     buffer: Vec<DfRecordBatch>,
@@ -193,11 +211,14 @@ struct PartSortStream {
     #[allow(dead_code)] // this is used under #[debug_assertions]
     partition: usize,
     cur_part_idx: usize,
+    evaluating_batch: Option<DfRecordBatch>,
     metrics: BaselineMetrics,
+    pending: bool,
 }

 impl PartSortStream {
     fn new(
+        region_id: RegionId,
         context: Arc<TaskContext>,
         sort: &PartSortExec,
         limit: Option<usize>,
@@ -205,7 +226,11 @@ impl PartSortStream {
         partition_ranges: Vec<PartitionRange>,
         partition: usize,
     ) -> Self {
+        common_telemetry::info!(
+            "[PartSortStream] Region {region_id} Partition {partition} new stream with ranges: {partition_ranges:?}"
+        );
         Self {
+            region_id,
             reservation: MemoryConsumer::new("PartSortStream".to_string())
                 .register(&context.runtime_env().memory_pool),
             buffer: Vec::new(),
@@ -218,7 +243,9 @@ impl PartSortStream {
             partition_ranges,
             partition,
             cur_part_idx: 0,
+            evaluating_batch: None,
             metrics: BaselineMetrics::new(&sort.metrics, partition),
+            pending: false,
         }
     }
 }
@@ -269,8 +296,14 @@ impl PartSortStream {
         min_max_idx: (usize, usize),
     ) -> datafusion_common::Result<()> {
         if self.cur_part_idx >= self.partition_ranges.len() {
+            common_telemetry::error!(
+                "check_in_range: Partition index out of range: {} >= {}, ranges: {:?}",
+                self.cur_part_idx,
+                self.partition_ranges.len(),
+                self.partition_ranges,
+            );
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "check_in_range: Partition index out of range: {} >= {}",
                 self.cur_part_idx,
                 self.partition_ranges.len()
             )?;
@@ -288,13 +321,90 @@ impl PartSortStream {
         Ok(())
     }

+    /// Try find data whose value exceeds the current partition range.
+    ///
+    /// Returns `None` if no such data is found, and `Some(idx)` where idx points to
+    /// the first data that exceeds the current partition range.
+    fn try_find_next_range(
+        &self,
+        sort_column: &ArrayRef,
+    ) -> datafusion_common::Result<Option<usize>> {
+        if sort_column.len() == 0 {
+            return Ok(Some(0));
+        }
+
+        // check if the current partition index is out of range
+        if self.cur_part_idx >= self.partition_ranges.len() {
+            common_telemetry::error!(
+                "try_find_next_range: Region {} Partition {} index out of range: {} >= {}, ranges: {:?}",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                self.partition_ranges.len(),
+                self.partition_ranges,
+            );
+            internal_err!(
+                "try_find_next_range: Region {} Partition {} index out of range: {} >= {}",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                self.partition_ranges.len()
+            )?;
+        }
+        let cur_range = self.partition_ranges[self.cur_part_idx];
+
+        let sort_column_iter = downcast_ts_array!(
+            sort_column.data_type() => (array_iter_helper, sort_column),
+            _ => internal_err!(
+                "Unsupported data type for sort column: {:?}",
+                sort_column.data_type()
+            )?,
+        );
+
+        match sort_column.data_type() {
+            arrow_schema::DataType::Timestamp(unit, _) => {
+                assert_eq!(cur_range.start.unit().as_arrow_time_unit(), *unit);
+                assert_eq!(cur_range.end.unit().as_arrow_time_unit(), *unit);
+            }
+            _ => panic!(
+                "Unsupported data type for sort column: {:?}",
+                sort_column.data_type()
+            ),
+        }
+
+        for (idx, val) in sort_column_iter {
+            // ignore vacant time index data
+            if let Some(val) = val {
+                if val >= cur_range.end.value() || val < cur_range.start.value() {
+                    common_telemetry::info!(
+                        "[PartSortStream] Region {} Partition {} find val {} at index {} out of range {}: {:?}",
+                        self.region_id, self.partition, val, idx, self.cur_part_idx, cur_range,
+                    );
+
+                    return Ok(Some(idx));
+                }
+            }
+        }
+
+        Ok(None)
+    }
+
     /// Sort and clear the buffer and return the sorted record batch
     ///
-    /// this function should return a empty record batch if the buffer is empty
+    /// this function will return a empty record batch if the buffer is empty
     fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
         if self.buffer.is_empty() {
             return Ok(DfRecordBatch::new_empty(self.schema.clone()));
         }
+
+        common_telemetry::info!(
+            "[PartSortStream] Region {} Partition {} part index {} sort {} rows",
+            self.region_id,
+            self.partition,
+            self.cur_part_idx,
+            self.buffer.iter().map(|b| b.num_rows()).sum::<usize>(),
+        );
+
         let mut sort_columns = Vec::with_capacity(self.buffer.len());
         let mut opt = None;
         for batch in self.buffer.iter() {
@@ -317,6 +427,9 @@ impl PartSortStream {
                     Some(format!("Fail to sort to indices at {}", location!())),
                 )
             })?;
+        if indices.is_empty() {
+            return Ok(DfRecordBatch::new_empty(self.schema.clone()));
+        }
+
         self.check_in_range(
             &sort_column,
@@ -374,11 +487,89 @@ impl PartSortStream {
         Ok(sorted)
     }

+    fn split_batch(
+        &mut self,
+        batch: DfRecordBatch,
+    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
+        if batch.num_rows() == 0 {
+            return Ok(None);
+        }
+
+        let sort_column = self
+            .expression
+            .expr
+            .evaluate(&batch)?
+            .into_array(batch.num_rows())?;
+
+        let next_range_idx = self.try_find_next_range(&sort_column)?;
+        let Some(idx) = next_range_idx else {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                batch.num_rows(),
+            );
+            self.buffer.push(batch);
+            // keep polling input for next batch
+            return Ok(None);
+        };
+
+        let this_range = batch.slice(0, idx);
+        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
+        if this_range.num_rows() != 0 {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+                batch.num_rows(),
+            );
+            self.buffer.push(this_range);
+        }
+        // mark end of current PartitionRange
+        let sorted_batch = self.sort_buffer();
+        // step to next proper PartitionRange
+        self.cur_part_idx += 1;
+        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
+        if self.try_find_next_range(&next_sort_column)?.is_some() {
+            // remaining batch still contains data that exceeds the current partition range
+            // register the remaining batch for next polling
+            self.evaluating_batch = Some(remaining_range);
+        } else {
+            // remaining batch is within the current partition range
+            // push to the buffer and continue polling
+            if remaining_range.num_rows() != 0 {
+                common_telemetry::info!(
+                    "[PartSortStream] Region {} Partition {} part index {} push {} rows",
+                    self.region_id,
+                    self.partition,
+                    self.cur_part_idx,
+                    batch.num_rows(),
+                );
+                self.buffer.push(remaining_range);
+            }
+        }
+
+        sorted_batch.map(|x| if x.num_rows() == 0 { None } else { Some(x) })
+    }
+
     pub fn poll_next_inner(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        if self.pending {
+            common_telemetry::info!(
+                "[PartSortStream] Region {} Partition {} part index {} poll from pending",
+                self.region_id,
+                self.partition,
+                self.cur_part_idx,
+            );
+            self.as_mut().pending = false;
+        }
+
         loop {
+            // no more input, sort the buffer and return
             if self.input_complete {
                 if self.buffer.is_empty() {
                     return Poll::Ready(None);
@@ -386,30 +577,51 @@ impl PartSortStream {
                 return Poll::Ready(Some(self.sort_buffer()));
             }

+            // if there is a remaining batch being evaluated from last run,
+            // split on it instead of fetching new batch
+            if let Some(evaluating_batch) = self.evaluating_batch.take()
+                && evaluating_batch.num_rows() != 0
+            {
+                if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
+                    return Poll::Ready(Some(Ok(sorted_batch)));
+                } else {
+                    continue;
+                }
+            }
+
+            // fetch next batch from input
             let res = self.input.as_mut().poll_next(cx);
             match res {
                 Poll::Ready(Some(Ok(batch))) => {
-                    if batch.num_rows() == 0 {
-                        // mark end of current PartitionRange
-                        let sorted_batch = self.sort_buffer()?;
-                        self.cur_part_idx += 1;
-                        if sorted_batch.num_rows() == 0 {
-                            // Current part is empty, continue polling next part.
-                            continue;
-                        }
+                    if let Some(sorted_batch) = self.split_batch(batch)? {
                         return Poll::Ready(Some(Ok(sorted_batch)));
+                    } else {
+                        continue;
                     }
-                    self.buffer.push(batch);
-                    // keep polling until boundary(a empty RecordBatch) is reached
-                    continue;
                 }
-                // input stream end, sort the buffer and return
+                // input stream end, mark and continue
                 Poll::Ready(None) => {
                     self.input_complete = true;
+                    common_telemetry::info!(
+                        "[PartSortStream] Region {} Partition {} part index {} input complete",
+                        self.region_id,
+                        self.partition,
+                        self.cur_part_idx,
+                    );
                     continue;
                 }
                 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
-                Poll::Pending => return Poll::Pending,
+                Poll::Pending => {
+                    self.as_mut().pending = true;
+                    common_telemetry::info!(
+                        "[PartSortStream] Region {} Partition {} part index {} is pending",
+                        self.region_id,
+                        self.partition,
+                        self.cur_part_idx,
+                    );
+                    return Poll::Pending;
+                }
             }
         }
     }
@@ -448,6 +660,7 @@ mod test {
     use crate::test_util::{new_ts_array, MockInputExec};

     #[tokio::test]
+    #[ignore = "behavior changed"]
     async fn fuzzy_test() {
         let test_cnt = 100;
         let part_cnt_bound = 100;
@@ -490,7 +703,11 @@ mod test {
             // generate each `PartitionRange`'s timestamp range
             let (start, end) = if descending {
                 let end = bound_val
-                    .map(|i| i.checked_sub(rng.i64(0..range_offset_bound)).expect("Bad luck, fuzzy test generate data that will overflow, change seed and try again"))
+                    .map(
+                        |i| i
+                            .checked_sub(rng.i64(0..range_offset_bound))
+                            .expect("Bad luck, fuzzy test generate data that will overflow, change seed and try again")
+                    )
                     .unwrap_or_else(|| rng.i64(..));
                 bound_val = Some(end);
                 let start = end - rng.i64(1..range_size_bound);
@@ -578,7 +795,7 @@ mod test {
                     ((5, 10), vec![vec![5, 6], vec![7, 8]]),
                 ],
                 false,
-                vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9], vec![5, 6, 7, 8]],
+                vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]],
             ),
             (
                 TimeUnit::Millisecond,
@@ -664,7 +881,14 @@ mod test {
                 })
                 .collect_vec();

-            run_test(0, input_ranged_data, schema.clone(), opt, expected_output).await;
+            run_test(
+                identifier,
+                input_ranged_data,
+                schema.clone(),
+                opt,
+                expected_output,
+            )
+            .await;
         }
     }

@@ -687,6 +911,7 @@ mod test {
         let mock_input = MockInputExec::new(batches, schema.clone());

         let exec = PartSortExec::new(
+            RegionId::new(0, 0),
             PhysicalSortExpr {
                 expr: Arc::new(Column::new("ts", 0)),
                 options: opt,
@@ -714,8 +939,8 @@ mod test {
                 buf.push(b',');
             }
             // TODO(discord9): better ways to print buf
-            let _buf = String::from_utf8_lossy(&buf);
-            full_msg += &format!("case_id:{case_id}, real_output");
+            let buf = String::from_utf8_lossy(&buf);
+            full_msg += &format!("\ncase_id:{case_id}, real_output \n{buf}\n");
         }
         {
             let mut buf = Vec::with_capacity(10 * real_output.len());
@@ -727,8 +952,8 @@ mod test {
                 buf.append(&mut rb_json);
                 buf.push(b',');
             }
-            let _buf = String::from_utf8_lossy(&buf);
-            full_msg += &format!("case_id:{case_id}, expected_output");
+            let buf = String::from_utf8_lossy(&buf);
+            full_msg += &format!("case_id:{case_id}, expected_output \n{buf}");
         }
         panic!(
             "case_{} failed, opt: {:?}, full msg: {}",
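The new `try_find_next_range`/`split_batch` pair cuts an incoming batch at the first row whose time index leaves the current `PartitionRange`, so a range boundary no longer has to arrive as an empty RecordBatch. A minimal, self-contained sketch of that splitting rule, using plain `i64` values instead of Arrow timestamp arrays (names are illustrative):

// Find the first index whose value falls outside the half-open range [start, end),
// mirroring the `val >= cur_range.end.value() || val < cur_range.start.value()` check.
fn find_split_idx(ts: &[i64], start: i64, end: i64) -> Option<usize> {
    ts.iter().position(|&v| v >= end || v < start)
}

fn main() {
    let ts = [5, 6, 7, 12, 13];
    // Rows 0..3 stay in the current range [0, 10); row 3 starts the next range,
    // so the batch would be sliced into `batch.slice(0, 3)` and `batch.slice(3, 2)`.
    assert_eq!(find_split_idx(&ts, 0, 10), Some(3));
    // Everything in range: keep buffering and wait for more input.
    assert_eq!(find_split_idx(&[1, 2, 3], 0, 10), None);
    println!("split at {:?}", find_split_idx(&ts, 0, 10));
}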
@@ -20,8 +20,9 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Instant;

-use arrow::array::{Array, ArrayRef, PrimitiveArray};
+use arrow::array::{Array, ArrayRef};
 use arrow::compute::SortColumn;
 use arrow_schema::{DataType, SchemaRef, SortOptions};
 use common_error::ext::{BoxedError, PlainError};
@@ -45,6 +46,7 @@ use futures::Stream;
 use itertools::Itertools;
 use snafu::ResultExt;
 use store_api::region_engine::PartitionRange;
+use store_api::storage::RegionId;

 use crate::error::{QueryExecutionSnafu, Result};

@@ -64,6 +66,7 @@ use crate::error::{QueryExecutionSnafu, Result};

 #[derive(Debug, Clone)]
 pub struct WindowedSortExec {
+    region_id: RegionId,
     /// Physical sort expressions(that is, sort by timestamp)
     expression: PhysicalSortExpr,
     /// Optional number of rows to fetch. Stops producing rows after this fetch
@@ -110,6 +113,7 @@ fn check_partition_range_monotonicity(

 impl WindowedSortExec {
     pub fn try_new(
+        region_id: RegionId,
         expression: PhysicalSortExpr,
         fetch: Option<usize>,
         ranges: Vec<Vec<PartitionRange>>,
@@ -126,6 +130,8 @@ impl WindowedSortExec {
             input.execution_mode(),
         );

+        let start = Instant::now();
+
         let mut all_avail_working_range = Vec::with_capacity(ranges.len());
         for r in &ranges {
             let overlap_counts = split_overlapping_ranges(r);
@@ -134,7 +140,13 @@ impl WindowedSortExec {
             all_avail_working_range.push(working_ranges);
         }

+        common_telemetry::info!(
+            "WindowSortExec compute working ranges, cost: {:?}",
+            start.elapsed()
+        );
+
         Ok(Self {
+            region_id,
             expression,
             fetch,
             ranges,
@@ -157,6 +169,7 @@ impl WindowedSortExec {
             self.input.execute(partition, context.clone())?;

         let df_stream = Box::pin(WindowedSortStream::new(
+            self.region_id,
             context,
             self,
             input_stream,
@@ -209,6 +222,7 @@ impl ExecutionPlan for WindowedSortExec {
             internal_err!("No children found")?
         };
         let new = Self::try_new(
+            self.region_id,
             self.expression.clone(),
             self.fetch,
             self.ranges.clone(),
@@ -283,10 +297,14 @@ pub struct WindowedSortStream {
     ranges: Vec<PartitionRange>,
     /// Execution metrics
     metrics: BaselineMetrics,
+    pending: bool,
+    region_id: RegionId,
+    partition: usize,
 }

 impl WindowedSortStream {
     pub fn new(
+        region_id: RegionId,
         context: Arc<TaskContext>,
         exec: &WindowedSortExec,
         input: DfSendableRecordBatchStream,
@@ -310,6 +328,9 @@ impl WindowedSortStream {
             all_avail_working_range: exec.all_avail_working_range[partition].clone(),
             ranges: exec.ranges[partition].clone(),
             metrics: BaselineMetrics::new(&exec.metrics, partition),
+            pending: false,
+            region_id,
+            partition,
         }
     }
 }
@@ -410,6 +431,12 @@ impl WindowedSortStream {
                     continue;
                 }
                 Poll::Pending => {
+                    self.pending = true;
+                    common_telemetry::info!(
+                        "[WindowedSortStream] Region {} Partition {} is pending",
+                        self.region_id,
+                        self.partition,
+                    );
                     return Poll::Pending;
                 }
             }
@@ -450,7 +477,15 @@ impl WindowedSortStream {
                 self.is_terminated = true;
                 None
             }
-            Poll::Pending => return Poll::Pending,
+            Poll::Pending => {
+                self.as_mut().pending = true;
+                common_telemetry::info!(
+                    "[WindowedSortStream] Region {} Partition {} is pending",
+                    self.region_id,
+                    self.partition,
+                );
+                return Poll::Pending;
+            }
         };

         let Some(SortedRunSet {
@@ -665,6 +700,15 @@ impl Stream for WindowedSortStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        if self.pending {
+            common_telemetry::info!(
+                "[WindowedSortStream] Region {} Partition {} poll from pending",
+                self.region_id,
+                self.partition,
+            );
+            self.as_mut().pending = false;
+        }
+
         let result = self.as_mut().poll_next_inner(cx);
         self.metrics.record_poll(result)
     }
@@ -812,9 +856,16 @@ fn find_slice_from_range(
     Ok((start, end - start))
 }

+/// Get an iterator from a primitive array.
+///
+/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
+#[macro_export]
 macro_rules! array_iter_helper {
     ($t:ty, $unit:expr, $arr:expr) => {{
-        let typed = $arr.as_any().downcast_ref::<PrimitiveArray<$t>>().unwrap();
+        let typed = $arr
+            .as_any()
+            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
+            .unwrap();
         let iter = typed.iter().enumerate();
         Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
     }};
@@ -2511,6 +2562,7 @@ mod test {
         let mock_input = MockInputExec::new(batches, self.schema.clone());

         let exec = WindowedSortExec::try_new(
+            RegionId::new(0, 0),
             self.expression.clone(),
             self.fetch,
             vec![ranges],
@@ -77,6 +77,7 @@ use crate::query_handler::{
 use crate::server::Server;

 pub mod authorize;
+pub mod dump;
 pub mod dyn_log;
 pub mod event;
 pub mod handler;
@@ -744,6 +745,7 @@ impl HttpServer {
                 "/log_level",
                 routing::get(dyn_log::dyn_log_handler).post(dyn_log::dyn_log_handler),
             )
+            .route("/dump_tasks", routing::get(dump::dump_tasks_handler))
             .nest(
                 "/prof",
                 Router::new()
src/servers/src/http/dump.rs (new file, 72 lines)
@@ -0,0 +1,72 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Write;
+use std::time::Duration;
+
+use axum::extract::Query;
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use tokio::io::AsyncWriteExt;
+use tokio::runtime::Handle;
+
+use crate::error::Result;
+
+#[derive(Serialize, Deserialize, Debug, JsonSchema)]
+#[serde(default)]
+pub struct DumpQuery {
+    seconds: u64,
+    path: String,
+}
+
+impl Default for DumpQuery {
+    fn default() -> DumpQuery {
+        DumpQuery {
+            seconds: 5,
+            path: "/tmp/greptimedb/dump-tasks.txt".to_string(),
+        }
+    }
+}
+
+#[axum_macros::debug_handler]
+pub async fn dump_tasks_handler(Query(req): Query<DumpQuery>) -> Result<impl IntoResponse> {
+    common_telemetry::info!("Dump start, request: {:?}", req);
+
+    let handle = Handle::current();
+    let Ok(mut file) = tokio::fs::File::create(&req.path).await.inspect_err(|e| {
+        common_telemetry::error!(e; "Failed to open to {}", req.path);
+    }) else {
+        return Ok((StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file."));
+    };
+    if let Ok(dump) = tokio::time::timeout(Duration::from_secs(req.seconds), handle.dump()).await {
+        for (i, task) in dump.tasks().iter().enumerate() {
+            let trace = task.trace();
+            let mut lines = String::new();
+            writeln!(lines, "TASK {i}:").unwrap();
+            writeln!(lines, "{trace}\n").unwrap();
+            if let Err(e) = file.write_all(lines.as_bytes()).await {
+                common_telemetry::error!(e; "Failed to open to {}", req.path);
+                return Ok((StatusCode::INTERNAL_SERVER_ERROR, "Failed to write file."));
+            }
+        }
+    } else {
+        common_telemetry::info!("Dump tasks timeout.");
+        return Ok((StatusCode::REQUEST_TIMEOUT, "Dump tasks timeout."));
+    }
+
+    common_telemetry::info!("Dump tasks done.");
+    Ok((StatusCode::OK, "Dump tasks done."))
+}
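The handler above is built on tokio's task-dump support: bound `Handle::dump()` with a timeout and write out each task's trace. A hedged, standalone sketch of that pattern follows (not the server code). Task dumps are an unstable, platform-limited tokio feature, and the build cfg flags mentioned in the comment are assumptions to verify against your tokio version.

// Standalone sketch of the dump pattern used by `dump_tasks_handler`.
// Assumption: built with tokio's unstable task-dump support enabled
// (e.g. RUSTFLAGS containing `--cfg tokio_unstable --cfg tokio_taskdump`);
// check the tokio docs for the exact requirements of your version and platform.
use std::time::Duration;

use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let handle = Handle::current();
    match tokio::time::timeout(Duration::from_secs(5), handle.dump()).await {
        Ok(dump) => {
            for (i, task) in dump.tasks().iter().enumerate() {
                // Each trace is a textual backtrace of where the task is suspended.
                println!("TASK {i}:\n{}\n", task.trace());
            }
        }
        Err(_) => eprintln!("dump timed out"),
    }
}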
@@ -36,6 +36,7 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSort
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use futures::{Stream, StreamExt};
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
+use store_api::storage::RegionId;

 use crate::table::metrics::StreamMetrics;

@@ -166,6 +167,10 @@ impl RegionScanExec {
             .map(|col| col.column_schema.name.clone())
             .collect()
     }
+
+    pub fn region_id(&self) -> RegionId {
+        self.scanner.lock().unwrap().metadata().region_id
+    }
 }

 impl ExecutionPlan for RegionScanExec {
@@ -897,6 +897,8 @@ sst_write_buffer_size = "8MiB"
 parallel_scan_channel_size = 32
 allow_stale_entries = false
 min_compaction_interval = "0s"
+skip_wal = false
+compression_method = "zstd"

 [region_engine.mito.index]
 aux_path = ""