mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 08:20:36 +00:00
feat: tune constants (#7851)
* feat: tune constants Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * cap output batch size Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * handle empty input Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * one more ut for cr Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -23,7 +23,9 @@ use datafusion::error::Result as DatafusionResult;
|
||||
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
|
||||
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
|
||||
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
|
||||
use datafusion::parquet::file::metadata::ParquetMetaData;
|
||||
use datafusion::parquet::file::metadata::{
|
||||
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
|
||||
};
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
use datafusion_datasource::PartitionedFile;
|
||||
@@ -94,35 +96,40 @@ impl DefaultParquetFileReaderFactory {
|
||||
}
|
||||
|
||||
impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
|
||||
// TODO(weny): Supports [`metadata_size_hint`].
|
||||
// The upstream has an implementation that supports [`metadata_size_hint`],
|
||||
// however, it is coupled with Box<dyn ObjectStore>.
|
||||
fn create_reader(
|
||||
&self,
|
||||
_partition_index: usize,
|
||||
partitioned_file: PartitionedFile,
|
||||
_metadata_size_hint: Option<usize>,
|
||||
metadata_size_hint: Option<usize>,
|
||||
_metrics: &ExecutionPlanMetricsSet,
|
||||
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
|
||||
let path = partitioned_file.path().to_string();
|
||||
let object_store = self.object_store.clone();
|
||||
|
||||
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
|
||||
Ok(Box::new(LazyParquetFileReader::new(
|
||||
object_store,
|
||||
path,
|
||||
metadata_size_hint,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LazyParquetFileReader {
|
||||
object_store: ObjectStore,
|
||||
reader: Option<Compat<FuturesAsyncReader>>,
|
||||
file_size: Option<u64>,
|
||||
metadata_size_hint: Option<usize>,
|
||||
path: String,
|
||||
}
|
||||
|
||||
impl LazyParquetFileReader {
|
||||
pub fn new(object_store: ObjectStore, path: String) -> Self {
|
||||
pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
|
||||
LazyParquetFileReader {
|
||||
object_store,
|
||||
path,
|
||||
reader: None,
|
||||
file_size: None,
|
||||
metadata_size_hint,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,6 +137,7 @@ impl LazyParquetFileReader {
|
||||
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
|
||||
if self.reader.is_none() {
|
||||
let meta = self.object_store.stat(&self.path).await?;
|
||||
self.file_size = Some(meta.content_length());
|
||||
let reader = self
|
||||
.object_store
|
||||
.reader(&self.path)
|
||||
@@ -166,8 +174,19 @@ impl AsyncFileReader for LazyParquetFileReader {
|
||||
self.maybe_initialize()
|
||||
.await
|
||||
.map_err(|e| ParquetError::External(Box::new(e)))?;
|
||||
// Safety: Must initialized
|
||||
self.reader.as_mut().unwrap().get_metadata(options).await
|
||||
|
||||
let metadata_opts = options.map(|o| o.metadata_options().clone());
|
||||
let metadata_reader = ParquetMetaDataReader::new()
|
||||
.with_metadata_options(metadata_opts)
|
||||
.with_page_index_policy(PageIndexPolicy::from(
|
||||
options.is_some_and(|o| o.page_index()),
|
||||
))
|
||||
.with_prefetch_hint(self.metadata_size_hint);
|
||||
|
||||
let metadata = metadata_reader
|
||||
.load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
|
||||
.await?;
|
||||
Ok(Arc::new(metadata))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
43
src/mito2/src/cache/file_cache.rs
vendored
43
src/mito2/src/cache/file_cache.rs
vendored
@@ -288,6 +288,17 @@ pub(crate) struct FileCache {
|
||||
pub(crate) type FileCacheRef = Arc<FileCache>;
|
||||
|
||||
impl FileCache {
|
||||
/// Splits the configured total capacity between parquet and puffin caches
|
||||
/// without exceeding the requested overall budget.
|
||||
fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
|
||||
let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
|
||||
let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
|
||||
let puffin_capacity =
|
||||
desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
|
||||
let parquet_capacity = total_capacity - puffin_capacity;
|
||||
(parquet_capacity, puffin_capacity)
|
||||
}
|
||||
|
||||
/// Creates a new file cache.
|
||||
pub(crate) fn new(
|
||||
local_store: ObjectStore,
|
||||
@@ -302,14 +313,8 @@ impl FileCache {
|
||||
.unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
|
||||
let total_capacity = capacity.as_bytes();
|
||||
|
||||
// Convert percent to ratio and calculate capacity for each cache
|
||||
let index_ratio = index_percent as f64 / 100.0;
|
||||
let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
|
||||
let parquet_capacity = total_capacity - puffin_capacity;
|
||||
|
||||
// Ensure both capacities are at least 512MB
|
||||
let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
|
||||
let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
|
||||
let (parquet_capacity, puffin_capacity) =
|
||||
Self::split_cache_capacities(total_capacity, index_percent);
|
||||
|
||||
info!(
|
||||
"Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
|
||||
@@ -1064,6 +1069,28 @@ mod tests {
|
||||
assert_eq!(data, bytes[3].as_ref());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_cache_capacity_respects_total_budget() {
|
||||
let total_capacity = ReadableSize::mb(256).as_bytes();
|
||||
let (parquet_capacity, puffin_capacity) =
|
||||
FileCache::split_cache_capacities(total_capacity, 20);
|
||||
|
||||
assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
|
||||
assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
|
||||
assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
|
||||
let total_capacity = ReadableSize::gb(5).as_bytes();
|
||||
let (parquet_capacity, puffin_capacity) =
|
||||
FileCache::split_cache_capacities(total_capacity, 20);
|
||||
|
||||
assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
|
||||
assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
|
||||
assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cache_file_path() {
|
||||
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
|
||||
|
||||
@@ -137,7 +137,7 @@ struct CollectedParts {
|
||||
/// All parts in a bulk memtable.
|
||||
#[derive(Default)]
|
||||
struct BulkParts {
|
||||
/// Unordered small parts (< 1024 rows).
|
||||
/// Unordered small parts.
|
||||
unordered_part: UnorderedPart,
|
||||
/// All parts (raw and encoded).
|
||||
parts: Vec<BulkPartWrapper>,
|
||||
|
||||
@@ -50,6 +50,7 @@ use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource
|
||||
use crate::metrics::{
|
||||
PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
|
||||
};
|
||||
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
|
||||
|
||||
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
|
||||
|
||||
@@ -821,7 +822,11 @@ impl DataPart {
|
||||
/// Reads frozen data part and yields [DataBatch]es.
|
||||
pub fn read(&self) -> Result<DataPartReader> {
|
||||
match self {
|
||||
DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
|
||||
// Keep encoded memtable scans aligned with mito/DataFusion batch sizing instead of
|
||||
// parquet-rs's implicit 1024-row default.
|
||||
DataPart::Parquet(data_bytes) => {
|
||||
DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,9 +41,17 @@ pub mod writer;
|
||||
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
|
||||
|
||||
/// Default batch size to read parquet files.
|
||||
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
|
||||
///
|
||||
/// This is a runtime-only scan granularity, so we align it with DataFusion's
|
||||
/// default execution batch size to reduce rebatching and concatenation in the
|
||||
/// query pipeline.
|
||||
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
|
||||
/// Default row group size for parquet files.
|
||||
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
|
||||
///
|
||||
/// Keep the existing persisted/on-disk default stable. It intentionally stays
|
||||
/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
|
||||
/// batching without changing the row group layout of newly written SSTs.
|
||||
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
|
||||
|
||||
/// Parquet write options.
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -49,9 +49,6 @@ use snafu::ResultExt;
|
||||
use crate::error::DeserializeSnafu;
|
||||
use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
|
||||
|
||||
/// Maximum number of rows per output batch
|
||||
const ABSENT_BATCH_SIZE: usize = 8192;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub struct Absent {
|
||||
start: Millisecond,
|
||||
@@ -390,11 +387,13 @@ impl ExecutionPlan for AbsentExec {
|
||||
context: Arc<TaskContext>,
|
||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
|
||||
let batch_size = context.session_config().batch_size();
|
||||
let input = self.input.execute(partition, context)?;
|
||||
|
||||
Ok(Box::pin(AbsentStream {
|
||||
end: self.end,
|
||||
step: self.step,
|
||||
batch_size,
|
||||
time_index_column_index: self
|
||||
.input
|
||||
.schema()
|
||||
@@ -407,6 +406,8 @@ impl ExecutionPlan for AbsentExec {
|
||||
metric: baseline_metric,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec::new(),
|
||||
input_timestamps: Vec::new(),
|
||||
input_timestamp_offset: 0,
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: self.start,
|
||||
input_finished: false,
|
||||
@@ -441,6 +442,7 @@ impl DisplayAs for AbsentExec {
|
||||
pub struct AbsentStream {
|
||||
end: Millisecond,
|
||||
step: Millisecond,
|
||||
batch_size: usize,
|
||||
time_index_column_index: usize,
|
||||
output_schema: SchemaRef,
|
||||
fake_labels: Vec<(String, String)>,
|
||||
@@ -448,6 +450,9 @@ pub struct AbsentStream {
|
||||
metric: BaselineMetrics,
|
||||
// Buffer for streaming output timestamps
|
||||
output_timestamps: Vec<Millisecond>,
|
||||
// Current input timestamps being processed incrementally.
|
||||
input_timestamps: Vec<Millisecond>,
|
||||
input_timestamp_offset: usize,
|
||||
// Current timestamp in the output range
|
||||
output_ts_cursor: Millisecond,
|
||||
input_finished: bool,
|
||||
@@ -464,52 +469,53 @@ impl Stream for AbsentStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if !self.input_finished {
|
||||
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(Ok(batch)) => {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_input_batch(&batch) {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
// If we have enough data for a batch, output it
|
||||
if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
|
||||
let timer = std::time::Instant::now();
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match result {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
|
||||
None => {
|
||||
self.input_finished = true;
|
||||
|
||||
let timer = std::time::Instant::now();
|
||||
// Process any remaining absent timestamps
|
||||
if let Err(e) = self.process_remaining_absent_timestamps() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
let result = self.flush_output_batch();
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
return Poll::Ready(result.transpose());
|
||||
}
|
||||
if self.has_pending_input_timestamps() {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_input_batch() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match self.flush_output_batch() {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
|
||||
if self.input_finished {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.process_remaining_absent_timestamps() {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
|
||||
match self.flush_output_batch() {
|
||||
Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
|
||||
Ok(None) => return Poll::Ready(None),
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
|
||||
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(Ok(batch)) => {
|
||||
let timer = std::time::Instant::now();
|
||||
if let Err(e) = self.buffer_input_timestamps(&batch) {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
}
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
}
|
||||
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
|
||||
None => {
|
||||
self.input_finished = true;
|
||||
}
|
||||
} else {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AbsentStream {
|
||||
fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
|
||||
// Extract timestamps from this batch
|
||||
fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
|
||||
let timestamp_array = batch.column(self.time_index_column_index);
|
||||
let milli_ts_array = arrow::compute::cast(
|
||||
timestamp_array,
|
||||
@@ -519,29 +525,52 @@ impl AbsentStream {
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
self.input_timestamps.clear();
|
||||
self.input_timestamps
|
||||
.extend_from_slice(timestamp_array.values());
|
||||
self.input_timestamp_offset = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_pending_input_timestamps(&self) -> bool {
|
||||
self.input_timestamp_offset < self.input_timestamps.len()
|
||||
}
|
||||
|
||||
fn process_input_batch(&mut self) -> DataFusionResult<()> {
|
||||
while self.input_timestamp_offset < self.input_timestamps.len() {
|
||||
let input_ts = self.input_timestamps[self.input_timestamp_offset];
|
||||
|
||||
// Process against current output cursor position
|
||||
for &input_ts in timestamp_array.values() {
|
||||
// Generate absent timestamps up to this input timestamp
|
||||
while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
|
||||
self.output_timestamps.push(self.output_ts_cursor);
|
||||
self.output_ts_cursor += self.step;
|
||||
|
||||
if self.output_timestamps.len() >= self.batch_size {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Skip the input timestamp if it matches our cursor
|
||||
if self.output_ts_cursor == input_ts {
|
||||
self.output_ts_cursor += self.step;
|
||||
}
|
||||
|
||||
self.input_timestamp_offset += 1;
|
||||
}
|
||||
|
||||
self.input_timestamps.clear();
|
||||
self.input_timestamp_offset = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
|
||||
// Generate all remaining absent timestamps (input is finished)
|
||||
while self.output_ts_cursor <= self.end {
|
||||
self.output_timestamps.push(self.output_ts_cursor);
|
||||
self.output_ts_cursor += self.step;
|
||||
|
||||
if self.output_timestamps.len() >= self.batch_size {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -551,11 +580,16 @@ impl AbsentStream {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let timestamps = if self.output_timestamps.len() <= self.batch_size {
|
||||
std::mem::take(&mut self.output_timestamps)
|
||||
} else {
|
||||
let remaining = self.output_timestamps.split_off(self.batch_size);
|
||||
std::mem::replace(&mut self.output_timestamps, remaining)
|
||||
};
|
||||
|
||||
let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
|
||||
let num_rows = self.output_timestamps.len();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(
|
||||
self.output_timestamps.clone(),
|
||||
)) as _);
|
||||
let num_rows = timestamps.len();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _);
|
||||
columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
|
||||
|
||||
for (_, value) in self.fake_labels.iter() {
|
||||
@@ -567,7 +601,6 @@ impl AbsentStream {
|
||||
|
||||
let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
|
||||
|
||||
self.output_timestamps.clear();
|
||||
Ok(Some(batch))
|
||||
}
|
||||
}
|
||||
@@ -580,7 +613,7 @@ mod tests {
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::catalog::memory::DataSourceExec;
|
||||
use datafusion::datasource::memory::MemorySourceConfig;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion::prelude::{SessionConfig, SessionContext};
|
||||
use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
|
||||
|
||||
use super::*;
|
||||
@@ -725,4 +758,146 @@ mod tests {
|
||||
// Should output all timestamps in range: 0, 1000, 2000
|
||||
assert_eq!(output_timestamps, vec![0, 1000, 2000]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_absent_respects_session_batch_size_for_large_gap() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
|
||||
let value_array = Arc::new(Float64Array::from(vec![1.0]));
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
|
||||
|
||||
let memory_exec = DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
|
||||
));
|
||||
|
||||
let output_schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let absent_exec = AbsentExec {
|
||||
start: 0,
|
||||
end: 10,
|
||||
step: 1,
|
||||
time_index_column: "timestamp".to_string(),
|
||||
value_column: "value".to_string(),
|
||||
fake_labels: vec![],
|
||||
output_schema: output_schema.clone(),
|
||||
input: Arc::new(memory_exec),
|
||||
properties: Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
)),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
};
|
||||
|
||||
let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
|
||||
let task_ctx = session_ctx.task_ctx();
|
||||
let mut stream = absent_exec.execute(0, task_ctx).unwrap();
|
||||
|
||||
let mut batch_sizes = Vec::new();
|
||||
let mut output_timestamps = Vec::new();
|
||||
while let Some(batch_result) = stream.next().await {
|
||||
let batch = batch_result.unwrap();
|
||||
batch_sizes.push(batch.num_rows());
|
||||
|
||||
let ts_array = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
for i in 0..ts_array.len() {
|
||||
if !ts_array.is_null(i) {
|
||||
output_timestamps.push(ts_array.value(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(batch_sizes, vec![3, 3, 3, 1]);
|
||||
assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_absent_resumes_same_input_timestamp_after_batch_flush() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9]));
|
||||
let value_array = Arc::new(Float64Array::from(vec![1.0]));
|
||||
let batch =
|
||||
RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
|
||||
|
||||
let memory_exec = DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
|
||||
));
|
||||
|
||||
let output_schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"timestamp",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
true,
|
||||
),
|
||||
Field::new("value", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let absent_exec = AbsentExec {
|
||||
start: 0,
|
||||
end: 9,
|
||||
step: 1,
|
||||
time_index_column: "timestamp".to_string(),
|
||||
value_column: "value".to_string(),
|
||||
fake_labels: vec![],
|
||||
output_schema: output_schema.clone(),
|
||||
input: Arc::new(memory_exec),
|
||||
properties: Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
)),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
};
|
||||
|
||||
let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3));
|
||||
let task_ctx = session_ctx.task_ctx();
|
||||
let mut stream = absent_exec.execute(0, task_ctx).unwrap();
|
||||
|
||||
let mut output_timestamps = Vec::new();
|
||||
while let Some(batch_result) = stream.next().await {
|
||||
let batch = batch_result.unwrap();
|
||||
let ts_array = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
for i in 0..ts_array.len() {
|
||||
if !ts_array.is_null(i) {
|
||||
output_timestamps.push(ts_array.value(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -836,6 +836,7 @@ impl ExecutionPlan for RangeSelectExec {
|
||||
context: Arc<TaskContext>,
|
||||
) -> DfResult<DfSendableRecordBatchStream> {
|
||||
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
|
||||
let batch_size = context.session_config().batch_size();
|
||||
let input = self.input.execute(partition, context)?;
|
||||
let schema = input.schema();
|
||||
let time_index = schema
|
||||
@@ -852,6 +853,7 @@ impl ExecutionPlan for RangeSelectExec {
|
||||
.collect(),
|
||||
)?;
|
||||
Ok(Box::pin(RangeSelectStream {
|
||||
batch_size,
|
||||
schema: self.schema.clone(),
|
||||
range_exec: self.range_exec.clone(),
|
||||
input,
|
||||
@@ -868,6 +870,8 @@ impl ExecutionPlan for RangeSelectExec {
|
||||
metric: baseline_metric,
|
||||
schema_project: self.schema_project.clone(),
|
||||
schema_before_project: self.schema_before_project.clone(),
|
||||
output_batch: None,
|
||||
output_batch_offset: 0,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -881,6 +885,7 @@ impl ExecutionPlan for RangeSelectExec {
|
||||
}
|
||||
|
||||
struct RangeSelectStream {
|
||||
batch_size: usize,
|
||||
/// the schema of output column
|
||||
schema: SchemaRef,
|
||||
range_exec: Vec<RangeFnExec>,
|
||||
@@ -907,6 +912,8 @@ struct RangeSelectStream {
|
||||
metric: BaselineMetrics,
|
||||
schema_project: Option<Vec<usize>>,
|
||||
schema_before_project: SchemaRef,
|
||||
output_batch: Option<RecordBatch>,
|
||||
output_batch_offset: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -1149,6 +1156,36 @@ impl RangeSelectStream {
|
||||
};
|
||||
Ok(project_output)
|
||||
}
|
||||
|
||||
fn next_output_batch(&mut self) -> DfResult<Option<RecordBatch>> {
|
||||
if self.output_batch.is_none() {
|
||||
self.output_batch = Some(self.generate_output()?);
|
||||
self.output_batch_offset = 0;
|
||||
}
|
||||
|
||||
let num_rows = self.output_batch.as_ref().unwrap().num_rows();
|
||||
if num_rows == 0 {
|
||||
self.output_batch = None;
|
||||
self.output_batch_offset = 0;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if self.output_batch_offset == 0 && num_rows <= self.batch_size {
|
||||
return Ok(self.output_batch.take());
|
||||
}
|
||||
|
||||
let offset = self.output_batch_offset;
|
||||
let len = (num_rows - offset).min(self.batch_size);
|
||||
let batch = self.output_batch.as_ref().unwrap().slice(offset, len);
|
||||
self.output_batch_offset += len;
|
||||
|
||||
if self.output_batch_offset >= num_rows {
|
||||
self.output_batch = None;
|
||||
self.output_batch_offset = 0;
|
||||
}
|
||||
|
||||
Ok(Some(batch))
|
||||
}
|
||||
}
|
||||
|
||||
enum ExecutionState {
|
||||
@@ -1191,13 +1228,19 @@ impl Stream for RangeSelectStream {
|
||||
}
|
||||
}
|
||||
ExecutionState::ProducingOutput => {
|
||||
let result = self.generate_output();
|
||||
let result = self.next_output_batch();
|
||||
return match result {
|
||||
// made output
|
||||
Ok(batch) => {
|
||||
self.exec_state = ExecutionState::Done;
|
||||
Ok(Some(batch)) => {
|
||||
if self.output_batch.is_none() {
|
||||
self.exec_state = ExecutionState::Done;
|
||||
}
|
||||
Poll::Ready(Some(Ok(batch)))
|
||||
}
|
||||
Ok(None) => {
|
||||
self.exec_state = ExecutionState::Done;
|
||||
Poll::Ready(None)
|
||||
}
|
||||
// error making output
|
||||
Err(error) => Poll::Ready(Some(Err(error))),
|
||||
};
|
||||
@@ -1251,7 +1294,7 @@ mod test {
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_physical_expr::PhysicalSortExpr;
|
||||
use datafusion_physical_expr::expressions::Column;
|
||||
use datatypes::arrow::array::TimestampMillisecondArray;
|
||||
use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray};
|
||||
use datatypes::arrow_array::StringArray;
|
||||
|
||||
use super::*;
|
||||
@@ -1313,15 +1356,49 @@ mod test {
|
||||
))
|
||||
}
|
||||
|
||||
async fn do_range_select_test(
|
||||
fn prepare_empty_test_data(is_float: bool) -> DataSourceExec {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
|
||||
Field::new(
|
||||
"value",
|
||||
if is_float {
|
||||
DataType::Float64
|
||||
} else {
|
||||
DataType::Int64
|
||||
},
|
||||
true,
|
||||
),
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
]));
|
||||
let timestamp_column: Arc<dyn Array> =
|
||||
Arc::new(TimestampMillisecondArray::from(Vec::<i64>::new())) as _;
|
||||
let value_column: Arc<dyn Array> = if is_float {
|
||||
Arc::new(Float64Array::from(Vec::<Option<f64>>::new())) as _
|
||||
} else {
|
||||
Arc::new(Int64Array::from(Vec::<Option<i64>>::new())) as _
|
||||
};
|
||||
let host_column: Arc<dyn Array> =
|
||||
Arc::new(StringArray::from(Vec::<Option<&str>>::new())) as _;
|
||||
let data = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![timestamp_column, value_column, host_column],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn collect_range_select_test(
|
||||
range1: Millisecond,
|
||||
range2: Millisecond,
|
||||
align: Millisecond,
|
||||
fill: Option<Fill>,
|
||||
is_float: bool,
|
||||
is_gap: bool,
|
||||
expected: String,
|
||||
) {
|
||||
batch_size: usize,
|
||||
) -> Vec<RecordBatch> {
|
||||
let data_type = if is_float {
|
||||
DataType::Float64
|
||||
} else {
|
||||
@@ -1412,11 +1489,25 @@ mod test {
|
||||
.into(),
|
||||
range_select_exec,
|
||||
);
|
||||
let session_context = SessionContext::default();
|
||||
let session_context = SessionContext::new_with_config(
|
||||
datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size),
|
||||
);
|
||||
datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn do_range_select_test(
|
||||
range1: Millisecond,
|
||||
range2: Millisecond,
|
||||
align: Millisecond,
|
||||
fill: Option<Fill>,
|
||||
is_float: bool,
|
||||
is_gap: bool,
|
||||
expected: String,
|
||||
) {
|
||||
let result =
|
||||
datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await;
|
||||
|
||||
let result_literal = arrow::util::pretty::pretty_format_batches(&result)
|
||||
.unwrap()
|
||||
@@ -1700,6 +1791,88 @@ mod test {
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn range_select_respects_session_batch_size() {
|
||||
let result =
|
||||
collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await;
|
||||
|
||||
let row_counts = result
|
||||
.iter()
|
||||
.map(|batch| batch.num_rows())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(vec![3, 3, 3, 3], row_counts);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn range_select_skips_empty_output_batch() {
|
||||
let memory_exec = Arc::new(prepare_empty_test_data(true));
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("MIN(value)", DataType::Float64, true),
|
||||
Field::new("MAX(value)", DataType::Float64, true),
|
||||
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
]));
|
||||
let cache = Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
));
|
||||
let input_schema = memory_exec.schema().clone();
|
||||
let range_select_exec = Arc::new(RangeSelectExec {
|
||||
input: memory_exec,
|
||||
range_exec: vec![
|
||||
RangeFnExec {
|
||||
expr: Arc::new(
|
||||
AggregateExprBuilder::new(
|
||||
min_max::min_udaf(),
|
||||
vec![Arc::new(Column::new("value", 1))],
|
||||
)
|
||||
.schema(input_schema.clone())
|
||||
.alias("MIN(value)")
|
||||
.build()
|
||||
.unwrap(),
|
||||
),
|
||||
range: 10_000,
|
||||
fill: Some(Fill::Null),
|
||||
need_cast: None,
|
||||
},
|
||||
RangeFnExec {
|
||||
expr: Arc::new(
|
||||
AggregateExprBuilder::new(
|
||||
min_max::max_udaf(),
|
||||
vec![Arc::new(Column::new("value", 1))],
|
||||
)
|
||||
.schema(input_schema)
|
||||
.alias("MAX(value)")
|
||||
.build()
|
||||
.unwrap(),
|
||||
),
|
||||
range: 5_000,
|
||||
fill: Some(Fill::Null),
|
||||
need_cast: None,
|
||||
},
|
||||
],
|
||||
align: 5_000,
|
||||
align_to: 0,
|
||||
by: vec![Arc::new(Column::new("host", 2))],
|
||||
time_index: TIME_INDEX_COLUMN.to_string(),
|
||||
schema: schema.clone(),
|
||||
schema_before_project: schema.clone(),
|
||||
schema_project: None,
|
||||
by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
cache,
|
||||
});
|
||||
let session_context = SessionContext::new();
|
||||
let result =
|
||||
datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fill_test() {
|
||||
assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
|
||||
|
||||
Reference in New Issue
Block a user